Java并发编程中生产者消费者模型是什么_经典实现思路

生产者消费者模型的核心约束是:多个线程共享有限容量缓冲区,生产者只put、消费者只take,满则生产者阻塞、空则消费者阻塞,且所有操作线程安全。

什么是生产者消费者模型的核心约束

它不是一种具体代码写法,而是一组必须满足的并发约束:多个线程共享一个有限容量的缓冲区;生产者线程只负责往缓冲区put数据,消费者线程只负责从缓冲区take数据;缓冲区满时生产者必须阻塞或等待,空时消费者必须阻塞或等待;所有操作必须线程安全,不能出现数据错乱或重复消费。

BlockingQueue 实现最简可靠版本

Java 标准库的 BlockingQueue 接口(如 ArrayBlockingQueueLinkedBlockingQueue)已封装了等待/唤醒、锁、容量控制等全部逻辑,是首选方案。

  • put() 方法在队列满时自动阻塞,take() 在空时自动阻塞,无需手动 wait()/notify()
  • 构造时指定容量(如 new ArrayBlockingQueue(10)),超出即触发阻塞,天然防止内存溢出
  • 所有操作原子性由实现类保证,不用额外同步 —— 但注意:若需在 put/take 前后加日志或状态更新,这部分仍需自行同步
class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { this.queue = q; }
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                String item = "msg-" + i;
                System.out.println("Producing: " + item);
                queue.put(item); // 自动阻塞
                Thread.sleep(100);
            }
        } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { this.queue = q; }
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String item = queue.take(); // 自动阻塞
                System.out.println("Consuming: " + item);
                Thread.sleep(200);
            }
        } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}

手写 synchronized + wait/notify 的关键陷阱

仅用于理解原理,不推荐生产使用。最容易出错的是:wait() 被虚假唤醒、条件判断用 if 而非 whilenotify() 误用为 notifyAll() 导致竞争失控。

  • 必须用

    while 循环检查条件,不能用 if —— 因为 wait() 返回后状态可能已变
  • notify() 只唤醒单个等待线程,但无法保证唤醒的是对应角色(比如生产者调用 notify() 却唤醒了另一个生产者),应统一用 notifyAll()
  • 所有对共享变量(如 queue.size()queue.isEmpty())的读写,必须包裹在同一个 synchronized 块中,且锁对象一致

ReentrantLock + Condition 替代方案的适用场景

当需要更精细的控制(例如区分“生产者等待队列”和“消费者等待队列”,避免不必要的唤醒)时,用 ReentrantLock 配合两个 Condition 更合适。

  • lock.newCondition() 创建独立等待队列,生产者 await()notFull 上,消费者 await()notEmpty
  • notFull.signal() 只唤醒等待空间的生产者,notEmpty.signal() 只唤醒等待数据的消费者,减少无效调度
  • 必须显式 lock.lock()/unlock(),且 unlock() 必须放在 finally 块里,否则极易死锁

真正难的不是写出来,而是想清楚:你是否真的需要这种粒度的控制?多数业务场景下,BlockingQueue 的语义足够清晰,出错率更低。