Java并发编程(07)--锁

锁在并发编程中可以说是必须使用的一种资源保护技术, 而且是一种及其庞大且复杂的技术, 随着计算机学科的发展, 锁技术同样的在不断的发展和优化。 作为一个开发者, 当然无法直接深入到CPU的最底层去理解锁的原理, 但是通过一些其它的手段可以间接的理解锁的原理以及运行机制。 本篇文章进行一个现阶段的总结, 整理一下所学的全部关于锁的知识, 涉及的编程语言包括PythonJava, 应用包括MySQL以及Redis, 如有错误, 还请指正。

1. 锁的分类

在完整的学习各种锁之前, 有必要对锁进行一个分类:

1)按是否可重入划分: 可重入锁, 非重入锁

2)按是否排他划分: 排它锁, 共享锁

3)按是否公平划分: 公平锁, 非公平锁

4)按是否需要显式加锁划分: 显式锁, 隐式锁

5)按是否使用锁划分: 乐观锁, 悲观锁

6)按系统状态划分: 单机锁, 分布式锁

每一个划分的标准都可以代表一个使用场景, 一个锁的实现必定会有上述分类特征, 例如可重入排他隐式锁, 可重入排他显式锁, 等等。

2. synchronized

synchronized关键字在开发过程中可以说是使用最为频繁的可重入排他隐式锁了, 为我们提供了非常强的线程安全性以及线程变量可见性。

需要特别注意的是, synchronized是一个关键字, 不是一个锁, 其作用是为我们生成一个锁。 而Java中每一个对象都可以作为锁: 1)对于普通的成员同步方法, 锁即是当前的实例对象 2)对于静态同步方法, 锁是当前类的Class对象 3)对于同步的部分代码段, 锁是synchronized括号中所指定的对象。

所以, 如果我们需要使用多线程并且保证线程安全性的去运行某一个任务时, 传入到多个线程的对象实例必须是同一个。

此外, 使用synchronized关键字所创建的锁具有可重入性, 这一点在开发过程中同样尤为重要。 在一个需要线程同步的方法中调用另一个需要线程同步的方法非常常见, 可重入锁提供了这样的支持。

class SynchronizedReentrant {

    private int count = 0;

    public synchronized int get() { return count; }

    public synchronized void set(int value) {
        int count = get();
        this.count = count + value;
    }

    public static void main(String[] args) {
        SynchronizedReentrant demo = new SynchronizedReentrant();

        System.out.println(demo.get());
        demo.set(10);
        System.out.println(demo.get());
    }
}

上面儿的代码不是很好的例子, 但是能说明synchronized的可重入性。 简单来说, 可重入锁通常会在递归调用以及线程在同步方法中调用对象的其它同步方法中进行使用, 其目的是避免死锁, 提供更大的灵活性。

2.1 synchronized原理

由于synchronized所生成的锁为隐式锁, 所以只能通过javap来反编码.class文件来获得JVM字节码来进行查看, 写一个最简单的demo:

class SynchronizedReentrant {

    private int count = 0;

    public synchronized int getCount() { return count; }

    public void doSomething() {
        synchronized (this) {
            System.out.println("start");
        }
    }

    public static void main(String[] args) { }
}

使用javap对其进行反编译:

javap -c SynchronizedReentrant

查看反编码之后的结果, 会发现在doSomething方法中有monitorentermonitorexit, 这个其实就是获取锁以及释放锁的过程。 而在getCount同步方法中却没有看到这样的指令对。

实际上, JVM会基于进入和退出Monitor对象来实现方法同步和代码块同步, 但是这两者之间的实现细节有所差别。 代码块同步是使用monitorentermonitorexit指令实现, 而方法同步JVM并没有说明, 但是可以使用这两个方法来进行实现。

锁其实是存在于Java对象头中, 保存着锁信息以及线程信息, 用于保证不同的线程不会持有同一个对象的锁。

3. ReentrantLock

synchronized关键字使用虽然很方便, 但是灵活性会差一些。 比如无法定义等待锁的超时时间, 这样可能会发生无期限的等待锁的获取, 造成系统阻塞。 ReentrantLock提供了和synchronized相似的功能, 但是拥有更高的灵活性。

ReentrantLockjava.util.concurrent包中实现, 也就意味着我们可以通过阅读java-doc来对其进行理解。

A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.

在文档中也很明确的写出, ReentrantLocksynchronized具有相同的语义, 也是一种排它锁(互斥锁), 但是功能得到了拓展。

