当前位置:首页 > 开发 > 编程语言 > 编程 > 正文

阻塞队列实现

发表于: 2012-07-12   作者:cuiliwei   来源:转载   浏览:
摘要: 实现代码: import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class BlockingQuery { private Object[] item; private int takeIndex, putIndex,
实现代码:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQuery {

private Object[] item;

private int takeIndex, putIndex, count;

private final ReentrantLock lock = new ReentrantLock();

private Condition notFull = lock.newCondition();

private Condition notEmpty = lock.newCondition();

BlockingQuery(int cap) {
if (cap <= 0) {
throw new IllegalArgumentException("init error");
}
item = new Object[cap];
}

public void put(Object x) throws InterruptedException {

lock.lock();
try {
while (count == item.length) {

System.out.println("current count == "+ count + Thread.currentThread().getName()+ " is waiting to put  ....");

notFull.await();
}
item[putIndex] = x;
++count;
if (++putIndex == item.length) {
putIndex = 0;
}
System.out.println("current count == "+ count + " is signal others to take  ....");
notEmpty.signal();

} finally {
lock.unlock();
}

}

public Object take() throws InterruptedException {

Object obj = null;
lock.lock();
try {
while (count == 0) {

System.out.println("current count == "+ count + Thread.currentThread().getName()+ " is waiting to take  ....");

notEmpty.await();
}
obj = item[takeIndex];
--count;
if (++takeIndex == item.length) {
takeIndex = 0;
}
System.out.println("current count == "+ count + " is signal others to put  ....");
notFull.signal();
return obj;
} finally {
            lock.unlock();
}

}

}

测试代码:

public class BlockingTest {

public static void main(String[] args) {

final BlockingQuery query = new BlockingQuery(10);
//final BlockingQueue<Object> query = new ArrayBlockingQueue<Object>(10);
/**
* 同时启动1000个线程放对象
*
*/
for (int i = 0; i < 1000; i++) {
final int seq = i;
new Thread(new Runnable() {

@Override
public void run() {

try {
query.put("INDEX==" + seq);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

}

/**
*
* 同时启动1000个线程读取数据
*
*/
for (int i = 0; i < 1000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
                    try {
Object obj = query.take();
                        System.out.println(obj.toString());
                    } catch (InterruptedException e) {
e.printStackTrace();
}
                   
}
}

).start();

}

}

}

阻塞队列实现

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号