1) с использованием synchronized/wait()/notifyAll()
2) с использованием Lock/Condition
3) тест
Обе реализации получены модификацией исходного кода класса java.util.concurrent.ArrayBlockingQueue.
1) с использованием synchronized/wait()/notifyAll()
/**
 * Fixed-capacity FIFO buffer of {@code int} values backed by a circular array.
 *
 * <p>All state is guarded by this object's intrinsic lock. Producers and consumers
 * wait on the same monitor, so every state change uses {@code notifyAll()} to make
 * sure a thread of the right kind gets a chance to re-check its condition.
 */
public class BoundedBufferSync {
    /** Circular storage; its length is the capacity of the buffer. */
    private final int[] items;
    /** Index of the slot the next {@link #get()} reads from. */
    private int getIndex = 0;
    /** Index of the slot the next {@link #put(int)} writes to. */
    private int putIndex = 0;
    /** Number of values currently stored. */
    private int count = 0;

    /**
     * Creates an empty buffer.
     *
     * @param size capacity of the buffer; must be positive
     * @throws IllegalArgumentException if {@code size} is zero or negative
     */
    public BoundedBufferSync(int size) {
        // A zero-capacity buffer is always "full" and always "empty", so both
        // put() and get() would block forever; reject it up front.
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        this.items = new int[size];
    }

    /**
     * Appends a value, blocking while the buffer is full.
     *
     * @param item value to store
     * @throws InterruptedException if the thread is interrupted while waiting for space
     */
    public synchronized void put(int item) throws InterruptedException {
        // Loop rather than "if": the condition must be re-checked after every wake-up.
        while (count == items.length) {
            this.wait();
        }
        items[putIndex] = item;
        putIndex = inc(putIndex);
        ++count;
        this.notifyAll();
    }

    /**
     * Removes and returns the oldest value, blocking while the buffer is empty.
     *
     * @return the value that has been in the buffer the longest
     * @throws InterruptedException if the thread is interrupted while waiting for a value
     */
    public synchronized int get() throws InterruptedException {
        while (count == 0) {
            this.wait();
        }
        int x = items[getIndex];
        getIndex = inc(getIndex);
        --count;
        this.notifyAll();
        return x;
    }

    /**
     * Circularly increments an array index.
     *
     * @param i current index
     * @return {@code i + 1}, or {@code 0} when that would reach the end of the array
     */
    private int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }
}
2) с использованием Lock/Condition
/**
 * Fixed-capacity FIFO buffer of {@code int} values backed by a circular array.
 *
 * <p>All state is guarded by a single {@code ReentrantLock}. Two conditions created
 * from that lock separate the waiters: producers wait on {@code notFull}, consumers
 * wait on {@code notEmpty}, so each state change signals only the relevant side.
 */
public class BoundedBufferLock {
    /** Circular storage; its length is the capacity of the buffer. */
    private final int[] items;
    /** Index of the slot the next {@link #get()} reads from. */
    private int getIndex = 0;
    /** Index of the slot the next {@link #put(int)} writes to. */
    private int putIndex = 0;
    /** Number of values currently stored. */
    private int count = 0;
    /** Lock guarding every field above. */
    private final ReentrantLock lock;
    /** Signalled after a value has been stored. */
    private final Condition notEmpty;
    /** Signalled after a value has been removed. */
    private final Condition notFull;

    /**
     * Creates an empty buffer.
     *
     * @param size capacity of the buffer; must be positive
     * @param fair fairness setting passed to the underlying {@code ReentrantLock}
     * @throws IllegalArgumentException if {@code size} is zero or negative
     */
    public BoundedBufferLock(int size, boolean fair) {
        // A zero-capacity buffer is always "full" and always "empty", so both
        // put() and get() would block forever; reject it up front.
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        items = new int[size];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Appends a value, blocking while the buffer is full.
     *
     * @param item value to store
     * @throws InterruptedException if the thread is interrupted while acquiring the
     *         lock or while waiting for space
     */
    public void put(int item) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                // Loop rather than "if": the condition must be re-checked after every wake-up.
                while (count == items.length) {
                    notFull.await();
                }
            } catch (InterruptedException ie) {
                // Pass the wake-up on to another waiting producer before giving up.
                // NOTE(review): presumably guards against a signal being lost on the
                // interrupted thread; confirm whether it is still needed.
                notFull.signal();
                throw ie;
            }
            items[putIndex] = item;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest value, blocking while the buffer is empty.
     *
     * @return the value that has been in the buffer the longest
     * @throws InterruptedException if the thread is interrupted while acquiring the
     *         lock or while waiting for a value
     */
    public int get() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException ie) {
                // Pass the wake-up on to another waiting consumer before giving up.
                // NOTE(review): same defensive re-signal as in put(); confirm it is needed.
                notEmpty.signal();
                throw ie;
            }
            int x = items[getIndex];
            getIndex = inc(getIndex);
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Circularly increments an array index.
     *
     * @param i current index
     * @return {@code i + 1}, or {@code 0} when that would reach the end of the array
     */
    private int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }
}
3) тест
/**
 * Demo driver: one producer thread and one consumer thread exchange an increasing
 * counter through a {@code BoundedBufferSync} of capacity 8, printing every transfer.
 */
public class BBTest {
    /**
     * Starts the producer and the consumer. Both run until they are interrupted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // bounded buffer
        final BoundedBufferSync buffer = new BoundedBufferSync(8);
        // Producer
        new Thread() {
            @Override
            public void run() {
                int counter = 0;
                while (true) {
                    try {
                        buffer.put(counter);
                        System.out.println("putter: i put " + counter);
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // An interrupt is a request to stop: restore the flag and
                        // end the thread instead of silently skipping a value.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    counter++;
                }
            }
        }.start();
        // Consumer
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        int result = buffer.get();
                        System.out.println("getter: i get " + result);
                    } catch (InterruptedException e) {
                        // Same policy as the producer: honour the interrupt and stop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }.start();
    }
}
Комментариев нет:
Отправить комментарий