Producer Consumer

January 10, 2022

若有一些執行緒負責生產資源,另一些執行緒會消費資源,那就會有生產/消費間的協調問題,例如,最常見的就是協調生產速度與消費速度的問題。

協調生產/消費

如果你讓生產者/消費者直接處理等待、通知的話,會產生複雜的同步問題,沒有搞好的話,還可能產生死結;不如將等待、通知的職責,交由一個居中的協調者。

例如,生產者可將產品交給店員,消費者可從店員處取走產品,假設產品是整數好了:

class Producer implements Runnable {
    private Clerk clerk; 
    
    Producer(Clerk clerk) { 
        this.clerk = clerk; 
    } 
    
    public void run() { 
        for(var product = 1; product <= 10; product++) { 
            try { 
                clerk.setProduct(product);
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
        }       
    } 
}

class Consumer implements Runnable {
    private Clerk clerk; 
    
    Consumer(Clerk clerk) { 
        this.clerk = clerk; 
    } 
    
    public void run() { 
        for(var i = 1; i <= 10; i++) { 
            try {
                clerk.getProduct(); 
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
        } 
    }  
 } 

店員可維持一定數量的產品,若生產者速度較快,店員處的產品量已滿,店員叫生產者等一下,若有空位放產品再通知生產者繼續生產;如果消費者速度較快,店員手上沒有產品,店員告訴消費者等一下,有產品了再通知消費者前來消費。

class Clerk {
    private final int EMPTY = 0;
    private int product = EMPTY;

    synchronized void setProduct(int product)
                                  throws InterruptedException {
        waitIfFull();
        this.product = product;
        notify();
    }

    private synchronized void waitIfFull() throws InterruptedException {
        while(this.product != EMPTY) {
            wait();
        }
    }

    synchronized int getProduct() throws InterruptedException {
        waitIfEmpty();
        var p = this.product;
        this.product = EMPTY;
        notify();
        return p;
    }

    private synchronized void waitIfEmpty() throws InterruptedException {
        while(this.product == EMPTY) {
            wait();
        }
    } 
}

可以使用以下的程式來使用 ProducerConsumerClerk

var clerk = new Clerk(); 
new Thread(new Producer(clerk)).start(); 
new Thread(new Consumer(clerk)).start(); 

BlockingQueue

程式語言環境若有多執行緒能力,標準 API 通常就會提供具有 Producer Consumer 概念的實作,例如,Java 的 java.util.concurrent.BlockingQueue,可以用它來改寫一下以上的例子:

import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private BlockingQueue<Integer> productQueue; 
    
    Producer(BlockingQueue<Integer> productQueue) { 
        this.productQueue = productQueue; 
    } 
    
    public void run() { 
        for(var product = 1; product <= 10; product++) { 
            try { 
                productQueue.put(product);
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
        }       
    } 
}

class Consumer implements Runnable {
    private BlockingQueue<Integer> productQueue; 
    
    Consumer(BlockingQueue<Integer> productQueue) { 
        this.productQueue = productQueue; 
    } 
    
    public void run() { 
        for(var i = 1; i <= 10; i++) { 
            try {
                var product = productQueue.take();
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
        } 
    }  
 } 

BlockingQueue 的實作之一是 ArrayBlockingQueue 類別,可以指定容量,例如:

var queue = new ArrayBlockingQueue<Integer>(1); // 容量為1
new Thread(new Producer3(queue)).start(); 
new Thread(new Consumer3(queue)).start(); 

Producer Consumer 是蠻常見的模式,有些伺服器或甚至語言本身,提供 channel 之類的概念,可能就是這個模式的實現。