вторник, 29 сентября 2009 г.

Lock-free BlockingStack

Дмитрий Вьюков в комментах спрашивает "А слабо к этому стеку прикрутить блокирующую семантику, что бы pop() на пустом стеке блокировался до появления элемента, и так что бы стек оставался lock-free пока есть элементы?". Отвечаю - не слабо:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class LockFreeBlockingStack {
private final AtomicReference head = new AtomicReference(null);
private final Queue popWaiters = new ConcurrentLinkedQueue();

public void push(int data) {
for (; ;) {
Node next = (Node) head.get();
Node myNode = new Node(data, next);
if (head.compareAndSet(next, myNode)) {
LockSupport.unpark(popWaiters.peek());
return;
}
}
}

public int pop() {
for (; ;) {
Node currHead = (Node) head.get();
if (currHead == null) {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
popWaiters.add(current);

while (popWaiters.peek() != current || (currHead = (Node) head.get()) == null) {
LockSupport.park(this);
if (Thread.interrupted()) {// ignore interrupts while waiting
wasInterrupted = true;
}
}

popWaiters.remove();
if (wasInterrupted) { // reassert interrupt status on exit
current.interrupt();
}
}
if (head.compareAndSet(currHead, currHead.next)) {
return currHead.data;
}
}
}

private static class Node {
private final int data;
private final Node next;

public Node(int data, Node next) {
this.data = data;
this.next = next;
}
}
}
0) в коде могут быть ошибки
1) кстати, пример использования java.util.concurrent.LockSupport
2) хотя очередь popWaiters FIFO итоговая конструкция вышла non-fair практически аналогично рассказываемой мной на лекции причины нечестности synchronized секции, а именно в pop()
Node currHead = (Node) head.get();
if (currHead == null) {
т.е. сразу лезем получить данные и только если их нет пойдем вставать в очередь
popWaiters. Если же кто-то вызвал pop()
if (head.compareAndSet(next, myNode)) {
LockSupport.unpark(popWaiters.peek());
return;
т.е. положил данные и разбудил спящего в очереди, но проснувшегося мог опередить новый поток вызвавший pop и получить данное в обход очереди.
3) думаю данный функционал возможно реализовать также на Thread.suspend()/resume(). Надо подумать.
4) Это решение рождено в процессе анализа этого кода
class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue waiters
= new ConcurrentLinkedQueue();

public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);

// Block while not first in queue or cannot acquire lock
while (waiters.peek() != current ||
!locked.compareAndSet(false, true)) {
LockSupport.park(this);
if (Thread.interrupted()) // ignore interrupts while waiting
wasInterrupted = true;
}

waiters.remove();
if (wasInterrupted) // reassert interrupt status on exit
current.interrupt();
}

public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}

из javadoc к LockSupport
5) Дмитрий, Вы данное решение имели в виду? Или есть другое?

10 комментариев:

  1. Да, я имел в виду, что-то типа такого. Основное условие, которое я бы выдвинул (помимо корректной работы естественно) - это минимальный оверхед в случае, когда данные в стеке есть. Тут это выполняется - потребитель вообще ничего не делает в этом случае, а производитель видит, что popWaiters пустой и тоже ничего не делает.

    ОтветитьУдалить
  2. По поводу fairness, так это мне кажется и к лучшему. Это 2 разных класса примитивов: best-effort (performance/scalability) и QOS (fair). Причём обычно более интересно best-effort.

    ОтветитьУдалить
  3. По поводу корректности, тут по-моему может быть следующая ситуация. 2 потока заблокированы (добавлены в popWaiters и припаркованы). Производитель добавляет 2 элемента, причём оба раза он нотифицирует одного потребителя, т.к. он ещё не успел себя убрать из popWaiters. В итоге в стеке лежит элемент, а второй потребитель остаётся заблокированным.

    ОтветитьУдалить
  4. Да, действительно, классический "lost signal" в чистом виде. Попробовал избавится за час - не вышло. Надо хорошенько подумать.

    ОтветитьУдалить
  5. Операции AtomicReference.compareAndSet(), LockSupport.park()/unpark(), ConcurrentLinkedQueue.peek()/remove()/add() - сами по себе "атомарны", но связки - уже нет. А без связок - никак.

    ОтветитьУдалить
  6. Я думаю, что-то типа такого должно сработать:

    public class LockFreeBlockingStack {
    private final AtomicReference head = new AtomicReference(null);
    private final AtomicInteger epoch = new AtomicInteger(0);

    public void push(int data) {
    for (; ;) {
    Node next = (Node) head.get();
    Node myNode = new Node(data, next);
    if (head.compareAndSet(next, myNode)) {
    if (epoch.get() & 1)
    doNotify();
    return;
    }
    }
    }

    public int pop() {
    for (; ;) {
    Node currHead = (Node) head.get();
    while (currHead == null) {
    int key;
    do {
    key = epoch.get();
    } while (!epoch.compareAndSet(key, key | 1));
    currHead = (Node) head.get();
    if (currHead)
    break;
    doWait(key);
    currHead = (Node) head.get();
    }
    }
    if (head.compareAndSet(currHead, currHead.next)) {
    return currHead.data;
    }
    }
    }

    private synchronized void doNotify()
    {
    do {
    int key = epoch.get();
    } while (false == epoch.compareAndSet(key, (key + 2) & ~1));
    notifyAll();
    }

    private synchronized void doWait(int key)
    {
    while ((key & ~1) == (epoch.get() & ~1))
    wait();
    }

    private static class Node {
    private final int data;
    private final Node next;

    public Node(int data, Node next) {
    this.data = data;
    this.next = next;
    }
    }
    }

    ОтветитьУдалить
  7. Дмитрий, ты пробывал етот код компилировать ? Он не компилируется, при чем есть unreachable code.

    ОтветитьУдалить
  8. Нет, не пробовал, у меня не стоит компилятор. А какие ошибки выдаются?

    ОтветитьУдалить
  9. @Dmitry Vyukov
    @Golovach Ivan
    >По поводу корректности, тут по-моему может >быть следующая ситуация. 2 потока >заблокированы (добавлены в popWaiters и >припаркованы). Производитель добавляет 2 >элемента, причём оба раза он нотифицирует >одного потребителя, т.к. он ещё не успел себя >убрать из popWaiters. В итоге в стеке лежит >элемент, а второй потребитель остаётся >заблокированным.

    >Да, действительно, классический "lost signal" >в чистом виде.

    Камрадс, в самом javadoc-е LockSupport-а, в коде, кот. стал inspiration-ом для Ивана есть то, что вы называете "lost signal", а я - "check-then-act sequence". Смотрите сами:

    class FIFOMutex{
    ...
    public void lock() {
    while (!locked.compareAndSet(false, true)){
    LockSupport.park();
    ...
    }
    ...
    }
    public void unlock() {
    locked.set(false);
    ...
    }
    }

    th-0 проходит проверку "!locked.compareAndSet(false, true)" в lock()-e,
    и тут вдруг th-1 получает процессорное время и делает "locked.set(false)" в unlock()-e; и теперь у th-0 неприятность.

    ОтветитьУдалить
  10. Можно добавить LockSupport.unpark(popWaiters.peek());
    перед return в этом кусочке кода
    if (head.compareAndSet(currHead, currHead.next)) {
    return currHead.data;
    }
    тогда получивший элемент из очереди поток разбудит другой и т.д.

    ОтветитьУдалить