在接下来的文档描述中, 有一段相当重要的解释, 描述了公平锁和非公平锁对系统的稳定性和吞吐量所带来的影响。 那么什么是公平锁?当多个线程尝试获取同一个对象的锁时, 并且此时该对象的锁还未释放, 公平锁则保证等待时间最久的线程将会获得该锁。 而非公平锁则是让多个线程继续争抢, 哪个线程抢到了就是谁的, 没有先来后到一说。

公平在现实生活中是一件好事, 然而维护公平却需要更多的开销。 例如我们需要法院, 需要维护秩序的人员, 等等。 然而在计算机的世界中, 效率和吞吐量才是首要。 当我们将fairness参数设置为true时, 系统需要记录谁先申请锁, 谁后申请锁, 即维护一个加锁队列, 这样一来就会有额外的开销, 从而降低吞吐量。 所以在没有非常强的需求下, 使用默认的非公平锁即可。

文档还给出了使用显式可重入锁的最佳实践, 将释放锁的代码放到finally语句块儿中能够保证锁资源的正确释放, 增加系统的稳定性。

class X {
    private final ReentrantLock lock = new ReentrantLock();
    // ...

    public void m() {
        lock.lock();  // block until condition holds
        try {
            // ... method body
        } finally {
            lock.unlock()
        }
    }
}
3.1 ReentrantLock实现原理

ReentrantLock基于AQS所实现, AQS的基本原理在上一篇文章做了一个粗略的介绍, 那么在可重入锁这里, 着重分析一下其实现源码。 关键代码为ReentrantLock.Sync内部类, 以及其子类。 当我们使用默认的非公平锁构造器创建一个对象时, 实际实例化的对象类为NonfairSync

static final class NonfairSync extends Sync {
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

对于一个加锁操作, 必须借助于硬件为我们提供的原子性比较与赋值操作(compare-and-set, 以下简称CAS)。 当我们尝试调用ReentrantLock().lock()时, 首先对前锁对象的状态(state)值, 该值在AbstractQueuedSynchronizer中进行定义, 如果state == 0, 说明还没有线程上锁, 那么将当前线程赋给exclusiveOwnerThread属性, 并将state置为1。

此时其余线程想要对其进行加锁, 因为状态为1, 则执行acquire(1)操作:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire方法其实是再一次确认state值是否为0, 以及是否是同一线程再次进行加锁操作; addWaiter方法将会创建一个双向链表节点(Node)并进行挂载, 并将该节点返回; acquireQueued接受一个Node对象并在内部维护一个死循环, 当前面一个节点的线程将锁释放之后该循环返回, 并且此时线程已经成功获取到了锁资源。 锁的释放与该过程相反, 基本类似。

比较有意思的是ReentrantReadWriteLock, 可重入读写锁。 假如我们有一个list, 读多写少并且在多个线程间共享, 为了保证读到的数据为最新数据以及写数据时的安全性, 对该list的读写均进行加锁操作。 因为操作为读多写少, 两个线程并发的读应该是允许的, 这样才能增加系统的吞吐量, 所以就有了读写锁:ReentrantReadWriteLock

ReentrantReadWriteLock保证了写操作时读操作阻塞, 读操作时另一个线程可以执行读操作, 但不允许执行写操作。 这样的设计极大的增加了系统的吞吐量。

3.2 synchronized与ReentrantLock

synchronized可以认为是纯底层实现, 线程的阻塞与唤醒交由操作系统管理, 这样一来就会有用户态和内核态来回切换的过程, 属于重量级操作, 所以又称为重量级锁。 并且开发人员没有办法控制和进行监控, 灵活性比较差, 但是使用很方便。

ReentrantLock由AQS所实现, 线程的阻塞与唤醒采用自旋进行管理, 即所有操作均在用户空间进行, 但是如果自旋时间过长同样会降低系统吞吐量。 该锁为开发人员提供了很灵活的接口, 能够结合业务场景进行定制化, 但是使用较为复杂。

当系统中持有锁的时间较长, 需要对一个执行较长时间的操作进行加锁时, synchronized会更好; 如果系统中持有锁的时间很短, 例如向map中添加一个对象这种O(1)操作, 使用ReentrantLock更好。 另外如果处理器为单核, 不要使用ReentrantLock, 因为在单核情况下, 自旋会耗尽CPU为其分配的时间片, 白白浪费资源且会造成大量的线程阻塞。

3.2 RLock

RLockPython中所提供的一种互斥可重入锁, 其原理和使用与Java没有什么明显的区别。 只不过由于Python中可以使用上下文管理器, 对资源的释放能够更好的管理而已:

import threading

r_lock = threading.RLock()

def do_something():
    with r_lock:
        do_something...

4. Condition

4.1 线程状态

如果想要理解Condition条件锁的用法, 首先得明白线程状态以及之间的转换。

状态名称 说明
初始(NEW) 线程被构建(实例化), 但是未调用start方法
运行(RUNNABLE) 运行状态, 已调用start方法, 或者是线程已就绪
阻塞(BLOCKED) 阻塞状态, 线程等待进入synchronized方法中, 等待获取对象锁
等待(WAITING) 通常是自身调用wait, join方法进入等待通知状态
超时等待(TIMED_WAITING) 等待超时, 例如获取Lock锁超时, join超时或者wait超时等
终止(TERMINATED) 线程结束, 任务执行完毕或异常抛出

值得一提的只有阻塞和等待状态, 这两个状态比较容易混淆。 只有在线程等待synchronized锁时才会进入阻塞状态, 而对于J.U.C.Lock所实现的各种锁, 由于底层是通过LockSupport方法实现, 所以会进入等待状态。

4.2 notify与notifyAll

这两个方法属于Java对象(Object), 也就是说任何一个对象都可以调用notify或者是notifyAll, 在获取到对象锁的前提下。

wait:线程释放所持有的对象锁, 并等待其它线程的notify(RUNNABLE -> WAITING)。 notify:唤醒一个当前处于wait当前对象的线程, 并使其获得对象锁(WAITING -> RUNNABLE)。 notifyAll:唤醒所有对当前对象处于wait状态的线程, 并让这些线程重新对锁进行争抢。

使用这两个方法可以实现一些比较复杂的线程同步问题, waitnotify有些类似于回合制游戏, 我先打一拳, 你再给我一脚; notifyAll类似于抛绣球, 丢出去一堆人哄抢。

4.3 Condition

Condition锁的核心函数同样也是notify, wait以及notifyAll, 但是Condition是由逻辑层面所实现的, 并不涉及synchronized, 所以这些方法区别于Objects的相关方法。 其原理与对象的notify等方法基本相同。

对于一个阻塞队列, 当队列已满时, put方法会阻塞直到队列中腾出了空间。 take方法当队列没有任何元素可取时同样会被阻塞, 直到有元素可以取出。 这种阻塞队列应用非常的广泛, 例如LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, 这些阻塞队列的实现都由Condition完成。

我们来看一下LinkedBlockingQueue对于puttake方法的基本实现, 核心的成员变量:

public class LinkedBlockingQueue<E> {

