Worker Thread 模式


每 個執行緒處理一個請求,每次執行緒執行完請求後,再次嘗試取得下一個請求並執行,這是Worker Thread的基本概念,對於一些需要冗長計算或要在背景執行的請求,可以採用Worker Thread。

Thread-Per-Message 模式 中,其實已經有點Worker Thread的概念,在Service物件接收到資料後,以匿名方式建立執行緒來處理資料,那個建立的執行緒就是Worker Thread,只不用過就丟了。

Worker Thread可以應用在不同的場合,例如在 Guarded Suspension 模式 的範例,是使用一個執行緒來處理請求佇列中的請求,如果請求不斷來到,且請求中可能有冗長的處理,則請求佇列中的請求可能會來不及消化。

您可以為請求佇列中的每個請求配給一個執行緒來處理,不過實際上,只要建立足夠多的執行緒即可,在以下的範例中,可以指定請求佇列預先建立的執行緒數量, 每個執行緒會取出一個請求來執行。
import java.util.*;

interface Request {
void execute();
}

class Worker implements Runnable {
private RequestQueue queue;
Worker(RequestQueue queue) {
this.queue = queue;
}
public void run() {
while(true) {
queue.get().execute();
}
}
}

class RequestQueue {
private LinkedList<Request> requests;

RequestQueue(int workers) {
requests = new LinkedList<Request>();
for(int i = 0; i < workers; i++) {
(new Thread(new Worker(this))).start();
}
}

synchronized Request get() {
while(requests.size() == 0) {
try {
wait();
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
return requests.removeFirst();
}

synchronized void put(Request request) {
requests.addLast(request);
notifyAll();
}
}

// 模擬 Client 置入請求
class Client implements Runnable {
private RequestQueue queue;
Client(RequestQueue queue) {
this.queue = queue;
}
public void run() {
while(true) {
Request request = new Request() {
public void execute() {
System.out.println("執行客戶請求...XD");
try {
Thread.sleep((int) (Math.random() * 3000));
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
};
queue.put(request);
try {
Thread.sleep((int) (Math.random() * 3000));
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}

public class Main {
public static void main(String[] args) {
RequestQueue queue = new RequestQueue(5);
for(int i = 0; i < 5; i++) {
(new Thread(new Client(queue))).start();
}
}
}

在這個範例中, Worker Thread有請求來了就作,如果沒有請求,則所有的Worker Thread就等待,直到有新的工作進來而通知它們,取得請 求的WorkerThread要作的工作,就直接定義在execute()中。

以順序圖來表示這個範例:

若使用Python來示範的:
import threading
import time
import random

class Worker(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue

def run(self):
while True:
self.queue.get()()

class RequestQueue:
def __init__(self, workers):
self.requests = []
self.condition = threading.Condition()
for i in range(workers):
Worker(self).start()

def get(self):
self.condition.acquire()
while not self.requests:
self.condition.wait()
request = self.requests.pop(0)
self.condition.release()
return request

def put(self, request):
self.condition.acquire()
self.requests.append(request)
self.condition.notify()
self.condition.release()

class Client(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue

def run(self):
while True:
second = int(random.random() * 3) # 隨機模擬請求的執行時間
request = lambda: print("執行客戶請求...XD"); time.sleep(second)
self.queue.put(request)
time.sleep(int(random.random() * 3))

queue = RequestQueue(5)
for i in range(5):
Client(queue).start()

while True:
try:
time.sleep(1)
except KeyboardInterrupt:
exit()