Thread Pool

January 12, 2022

建立執行緒需要成本,若能重複使用已建立的執行緒,而不是用完就丟,對效率可能有所助益,〈Worker Thread〉示範了預先建立執行緒,不斷地從任務來源取得任務並執行,是一種重複使用執行緒的方式。

另一種 Worker

Worker Thread〉範例的實現方式,需要知道協調者的存在;另一種方式是考慮將 Worker 角色轉為被動,也就是被交辦任務,而不是主動取得任務,不用知道協調者的存在,然而這個時候,協調者必須知道 Worker 是否處於空閒狀態,才能知道該不該交辦任務:

class Worker extends Thread {
    private Runnable runnable;
    private boolean isOnDuty = true;
    
    boolean isIdle() {
        return runnable == null;
    }
    
    void accept(Runnable runnable) {
        synchronized(this) {
            if(isIdle()) {
                this.runnable = runnable;
                notify();
            }
        }
    }
    
    public void run() {
        while(isOnDuty) {
            synchronized(this) {
                runnable.run();
                runnable = null;
                try {
                    wait();
                }
                catch(InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
    
    void terminate() {
        isOnDuty = false;
        accept(() -> {}); // nope
    }
}

上面這個 Worker 建立後,必須先交辦任務,然後啟動執行緒,當手邊的 runnable 執行完畢,執行緒進入等待狀態,若要判斷 Worker 是否符於空閒狀態,是簡單地透過 runnable 是否為 null;範例中也簡單地實現了了〈Two-phase Termination〉中談到的,如何停止執行緒。

簡單的 ThreadPool

為了要管理 Worker,這邊實現一個簡單的 ThreadPool,可以在沒有 Worker 可以接任務時,建立新 Worker,也示範了清除空閒 Worker 的簡單方式:

class ThreadPool {
    private List<Worker> workers = new ArrayList<>();
    
    synchronized void submit(Runnable runnable) {
        for(var worker : workers) {
            if(worker.isIdle()) {
                worker.accept(runnable);
                return;
            }
        }
        
        // 沒有空閒的 Worker,建立新的
        var worker = new Worker();
        worker.accept(runnable);
        worker.start();
        workers.add(worker);
    }
    
    synchronized void removeIdle() {
        for(var worker : workers) {
            if(worker.isIdle()) {
                workers.remove(worker);
                worker.terminate();
            }
        }
    }
}

以〈Thread-Per-Message〉下載網頁的範例來說,可以如下改用 ThreadPool

var pool = new ThreadPool();

for(var uri : uris) {
    pool.submit(() -> {
        try {
            download(
                uri, 
                uri.replace("https://openhome.cc/zh-tw/", "")
                   .replace("/", "")
                   .concat(".html")
            );
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}

Java 的 Executor 框架

當然,以上只是簡單的執行緒池示範,真的需要執行緒池功能的話,建議瞭解一下現成穩固的方案,例如 Java 標準 API 提供 Executor 框架,可以依需求,透過 使用 java.util.concurrent.ExecutorsnewCachedThreadPoolnewFixedThreadPool 靜態方法來建構想要的執行緒池,例如:

var pool = Executors.newCachedThreadPool();

for(var uri : uris) {
    pool.submit(() -> {
        try {
            download(
                uri, 
                uri.replace("https://openhome.cc/zh-tw/", "")
                   .replace("/", "")
                   .concat(".html")
            );
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}