5 非阻塞同步

Wu Jun 2018-12-18 21:52:58
05 Java > 00 Java 基础 > 07 并发

1 锁的劣势

2 硬件对并发的支持

几乎所有的现代处理器中都包含了某种形式的原子读-改-写指令,例如比较并交换(Compare-and-Swap, CAS)或者关联加载/条件存储(Load-Linked/Store-Conditional)

2.1 比较并交换 CAS

CAS 通常配合 volatile 使用,以达到非阻塞锁的作用。

2.2 非阻塞的计数器

2.3 JVM 对 CAS 的支持

在支持 CAS 的平台上,运行时把它们编译为相应的(多条)机器指令。在最坏的情况下,如果不支持 CAS 指令,那么 JVM 将使用自旋锁。

3 原子变量类

在 java.util.concurrent.atomic 包中有许多基于 CAS 实现的原子包装器类。共有 12 个原子变量类,可分为 4 组:标量类(Scalar)、更新器类、数组类以及复合变量类。

原子数组类中的元素可以实现原子更新。volatile 类型的数组仅在数组引用上具有 volatile 语义,而在其元素上则没有。

原子变量类未重写 equals 和 hashCode 方法,每个实例都是不同的,不宜用作基于散列容器中的键值。

3.1 原子变量是一种“更好的 volatile”

原子变量类相当于一种泛化的 volatile 变量,能够支持原子的和有条件的读一改一写操作。

可以通过 CAS 来维持包含多个变量的不变性条件,避免竞态条件。

@ThreadSafe
public class CasNumberRange {
    @Immutable
    private static class IntPair {
        // INVARIANT: lower <= upper
        final int lower;
        final int upper;
        ...
    }

    private final AtomicReference<IntPair> values =
            new AtomicReference<>(new IntPair(0, 0));

    public void setLower(int i) {
        while (true) {
            IntPair oldv = values.get();
            if (i > oldv.upper)
                throw new IllegalArgumentException("Can't set lower to " + i + " > upper");
            IntPair newv = new IntPair(i, oldv.upper);
            if (values.compareAndSet(oldv, newv))
                return;
        }
    }
    ...
}

3.2 性能比较: 锁与原子变量

3.3 竞态条件

最常见的竞态条件就是“先检查后执行”。如某个线程的操作插入到另一个线程的检查和执行操作之间。

3.4 Java SE 8

如果认为可能存在大量竞争, 只需要使用 LongAdder 而不是 AtomicLong。

LongAdder 包括多个变量(加数),其总和为当前值。可以有多个线程更新不同的加数,线程个数增加时会自动提供新的加数。

final LongAdder adder = new LongAdder();
for(...)
    pool.submit(() -> {
        while(...) {
            // ...
            if (...) adder.increment();
        }
    });
...
long total = adder.sum();

LongAccumulator 将这种思想推广到任意的累加操作。在构造函数中,可以提供这个操作以及它的零元素。要加入新的值,可以调用 accumulate。调用 get 来获得当前值。

LongAccumulator adder = new LongAccumulator(Long::sum, 0);
// In some thread ...
adder.accumulate(value);

另外 DoubleAdder 和 DoubleAccumulator 也采用同样的方式,只不过处理的是 double 值。

4 非阻塞算法

一个线程的失败或挂起不会导致其它线程也失败或挂起,这种算法称为非阻塞算法。

如果在算法的每个步骤中都存在某个线程能够执行下去,那么这种算法也被称为无锁算法。

4.1 非阻塞的栈

在实现相同功能的前提下,非阻塞算法通常比基于锁的算法更为复杂。创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性。

@ThreadSafe
public class ConcurrentStack<E> {
    private final AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead));
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }

    private static class Node<E> {
        public final E item;
        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

4.2 非阻塞的链表

CAS 的基本使用模式:在更新某个值时存在不确定性,以及在更新失败时重新尝试。

构建非阻塞算法的技巧在于:将执行原子修改的范围缩小到单个变量上。

下方是在 ConcurrentLinkedQueue 中使用的非阻塞链接队列算法中的插入部分

@ThreadSafe
public class LinkedQueue<E> {

    private static class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next;

        public Node(E item, Node<E> next) {
            this.item = item;
            this.next = new AtomicReference<>(next);
        }
    }

    private final Node<E> dummy = new Node<>(null, null);
    private final AtomicReference<Node<E>> head = new AtomicReference<>(dummy);
    private final AtomicReference<Node<E>> tail = new AtomicReference<>(dummy);

    public boolean put(E item) {
        final Node<E> newNode = new Node<>(item, null);
        while (true) {
            Node<E> curTail = tail.get();
            Node<E> tailNext = curTail.next.get();
            if (curTail == tail.get()) {
                if (tailNext != null)
                    // Queue in intermediate state, advance tail
                    tail.compareAndSet(curTail, tailNext);
                else {
                    // In quiescent state, try inserting new node
                    if (curTail.next.compareAndSet(null, newNode)) {
                        // Insertion succeeded, try advancing tail
                        tail.compareAndSet(curTail, newNode);
                        return true;
                    }
                }
            }
        }
    }
}

4.3 原子的域更新器

在 ConcurrentLinkedQueue 实际的实现中中没有使用原子引用来表示每个 Node,而是使用普通的 volatile 类型引用,并通过基于反射的 AtomicReferenceFieldUpdater 来进行更新。

private static class Node<E> {
    private volatile E item;
    private volatile Node<E> next;

    public Node(E item) { 
        thiis.item = item; 
    }
}

private static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater =
        AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");

原子的域更新器类表示现有 volatile 域的一种基于反射的“视图”,从而能够在已有的 volatile 域上使用 CAS。

4.4 ABA 问题

ABA 问题是一种异常现象:如果在算法中的节点可以被循环使用,那么在使用“比较并交换"指令时就可能出现这种问题:A 变为 B 又变为 A。

相对简单的解决方案:不是更新某个引用的值,而是更新两个值,包括一个引用和一个版本号。

可使用 java.util.concurrent.atomic.AtomicStampedReference(或 AtomicMarkableReference)解决。

5 线程封闭

线程封闭:将对象封闭在一个线程中时,自动实现了线程安全性,免去了线程同步和协调的开销。

常见应用:JDBC 的 Connection 对象。

5.1 Ad-hoc 线程封闭

5.2 栈封闭

5.3 ThreadLocl 类

1)原理

每个 Thread 维护一个 ThreadLocalMap 映射表,这个映射表的 key 是 ThreadLocal 实例本身, value 是真正需要存储的 Object 。

ThreadLocal 本身并不存储值,它只是作为一个 key 来让线程从 ThreadLocalMap 获取 value 。

ThreadLocalMap 中的key使用了弱引用

2)建议

6 不变性

不可变的对象一定是线程安全的

6.1 final 域

共享域声明为 final 时,也可以安全地访问。

6.2 volatile 对象

6.3 无状态对象

不包含域,也不包含与其他类中域的引用。一定是线程安全的。