Producer Consumer 模式


假 設Producer與Consumer是兩個獨立的執行緒,Producer與Consumer生產與消費的速度並不一定,因此Producer將產品放 在中介物件,而Consumer從中介物件取得產品。假設中介物件持有產品的緩衝區長度有限(就像是一個產品桌,它可以擺放的空間是有限的),如果緩衝區 滿了,則Producer必須停止 繼續將產品放到緩衝區中,直到Consumer取走了產品而有了空間,而如果緩衝區中沒有產品,當然Consumer必須等待,直到有新的產品放到緩衝區 中。

一個簡單的 UML 順序圖如下所示:


Producer Consumer模式強調的是如何協調非同步生產與消費的問題。一個用Java實現的簡單流程架構如下:
import java.util.*;

class Store {
private LinkedList<Integer> products = new LinkedList<Integer>();
synchronized void add(Integer product) {
while(products.size() >= 2) { // 容量限制為 2
try {
wait();
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
products.addLast(product);
notifyAll();
}
synchronized Integer get() {
while(products.size() <= 0) {
try {
wait();
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
Integer product = products.removeFirst();
notifyAll();
return product;
}
}

class Producer implements Runnable {
private Store store;
Producer(Store store) {
this.store = store;
}
public void run() {
for(int product = 1; product <= 10; product++) {
try {
// wait for a random time
Thread.sleep((int) Math.random() * 3000);
}
catch(InterruptedException e) {
e.printStackTrace();
}
store.add(product);
System.out.println("Produce " + product);
}
}
}

class Consumer implements Runnable {
private Store store;

Consumer(Store store) {
this.store = store;
}

public void run() {
for(int i = 1; i <= 10; i++) {
try {
// wait for a random time
Thread.sleep((int) (Math.random() * 3000));
}
catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("Consume " + store.get());
}
}
}

public class Main {
public static void main(String[] args) {
Store store = new Store();
(new Thread(new Producer(store))).start();
(new Thread(new Consumer(store))).start();
}
}

如果使用Python來實現以下範例:
import threading
import time
import random

class Store:
def __init__(self):
self.products = []
self.condition = threading.Condition()

def add(self, product):
self.condition.acquire()
while len(self.products) >= 2: # 容量限制為 2
self.condition.wait()
self.products.append(product)
self.condition.notify()
self.condition.release()

def get(self):
self.condition.acquire()
while not self.products:
self.condition.wait()
product = self.products.pop(0)
self.condition.notify()
self.condition.release()
return product

class Producer(threading.Thread):
def __init__(self, store):
threading.Thread.__init__(self)
self.store = store

def run(self):
for product in range(1, 11):
time.sleep(int(random.random() * 3))
self.store.add(product)
print("Produce %d" % product)

class Consumer(threading.Thread):
def __init__(self, store):
threading.Thread.__init__(self)
self.store = store

def run(self):
for i in range(1, 11):
time.sleep(int(random.random() * 3))
print("Consume %d" % store.get())

store = Store()
Producer(store).start()
Consumer(store).start()