import java.util.Queue;0) в коде могут быть ошибки
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
public class LockFreeBlockingStack {
private final AtomicReference head = new AtomicReference(null);
private final QueuepopWaiters = new ConcurrentLinkedQueue ();
public void push(int data) {
for (; ;) {
Node next = (Node) head.get();
Node myNode = new Node(data, next);
if (head.compareAndSet(next, myNode)) {
LockSupport.unpark(popWaiters.peek());
return;
}
}
}
public int pop() {
for (; ;) {
Node currHead = (Node) head.get();
if (currHead == null) {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
popWaiters.add(current);
while (popWaiters.peek() != current || (currHead = (Node) head.get()) == null) {
LockSupport.park(this);
if (Thread.interrupted()) {// ignore interrupts while waiting
wasInterrupted = true;
}
}
popWaiters.remove();
if (wasInterrupted) { // reassert interrupt status on exit
current.interrupt();
}
}
if (head.compareAndSet(currHead, currHead.next)) {
return currHead.data;
}
}
}
private static class Node {
private final int data;
private final Node next;
public Node(int data, Node next) {
this.data = data;
this.next = next;
}
}
}
1) кстати, пример использования java.util.concurrent.LockSupport
2) хотя очередь popWaiters FIFO итоговая конструкция вышла non-fair практически аналогично рассказываемой мной на лекции причины нечестности synchronized секции, а именно в pop()
Node currHead = (Node) head.get();
if (currHead == null) {
т.е. сразу лезем получить данные и только если их нет пойдем вставать в очередь
popWaiters. Если же кто-то вызвал pop()
if (head.compareAndSet(next, myNode)) {
LockSupport.unpark(popWaiters.peek());
return;
т.е. положил данные и разбудил спящего в очереди, но проснувшегося мог опередить новый поток вызвавший pop и получить данное в обход очереди.
3) думаю данный функционал возможно реализовать также на Thread.suspend()/resume(). Надо подумать.
4) Это решение рождено в процессе анализа этого кода
class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queuewaiters
= new ConcurrentLinkedQueue();
public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);
// Block while not first in queue or cannot acquire lock
while (waiters.peek() != current ||
!locked.compareAndSet(false, true)) {
LockSupport.park(this);
if (Thread.interrupted()) // ignore interrupts while waiting
wasInterrupted = true;
}
waiters.remove();
if (wasInterrupted) // reassert interrupt status on exit
current.interrupt();
}
public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
из javadoc к LockSupport
5) Дмитрий, Вы данное решение имели в виду? Или есть другое?
Да, я имел в виду, что-то типа такого. Основное условие, которое я бы выдвинул (помимо корректной работы естественно) - это минимальный оверхед в случае, когда данные в стеке есть. Тут это выполняется - потребитель вообще ничего не делает в этом случае, а производитель видит, что popWaiters пустой и тоже ничего не делает.
ОтветитьУдалитьПо поводу fairness, так это мне кажется и к лучшему. Это 2 разных класса примитивов: best-effort (performance/scalability) и QOS (fair). Причём обычно более интересно best-effort.
ОтветитьУдалитьПо поводу корректности, тут по-моему может быть следующая ситуация. 2 потока заблокированы (добавлены в popWaiters и припаркованы). Производитель добавляет 2 элемента, причём оба раза он нотифицирует одного потребителя, т.к. он ещё не успел себя убрать из popWaiters. В итоге в стеке лежит элемент, а второй потребитель остаётся заблокированным.
ОтветитьУдалитьДа, действительно, классический "lost signal" в чистом виде. Попробовал избавится за час - не вышло. Надо хорошенько подумать.
ОтветитьУдалитьОперации AtomicReference.compareAndSet(), LockSupport.park()/unpark(), ConcurrentLinkedQueue.peek()/remove()/add() - сами по себе "атомарны", но связки - уже нет. А без связок - никак.
ОтветитьУдалитьЯ думаю, что-то типа такого должно сработать:
ОтветитьУдалитьpublic class LockFreeBlockingStack {
private final AtomicReference head = new AtomicReference(null);
private final AtomicInteger epoch = new AtomicInteger(0);
public void push(int data) {
for (; ;) {
Node next = (Node) head.get();
Node myNode = new Node(data, next);
if (head.compareAndSet(next, myNode)) {
if (epoch.get() & 1)
doNotify();
return;
}
}
}
public int pop() {
for (; ;) {
Node currHead = (Node) head.get();
while (currHead == null) {
int key;
do {
key = epoch.get();
} while (!epoch.compareAndSet(key, key | 1));
currHead = (Node) head.get();
if (currHead)
break;
doWait(key);
currHead = (Node) head.get();
}
}
if (head.compareAndSet(currHead, currHead.next)) {
return currHead.data;
}
}
}
private synchronized void doNotify()
{
do {
int key = epoch.get();
} while (false == epoch.compareAndSet(key, (key + 2) & ~1));
notifyAll();
}
private synchronized void doWait(int key)
{
while ((key & ~1) == (epoch.get() & ~1))
wait();
}
private static class Node {
private final int data;
private final Node next;
public Node(int data, Node next) {
this.data = data;
this.next = next;
}
}
}
Дмитрий, ты пробывал етот код компилировать ? Он не компилируется, при чем есть unreachable code.
ОтветитьУдалитьНет, не пробовал, у меня не стоит компилятор. А какие ошибки выдаются?
ОтветитьУдалить@Dmitry Vyukov
ОтветитьУдалить@Golovach Ivan
>По поводу корректности, тут по-моему может >быть следующая ситуация. 2 потока >заблокированы (добавлены в popWaiters и >припаркованы). Производитель добавляет 2 >элемента, причём оба раза он нотифицирует >одного потребителя, т.к. он ещё не успел себя >убрать из popWaiters. В итоге в стеке лежит >элемент, а второй потребитель остаётся >заблокированным.
>Да, действительно, классический "lost signal" >в чистом виде.
Камрадс, в самом javadoc-е LockSupport-а, в коде, кот. стал inspiration-ом для Ивана есть то, что вы называете "lost signal", а я - "check-then-act sequence". Смотрите сами:
class FIFOMutex{
...
public void lock() {
while (!locked.compareAndSet(false, true)){
LockSupport.park();
...
}
...
}
public void unlock() {
locked.set(false);
...
}
}
th-0 проходит проверку "!locked.compareAndSet(false, true)" в lock()-e,
и тут вдруг th-1 получает процессорное время и делает "locked.set(false)" в unlock()-e; и теперь у th-0 неприятность.
Можно добавить LockSupport.unpark(popWaiters.peek());
ОтветитьУдалитьперед return в этом кусочке кода
if (head.compareAndSet(currHead, currHead.next)) {
return currHead.data;
}
тогда получивший элемент из очереди поток разбудит другой и т.д.