Worker Thread

January 12, 2022

執行緒建立後,從任務來源處取得任務,每次執行完後,繼續取得下個任務,這是 Worker Thread 的基本概念,可應用在一些需要冗長計算或背景執行的請求。

馬不停蹄的 Worker

因為每次執行完後,執行緒要繼續取得下個任務,這表示執行緒的流程不會停止,基本上會是個任務迴圈:

import java.util.concurrent.BlockingQueue;

class Worker extends Thread {
    private BlockingQueue<Runnable> runnables;
    
    Worker(BlockingQueue<Runnable> runnables) {
        this.runnables = runnables;
    }
    
    public void run() {
        while(true) {
            try {
                runnables.take().run();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

就消費任務而言,這樣的 Worker 實現了〈Producer Consumer〉中 Consumer 的角色,至於協調任務的角色,這邊直接使用 Java 標準 API 的 java.util.concurrent.BlockingQueue

因為使用的是 BlockingQueue,也就是說,BlockingQueue 沒有任務時,Worker 會進入等待狀態;另一方面,這邊並沒有考慮 Worker Thread 流程被中止時,該怎麼做善後,這會在之後的文件討論。

任務的交辦?

至於怎麼交辦任務,那是另一回事了,以〈Thread-Per-Message〉中的範例來說,或許你只需要下載網頁時,有其他流程可以代勞,主流程別被阻斷就好,這時可以改寫一下其中的 ThreadService

class ThreadService {
    private static BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>();
    
    static {
        // 只有一個 Worker Thread
        new Worker(runnables).start();
    }
    
    static void submit(Runnable runnable) {
        runnables.add(runnable);
    }
}

建立 Worker 時,需要指定 BlockingQueue 的實作,以便 Worker 主動取得任務,這邊使用了 Java 標準 API 的 java.util.concurrent.LinkedBlockingQueue,沒有限制任務的長度;並且只建立了一個 Worker Thread。

必要時可以建立多個 Worker Thread,或許可進一步考量,待執行任務大於 Worker Thread 數量一定程度時,建立新的 Worker Thread,也許又會想在空閒時減少 Worker Thread,或者進一步地思考,關閉 ThreadService 時,未被執行的任務該怎麼辦等問題。

既然如此,可以考慮將 Worker 角色轉為被動,也就是被交辦任務,而不是主動取得任務,不用知道 BlockingQueue 這類角色的存在,任務的交辦可以由 ThreadService 這類角色負責,這時 ThreadService 就有執行緒池管理的概念了。

執行緒池的部份,就在後續的文件中,再來探討了…