One Producer multiple Consumers - using BlockingQueue
In computer science, the producer-consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, which share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer removes data from the buffer, one piece at a time.
We already have articles on a single producer and a single consumer in the Multi-Threading section; see the article on the Single Producer and Single Consumer Problem there. In our Java Concurrent section, we have also used BlockingQueue implementations to describe producer-consumer problems: https://www.tuturself.com/category?categoryId=134
In this article, we will discuss the single producer and multiple consumers problem. Here one producer places items in a shared place such as a queue, and multiple consumers process these items in parallel, each taking one item at a time. Thus we can achieve more throughput.
A real-life example of such a scenario is a bank. Consider a bank that has 5 counters for serving customers and serves a maximum of 100 tokens/day. There is one token vending machine (the producer) that issues a token for each customer. When a customer arrives at the bank for some work, he/she collects a token from the token vending machine. The customer then waits for some time to be served at one of the 5 counters (the consumers). Simple, isn't it? But wait, there are some challenges in implementing it.
Typical problems in this kind of scenario are:
- Thread-safe data access for the producer and the consumers.
- The producer will keep adding elements to the queue (although a maximum of 100 in our case) regardless of the queue capacity.
- The producer is not aware of the status of the items being processed by the consumers.
- The consumers will run an infinite loop to see if items are available in the queue to process.
- How do we tell the consumers that all the items are processed, so that they can stop executing?
Possible solutions are:
- Use a thread-safe queue.
- The producer should wait when the queue is full. This can be achieved by using a blocking queue of fixed size. Thus the producer will wait for the consumers to consume some items when the queue is at capacity.
- The consumer should wait when the buffer is empty and resume its operation when elements are available in the queue for processing. This can also be achieved by a blocking queue.
- The consumers should be notified to end processing when the producer has finished producing elements and no more elements will be added to the queue. This can be achieved with a CountDownLatch, but only when you know the total number of records your producer is going to produce. Create a CountDownLatch with that number of records, and decrement the count by one after consuming each record. Call the await() method after all your consumers have started. It will return only when all your records are processed. Now you can shut down your consumers. We have used a similar technique in our example. Check the code for the detailed implementation.
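The last point can be sketched in a compact form. This is only an illustration under stated assumptions, not the implementation used later in this article: the class name LatchShutdownSketch, the method consumeAll, the queue capacity of 10 and the use of plain integers as records are all hypothetical. The latch is created with the number of records, each worker counts down after consuming one record, and the caller shuts the pool down once await() returns.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one producer, several workers, latch-based shutdown.
class LatchShutdownSketch {

    // Produces `records` integers, consumes them with `workers` threads and
    // returns how many records were consumed when the latch reached zero.
    static int consumeAll(int records, int workers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); // bounded buffer
        CountDownLatch latch = new CountDownLatch(records);           // one count per record
        AtomicInteger consumed = new AtomicInteger();

        // One extra thread so the producer runs in the same pool as the workers
        ExecutorService pool = Executors.newFixedThreadPool(workers + 1);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        queue.take();               // waits while the queue is empty
                        consumed.incrementAndGet();
                        latch.countDown();          // one record fully processed
                    }
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts idle workers; restore the flag and exit
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.execute(() -> {
            try {
                for (int i = 1; i <= records; i++) {
                    queue.put(i);                   // waits while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            latch.await();                          // returns once every record is consumed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();                     // release workers blocked in take()
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + consumeAll(100, 5));
    }
}
```

Note that a plain shutdown() would not be enough in this sketch, because it does not interrupt workers that are still waiting inside take(); shutdownNow() does.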
BlockingQueue<E> is a thread-safe queue interface in the java.util.concurrent package that fits the producer-consumer pattern well. It supports concurrent adding and taking of elements from multiple threads, a maximum capacity (mandatory for ArrayBlockingQueue, optional for LinkedBlockingQueue), an insertion operation, put(), that blocks while the queue is full, and a removal operation, take(), that blocks while the queue is empty.
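To make the capacity behaviour concrete, here is a minimal sketch. The class name BoundedQueueDemo and its helper methods are hypothetical and not part of the example in this article. It uses the non-blocking offer() and the timed poll() so that the "full" and "empty" conditions become visible as return values; put() and take() would wait in the same situations instead of returning.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of how a bounded BlockingQueue reacts when full or empty.
class BoundedQueueDemo {

    // Fills a queue of the given capacity, then reports whether one more
    // non-blocking offer() is accepted. A full queue rejects it.
    static boolean extraOfferAccepted(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i);
        }
        return queue.offer(capacity);
    }

    // Polls an empty queue for the given time. Returns null when no element
    // arrives within the timeout (or when the thread is interrupted).
    static Integer pollEmpty(long timeoutMillis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Extra offer accepted: " + extraOfferAccepted(2));
        System.out.println("Poll on empty queue: " + pollEmpty(50));
    }
}
```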
Following is our Model Class for Token, which is produced by the Token Vending machine:
package com.tuturself.multiconsumer;

import java.util.UUID;

public class Token {

    private UUID id;
    private int number;
    private String description;

    public UUID getId() {
        return id;
    }

    public void setId(UUID id) {
        this.id = id;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    @Override
    public String toString() {
        return "Token [id=" + id + ", number=" + number + ", description=" + description + "]";
    }
}
Now we have the Producer which is the TokenVendingMachine:
package com.tuturself.multiconsumer;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;

public class TokenVendingMachine implements Runnable {

    private final BlockingQueue<Token> blockingQueue;

    public TokenVendingMachine(BlockingQueue<Token> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        // Issue exactly 100 tokens, numbered 1 to 100
        for (int i = 1; i <= 100; i++) {
            Token token = new Token();
            token.setId(UUID.randomUUID());
            token.setNumber(i);
            token.setDescription("Some Description");
            try {
                System.out.println("New token issued :" + token);
                // Insert the token in the queue; put() waits if no space is available
                blockingQueue.put(token);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop issuing tokens
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
Now we have the Consumer, which is a Counter for serving the Customer with a Token:
package com.tuturself.multiconsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TokenServingCounter implements Runnable {

    private final BlockingQueue<Token> blockingQueue;
    private final CountDownLatch countDownLatch;

    public TokenServingCounter(BlockingQueue<Token> blockingQueue,
            CountDownLatch countDownLatch) {
        this.blockingQueue = blockingQueue;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        /*
         * Serve tokens one by one. The loop ends when the latch reaches
         * zero, that is, when there are no more tokens to serve.
         */
        while (countDownLatch.getCount() > 0) {
            try {
                // Wait briefly for a token instead of blocking forever in take(),
                // so that an idle counter can re-check the latch and stop
                Token token = blockingQueue.poll(100, TimeUnit.MILLISECONDS);
                if (token != null) {
                    System.out.println("Serving Token :" + token);
                    // Decrement the latch only after a token was actually served
                    countDownLatch.countDown();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop serving
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
Now let us test our single producer and multiple consumers example. Here we will start the producer first. Then we will start our 5 token serving counters. All of them are shut down when there are no more tokens to serve.
package com.tuturself.multiconsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class TestMultipleConsumer {

    public static void main(String[] args) throws InterruptedException {
        // Bounded queue, so the producer waits when the counters fall behind.
        // The capacity of 10 is an arbitrary choice for this example.
        BlockingQueue<Token> blockingQueue = new LinkedBlockingQueue<>(10);
        /*
         * Create and START the vending machine. The machine will create
         * 100 tokens/day, which will be served at 5 counters.
         */
        TokenVendingMachine tokenVendingMachine = new TokenVendingMachine(blockingQueue);
        new Thread(tokenVendingMachine).start();
        // One count per token; this must match the number of tokens produced
        CountDownLatch countDownLatch = new CountDownLatch(100);
        // Here we have the token consumer. We have 5 counters
        TokenServingCounter tokenConsumer = new TokenServingCounter(blockingQueue,
                countDownLatch);
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 1; i <= 5; i++) {
            executor.submit(tokenConsumer);
        }
        // Wait until all 100 tokens are served
        countDownLatch.await();
        System.out.println("Stopped");
        // Interrupt any counter that is still waiting for a token
        executor.shutdownNow();
    }
}
You can run the TestMultipleConsumer class to test the program. Debugging line by line will also help you understand the flow in detail. Happy learning.