    // 定义了两个可重入锁, 以及获取这两个可重入锁的Condition锁

    private final ReentrantLock takeLock = new ReentrantLock();

    private final Condition notEmpty = takeLock.newCondition();

    private final ReentrantLock putLock = new ReentrantLock();

    private final Condition notFull = putLock.newCondition();

put方法:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    // count表示当前队列所容纳的元素数量, 为原子变量
    final AtomicInteger count = this.count;
    // 对可重入锁尝试加锁操作
    putLock.lockInterruptibly();
    try {
        // 当队列已满时, 调用notFull.await方法, 等待唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            // signal可以认为就是notify方法, 唤醒一正在await的线程
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

take方法与put方法基本类似, 只不过await的判断条件不同而已, 并且会使用成员变量中的takeLocknotEmpty。 在不同的线程中进行协作式的同步, 最终达到阻塞队列的效果。

现在来具体的从逻辑上分析这两个方法的源代码, 以及理解为何需要两个可重入锁以及对应的条件锁。

首先, 如果我们自己实现put方法, 需要关心哪些细节? 首先在获取当前队列的数量时, 可重入锁就应该已经进行加锁操作, 禁止线程上下文切换; 当当前队列已满时, 操作应当阻塞, 调用notFull.await方法, 线程等待唤醒; 此时在另外一个线程中执行take操作, 取出队列一个元素, 那么此时就需要通知put方法中的等待线程, 也就是signalNotFull方法, put线程被唤醒继续执行, 本质上是调用notFull.signal方法。 同理, 在队列为空且put方法向队列中添加了一个元素, 会调用signalNotEmpty方法去唤醒take方法等待的线程, 本质上还是调用notEmpty.signal方法。

因为takeput方法通常会在两个不同的线程中执行, 所以需要不同的可重入锁对象, 那么也就必须要有不同的Condition对象。

5. 实现AQS的其它工具锁

5.1 CountDownLatch

CountDownLatch同步对象在开发中使用频率也会有, 但是没有那么多。 该同步对象的作用是: 等待或者超时等待多少个任务完成。 假如我有10个任务并发运行, 前三个任务是上下文相关的任务, 所以必须执行完毕才能继续执行, 而后面儿的7个任务我不关心, 那么此时就可以使用CountDownLatch来实现。

public class CountDownLatchDemo {

    private final static int taskCount = 3;

    private static void test() throws InterruptedException {
        Thread.sleep(100);
        System.out.println(Thread.currentThread().getName());
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch count = new CountDownLatch(taskCount);

        for (int i = 0; i < 10; i++) {
            exec.execute(() -> {
                try {
                    test();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    count.countDown();
                }
            });
        }
        System.out.println("This is a Line");
        count.await();
        System.out.println("Three task has completed");
        exec.shutdown();
    }
}

核心方法为countDown以及await, 当我们创建CountDownLatch对象时, 需要指定count大小, 每次执行countDown都会使得count原子性自减, 当减至为0时, 唤醒await等待。

顺带一提await方法同样是可以传入超时时间的。

5.3 CyclicBarrier

CyclicBarrier的作用与CountDownLatch的作用非常相似, 只不过CyclicBarrier更像是在无红绿灯斑马线过马路一样, 凑齐一波人就走。 其作用是当调用await方法的线程达到了指定的数量时, 这些线程才继续向下执行。

public class CyclicBarrierDemo {

    private final static CyclicBarrier barrier = new CyclicBarrier(3);

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();

        for (int i = 0; i < 9; i++) {
            exec.execute(() -> {
                try {
                    test();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        exec.shutdown();
    }

    private static void test() throws BrokenBarrierException, InterruptedException {
        Thread.sleep(100);
        System.out.println("Thread: " + Thread.currentThread().getName() + ", 数据完成计算, 等待其余线程");
        barrier.await();
        System.out.println("Thread: " + Thread.currentThread().getName() + ", 继续执行");
    }
}

Demo代码写的并不是很严谨, 线程池的关闭和await方法之间会有一个耦合, 说明一下。 CyclicBarrier中的计数器是可以进行重用的, 而CountDownLatch则是一次性的, 无法进行重用。 这个也是这两个同步对象之间的主要差别。

5.3 Semaphore

Semaphore有点儿类似于高速的收费站, 8车道变4车道, 每次只允许4辆车同时收费。 Semaphore的真正作用其实就是限制并发数量, 使用方式与ReentrantLock类似。

public class SemaphoreDemo {

    private static final int maxTask = 3;

    public static void main(String[] args) {

        final Semaphore semaphore = new Semaphore(maxTask);
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(() -> {
                try {
                    semaphore.acquire();  // 申请一个许可
                    test();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();  // 释放一个许可
                }
            });
            t.start();
        }
    }

    private static void test() throws InterruptedException {
        Thread.sleep(1000);  // 测试需要, 更直观的显示并发运行的线程数量
        System.out.println(Thread.currentThread().getName());
    }
}

上面的代码在运行时可以很明显的感觉到线程并发的数量为3, 通常Semaphore会作为一个限流器来使用。

6. MySQL数据库相关锁

MySQL的重要程度不言而喻, 在Web开发中, 几乎绝大多数的并发问题都会映射至底层数据库。 例如商城应用的减库存操作, 首先我们要判断当前库存是否大于0, 大于0时进行下单逻辑并将库存数量减1, 我们可以写出这样的代码:

if (stock > 0) {
    /* 下单逻辑 */
    update table set stock = stock -1 where ..;
}

这是一个非常典型的Compare-And-Set操作, 在Java语言层面我们使用硬件提供支持的原子类来完成, 但是在这里并没有这种东西。 而且MySQL也是一种应用, 我们的Web APP和数据库通过socket进行交互, 不可能有原子类来提供相应的援助。

如果我们把上面儿的逻辑整体加锁呢?

synchronized(this) {
    if (stock > 0) {
        /* 下单逻辑 */
        update table set stock = stock -1 where ..;
    }
}

似乎可行, 但是如果下单逻辑比较复杂, 运行时间较长, 那么整体的下单吞吐量会受到很大的影响。 并且如果以多进程的方式来运行我们的应用, 那么线程锁在这里毫无用武之地。 进程和进程之间是不会管线程锁的, 当进程A的线程判断了库存大于0, 并准备执行下单逻辑, 此时进程B看到库存大于0, 直接完成该接口操作, 并将库存减1, 如果此时刚好库存为0, 那么进程A在执行逻辑时很有可能将库存减为负数, 毫无疑问的超卖了。

一种方式是将锁下沉到数据库, 采用事务 + 数据库锁的方式来解决这个问题。 不管我们的数据库集群采用什么样儿的拓补结构, 是MMM结构, 还是MMH结构, 最终的主库只有一个, 从库不参与写操作, 可以不考虑。 数据库的高可用架构是指当主库不可用时如何自动进行主从切换, 而不是构建多个主库。

顺带一提, 如果系统在MMM或者MMH的架构设计, 并且主库服务器采用顶配(128核256G内存), 被配置了4个或更多从库分担读压力, 在这种情况下数据库仍然是瓶颈的情况下, 就不是我这种杂鱼能够解决的了。

扯远了, MySQL为我们提供了select .. where id=xx for update这种语法。 即如果当前Data Line正在被修改, 那么语句等待获取锁, 当语句获取到锁时, 该锁会一直持有到事务结束。 那么当其余进程想要对该数据进行修改时, 会被锁阻塞。 这样一来我们可以写出这样的代码:

with transaction:
    stock = select stock from table where id=1 for update;
    if (stock > 0) {
        /* 下单逻辑 */
        update table set stock = stock -1 where id=1;
    }

OK, 这样一来就解决了多进程或者集群应用情况下的库存超卖问题, 但是所有的下单操作均会因为获取锁而形成串行化操作, 效率还是不高。 有没有可能进一步的将串行化粒度降低?

在下单时其实我们只是对库存数量进行判断而已, 只关心是不是大于0, 具体有多少, 并不care, 所以可以采用下面的方式:

with transaction:
    stock = select stock from table where id=1;
    if (stock > 0) {
        /* 下单逻辑 */
        stock = select stock from table where id=1 for update;
        if (stock - 1 < 0)
            事务直接回滚, 接口返回商品已售罄
        else
            update table set stock = stock -1 where id=1;
    }
    /* 接口直接返回商品售罄 */

这里做了一个双重校验(可以认为是双重校验锁), 首先粗略的判断库存是否充足, 若不充足直接返回, 可以降低事务回滚的次数; 第二次校验则是应对并发场景, 让能够下单的请求在减库存时进行串行化处理。

这种的处理方式能够解决一定并发量的问题, 对于更高的并发, 通常会采用异步化的处理:在秒杀开始前将库存写入Redis, 请求进入后利用Redis单线程操作的特性, 即DECR操作减库存, 若返回结果大于1则将其放入队列等待后续处理, 若返回结果小于1, 请求直接返回失败。 这种策略要比上面的策略效率更高, 但是代码编同时也更为复杂。

在上面以一个常见的业务场景引出了MySQL中常见的锁:update所加的行锁。

这是一个非常基本的知识:数据库在更新一条或者多条数据时, 会加入写锁; select .. for update也会对所查询的数据进行加锁。

关于数据库我想更多的讨论关于乐观锁和悲观锁的问题。 悲观锁, 其实也就是一系列的互斥锁, 读锁, 写锁等等, 由操作系统提供或者由语言底层提供。 而乐观锁, 则是在业务层面进行控制。

MVCC, Multi-Version Concurrency Control, 多版本并发控制就是应用了乐观锁的最佳体现。 我们对每一行数据都添加一个版本号, 简称为version, 该值在每一个的更新操作都会自增。 为了避免数据在读取完成之后改变, 通过两次查询该version值进行对比判断。

![ center ](https://smartkeyerror.oss-cn-shenzhen.aliyuncs.com/Blog/2018-11-16%2014-17-47%20%E7%9A%84%E5%B1%8F%E5%B9%95%E6%88%AA%E5%9B%BE.png)

进行select操作, 得到version, 然后准备进行更新, 更新时比较先前拿到的version值于当前version值是否相同, 若相同则进行更新, 若不相同则放弃更新重新select获取新值, 直至成功。

多版本并发控制主要就是比较和重试, 没有任何的锁涉及, 所以将其称为”乐观锁”。

7. 分布式锁

分布式锁的实现是一个非常复杂且极易出现问题的技术难题, 不管是使用Redis还是Zookeper来实现, 在细节方面都会有安全性以及一致性的问题, 所以这部分内容还需继续研究与实践。 关于分布式锁的内容将在后面的博文中给出。

smartkeyerror

日拱一卒,功不唐捐