在 Thread-Per-Message 模式 中,每次請求來到,就建立一個新的執行緒,用完就不再使用,然後執行緒的建立需要系統資源,對於一個接受許多請求的情況,不斷的建立新執行緒,會導致系統 效能的降低。
可以的話,若能重複使用所建立的執行緒,而不是用完就丟,可以有效的重複利用資源。在 Worker Thread 模式 的範例中,預先建立好執行緒,當請求佇列有新請求時,通知等待的執行緒取出請求進行處理,其實就是一種重複使用執行緒的方式。
該如何重複使用執行緒?執行緒一旦離開run()方法,該執行緒任務就不能再重複使用,所以就是想辦法讓執行緒不離開run()方法,但不執行完run ()方法,又如何能完成您交付給執行緒的任務?看似矛盾的需求,其實在 Worker Thread 模式 中就有示範過概念了,也就是在run()中設定無窮迴圈:
class Worker implements Runnable {
private RequestQueue queue;
Worker(RequestQueue queue) {
this.queue = queue;
}
public void run() {
while(true) {
queue.get().execute();
}
}
}
private RequestQueue queue;
Worker(RequestQueue queue) {
this.queue = queue;
}
public void run() {
while(true) {
queue.get().execute();
}
}
}
Thread pool 模式的概念就是,需要使用執行緒時,在一個執行緒池中尋找可用的執行緒,如果找不到再建立新的,執行緒使用完畢後,留在池中重複使用。
以下是個簡單的示範程式,可以看出如何建立可重複使用的執行緒,與執行緒池的基本概念:
import java.util.*;
interface Request {
void execute();
}
class WorkerThread extends Thread {
private Request request;
private boolean isContinue = true;
boolean isIdle() {
return request == null;
}
void setRequest(Request request) {
synchronized(this) {
if(isIdle()) {
this.request = request;
notify();
}
}
}
public void run() {
while(isContinue) {
synchronized(this) {
try {
wait();
}
catch(InterruptedException e) {
e.printStackTrace();
}
request.execute();
request = null;
}
}
}
void terminate() {
isContinue = false;
setRequest(new Request() {
public void execute() { /* do nothing */ }
});
}
}
class WorkerThreadPool {
private List<WorkerThread> workerThreads;
WorkerThreadPool() {
workerThreads = new ArrayList<WorkerThread>();
}
synchronized void service(Request request) {
boolean idleNotFound = true;
for(WorkerThread workerThread : workerThreads) {
if(workerThread.isIdle()) {
workerThread.setRequest(request);
idleNotFound = false;
break;
}
}
if(idleNotFound) {
WorkerThread workerThread = createWorkerThread();
workerThread.setRequest(request);
}
}
synchronized void cleanIdle() {
for(WorkerThread workerThread : workerThreads) {
if(workerThread.isIdle()) {
workerThreads.remove(workerThread);
workerThread.terminate();
}
}
}
private WorkerThread createWorkerThread() {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
workerThreads.add(workerThread);
try {
Thread.sleep(1000); // 給點時間進入 Runnable
}
catch(InterruptedException e) {
e.printStackTrace();
}
return workerThread;
}
}
範 例中的WorkerThreadPool是個簡單的實現,您可以採用更完善的池化技術,另一個重點則在於WorkerThread如何重用,執行緒一旦啟 動,就進入無窮迴圈並進入等待,如果有設定請求,則被通知執行請求,請求執行完畢,回到迴圈開頭又進入等待,如此循環不斷。以下則是一個使用 WorkerThreadPool的示範:
class Service {
private WorkerThreadPool pool = new WorkerThreadPool();
void accept(Request request) {
pool.service(request);
}
}
// 以下模擬客戶發出請求
class Client implements Runnable {
private Service service;
Client(Service service) {
this.service = service;
}
public void run() {
while(true) {
Request request = new Request() {
public void execute() {
System.out.println("執行客戶請求...XD");
try {
Thread.sleep((int) (Math.random() * 3000));
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
};
service.accept(request);
try {
Thread.sleep((int) (Math.random() * 3000));
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
Service service = new Service();
for(int i = 0; i < 5; i++) {
(new Thread(new Client(service))).start();
}
}
}
使用Python來示範的話:
import threading
import time
import random
class WorkerThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.condition = threading.Condition()
self.isContinue = True
self.request = None
def isIdle(self):
return self.request == None
def setRequest(self, request):
self.condition.acquire()
if self.isIdle():
self.request = request
self.condition.notify()
self.condition.release()
def run(self):
while self.isContinue:
self.condition.acquire()
self.condition.wait()
self.request()
self.request = None
self.condition.release()
def terminate(self):
self.isContinue = False
self.setRequest(lambda: None) # do nothing
class WorkerThreadPool:
def __init__(self):
self.workerThreads = []
def service(self, request):
idleNotFound = True
for workerThread in self.workerThreads:
if workerThread.isIdle():
workerThread.setRequest(request)
idleNotFound = False
break
if idleNotFound:
workerThread = self.createWorkerThread()
workerThread.setRequest(request)
def cleanIdle(self):
for workerThread in self.workerThreads:
if workerThread.isIdle():
self.workerThreads.remove(workerThread)
workerThread.terminate()
def createWorkerThread(self):
workerThread = WorkerThread()
workerThread.start()
self.workerThreads.append(workerThread)
time.sleep(1)
return workerThread
class Service:
def __init__(self):
self.pool = WorkerThreadPool()
def accept(self, request):
self.pool.service(request)
class Client(threading.Thread):
def __init__(self, service):
threading.Thread.__init__(self)
self.service = service
def run(self):
while True:
second = int(random.random() * 3) # 隨機模擬請求的執行時間
request = lambda: print("執行客戶請求...XD"); time.sleep(second)
self.service.accept(request)
time.sleep(int(random.random() * 3))
service = Service()
for i in range(5):
Client(service).start()
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
exit()