ReentrantLock 与 Condition

ReentrantLock 是 Java 5 引入的显式锁,提供了比 synchronized 更灵活的功能。相比 synchronized,ReentrantLock 支持公平/非公平、可中断、超时获取,同时提供了 Condition 条件等待机制。

ReentrantLock vs synchronized

对比表

特性ReentrantLocksynchronized
获取方式显式 lock/unlock隐式,进入/退出同步块
公平性支持公平/非公平非公平
可中断支持 tryLock()不支持
超时支持 tryLock(timeout)不支持
多条件支持多个 Condition只有一个 wait/notify
锁状态可查询不可查询

基本用法

ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
    // 临界区代码
} finally {
    lock.unlock();  // 必须在 finally 中释放
}

可重入机制

ReentrantLock 支持重入,即同一个线程可以多次获取锁:

lock.lock();
try {
    lock.lock();  // 同一个线程再次获取
    try {
        // 重入层 2
        lock.lock();  // 再次重入
        try {
            // 重入层 3
        } finally {
            lock.unlock();  // 解锁 3
        }
    } finally {
        lock.unlock();  // 解锁 2
    }
} finally {
    lock.unlock();  // 解锁 1
}

重入计数

ReentrantLock 内部维护一个计数器:

// Sync 类继承 AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
    // state 表示重入次数
}

// 非公平获取
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();

    if (c == 0) {
        // 首次获取
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        // 重入,增加计数
        int nextc = c + acquires;
        setState(nextc);
        return true;
    }
    return false;
}

公平锁 vs 非公平锁

非公平锁

默认实现,新线程可以直接插队:

public ReentrantLock() {
    sync = new NonfairSync();
}

// NonfairSync
static final class NonfairSync extends Sync {
    final void lock() {
        // 直接尝试获取,可能插队
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
}

公平锁

严格按等待顺序:

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

// FairSync
static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();

        if (c == 0) {
            // 公平:检查是否有更早等待的线程
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            setState(nextc);
            return true;
        }
        return false;
    }
}

选择建议

flowchart LR
    subgraph 选择
        A["非公平锁\n吞吐量高\n可能饥饿"] --> |"大多数场景| R["推荐"]
        B["公平锁\n避免饥饿\n吞吐量低"] --> |"需要公平性| R
    end

可中断获取

lockInterruptibly()

ReentrantLock lock = new ReentrantLock();

try {
    lock.lockInterruptibly();  // 可中断获取
    try {
        // 临界区
    } finally {
        lock.unlock();
    }
} catch (InterruptedException e) {
    // 线程被中断
}

tryLock()

ReentrantLock lock = new ReentrantLock();

// 立即返回
if (lock.tryLock()) {
    try {
        // 获取成功
    } finally {
        lock.unlock();
    }
} else {
    // 获取失败
}

// 带超时
if (lock.tryLock(5, TimeUnit.SECONDS)) {
    try {
        // 5 秒内获取成功
    } finally {
        lock.unlock();
    }
} else {
    // 超时
}

Condition 条件等待

synchronized 的 wait/notify

synchronized (obj) {
    while (condition不满足) {
        obj.wait();
    }
    // 处理
}

synchronized (obj) {
    obj.notifyAll();
}

ReentrantLock 的 Condition

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

lock.lock();
try {
    while (condition不满足) {
        condition.await();  // 等待
    }
    // 处理
} finally {
    lock.unlock();
}

// 在其他线程
lock.lock();
try {
    condition.signalAll();  // 唤醒
} finally {
    lock.unlock();
}

多 Condition

ReentrantLock 支持创建多个 Condition:

ReentrantLock lock = new ReentrantLock();
Condition bufferNotFull = lock.newCondition();
Condition bufferNotEmpty = lock.newCondition();

// 生产者线程
lock.lock();
try {
    while (buffer.isFull()) {
        bufferNotFull.await();  // 等待缓冲区不满
    }
    buffer.add(item);
    bufferNotEmpty.signal();   // 通知缓冲区非空
} finally {
    lock.unlock();
}

// 消费者线程
lock.lock();
try {
    while (buffer.isEmpty()) {
        bufferNotEmpty.await();  // 等待缓冲区非空
    }
    Object item = buffer.remove();
    bufferNotFull.signal();     // 通知缓冲区不满
} finally {
    lock.unlock();
}

生产者-消费者示例

有界阻塞队列

public class BoundedQueue<T> {

    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedQueue(int capacity) {
        items = new Object[capacity];
    }

    public void put(T item) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();  // 等待不满
            }
            items[putIndex] = item;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();  // 通知不空
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();  // 等待非空
            }
            @SuppressWarnings("unchecked")
            T item = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();  // 通知不满
            return item;
        } finally {
            lock.unlock();
        }
    }
}

锁状态查询

相关方法

ReentrantLock lock = new ReentrantLock();

lock.lock();
try {
    // 查询持有锁的线程
    Thread holder = lock.getOwner();

    // 查询等待队列中的线程数
    int waiters = lock.getQueueLength();

    // 查询等待某个条件的线程数
    Condition c = lock.newCondition();
    int waiting = lock.getWaitQueueLength(c);

    // 查询是否锁定
    boolean isHeld = lock.isLocked();

    // 查询当前线程是否持有锁
    boolean heldByCurrent = lock.isHeldByCurrentThread();

    // 查询重入次数
    int holds = lock.getHoldCount();
} finally {
    lock.unlock();
}

最佳实践

必须释放

// 错误:可能不释放锁
ReentrantLock lock = new ReentrantLock();
lock.lock();
if (condition) {
    return;  // 锁未释放!
}

// 正确:在 finally 中释放
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
    if (condition) {
        return;
    }
} finally {
    lock.unlock();
}

正确选择获取方式

// 1. 简单场景:synchronized 足够
synchronized (obj) {
    // 处理
}

// 2. 需要高级特性:ReentrantLock
ReentrantLock lock = new ReentrantLock();

// 需要公平性
ReentrantLock fairLock = new ReentrantLock(true);

// 需要可中断
lock.lockInterruptibly();

// 需要超时
lock.tryLock(5, TimeUnit.SECONDS);

// 需要多个条件
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();

本章总结

核心要点

  1. ReentrantLock vs synchronized:更灵活,支持公平/非公平、可中断、超时
  2. 可重入:同一线程可多次获取,内部计数器维护重入次数
  3. 公平性:非公平锁吞吐量高,公平锁避免饥饿
  4. Condition:比 wait/notify 更强大的条件等待机制
  5. 多 Condition:一个锁可以有多个独立的条件队列
  6. 必须在 finally 中释放:确保锁一定被释放

ReentrantLock 是 synchronized 的有力补充。下一节我们将讲解读写锁(ReentrantReadWriteLock)。