Monday, October 27, 2014

Solving The Producer Consumer Problem in Java

The producer-consumer problem is one of the standard multithreading (or concurrency) problems, and it is solved through an inter-thread communication mechanism. Let's try to understand the problem first.

Problem

There are two independent threads (or processes): one produces something (the producer) and the other consumes it (the consumer). They hand items over through a fixed-size buffer or queue, and otherwise work independently. The fixed size of the buffer is what creates the problem: the producer can't keep adding forever, and the consumer can't keep taking endlessly. So how do you ensure that the producer doesn't add data to the queue when it is full, and that the consumer doesn't remove data when it is empty?
Also, there could be multiple producers and consumers.

Wikipedia has an article on the same. This is also known as the bounded-buffer problem.

Solution

If the buffer is full, the producer can't add more items, and if the buffer is empty, the consumer can't take items from it. So, to synchronize the two, block the producer while the buffer is full and block the consumer while the buffer is empty.

This can be solved using semaphores or monitors. Java also provides a queue API that supports this blocking behaviour directly: BlockingQueue.
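
Before turning to BlockingQueue, here is a rough sketch of what the semaphore approach could look like. It is only an illustration: the class name SemaphoreBuffer and its methods are made up for this example. One semaphore counts the free slots and another counts the filled slots, so a producer blocks when no slot is free and a consumer blocks when no item is available.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Semaphore;

/** Hypothetical bounded buffer guarded by two counting semaphores. */
class SemaphoreBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final Semaphore emptySlots;                      // permits = free slots
    private final Semaphore filledSlots = new Semaphore(0);  // permits = items available

    SemaphoreBuffer(int capacity) {
        emptySlots = new Semaphore(capacity);
    }

    void put(T item) throws InterruptedException {
        Objects.requireNonNull(item);  // ArrayDeque does not accept null
        emptySlots.acquire();          // blocks while the buffer is full
        synchronized (items) {         // protects the queue itself
            items.add(item);
        }
        filledSlots.release();         // lets one waiting consumer proceed
    }

    T take() throws InterruptedException {
        filledSlots.acquire();         // blocks while the buffer is empty
        T item;
        synchronized (items) {
            item = items.remove();
        }
        emptySlots.release();          // lets one waiting producer proceed
        return item;
    }

    int size() {
        synchronized (items) {
            return items.size();
        }
    }
}
```

The semaphores only do the counting; the synchronized block is still needed because ArrayDeque itself is not thread-safe.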


I am not covering the methods provided by BlockingQueue in detail; you can find them on its Javadoc page. Instead, let's take an example to understand BlockingQueue better.


package queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Blocking queue with 2 publishers and 1 subscriber
 * @author Sid
 *
 */
public class BlockingQueueDemo {
 private final BlockingQueue<String> queue;
 // count++ on a volatile int is not atomic; AtomicInteger keeps the
 // counter correct when both publishers increment it
 private static final AtomicInteger count = new AtomicInteger();

 public BlockingQueueDemo(int size) {
  queue = new ArrayBlockingQueue<>(size);
 }

 public Runnable publisher1() {
  return new Runnable() {
   public void run() {
    while (!Thread.interrupted()) {
     try {
      // blocks if queue is full
      queue.put("p1:" + count.getAndIncrement());
     } catch (InterruptedException e) {
      // put() cleared the interrupt flag; restore it so the loop exits
      Thread.currentThread().interrupt();
     }
    }
   }
  };
 }

 public Runnable publisher2() {
  return new Runnable() {
   public void run() {
    while (!Thread.interrupted()) {
     try {
      // blocks if queue is full
      queue.put("p2:" + count.getAndIncrement());
      TimeUnit.MILLISECONDS.sleep(5);
     } catch (InterruptedException e) {
      // put()/sleep() cleared the interrupt flag; restore it so the loop exits
      Thread.currentThread().interrupt();
     }
    }
   }
  };
 }

 public Runnable subscriber1() {
  return new Runnable() {
   public void run() {
    while (!Thread.interrupted()) {
     try {
      // blocks while the queue is empty
      System.out.println("Element: " + queue.take() + " ;Size: " + queue.size());
     } catch (InterruptedException e) {
      // take() cleared the interrupt flag; restore it so the loop exits
      Thread.currentThread().interrupt();
     }
    }
   }
  };

 }

 public static void main(String[] args) {
  BlockingQueueDemo bqd = new BlockingQueueDemo(100);

  // the subscriber starts first, but it waits until some element has been
  // put in the queue
  Thread sub = new Thread(bqd.subscriber1());
  sub.start();

  // start publisher
  Runnable p1 = bqd.publisher1();
  Runnable p2 = bqd.publisher2();
  Thread tp1 = new Thread(p1);
  Thread tp2 = new Thread(p2);
  tp1.start();
  tp2.start();

  try {
   TimeUnit.SECONDS.sleep(1);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  tp1.interrupt();
  tp2.interrupt();
  sub.interrupt();
  System.out.println(" elements in queue :" + bqd.queue.size());
 }

}
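
The demo above uses only put and take, which wait indefinitely. As a small aside, BlockingQueue also offers non-blocking and timed variants (offer and poll). The sketch below uses a made-up class name, TimedQueueDemo, and a queue of capacity 1 to show how they behave when the queue is full or empty.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of the non-blocking and timed BlockingQueue methods. */
class TimedQueueDemo {
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        boolean first = queue.offer("a");   // true: there was room
        boolean second = queue.offer("b");  // false: queue is full, returns at once
        // waits up to 50 ms for space, then gives up and returns false
        boolean third = queue.offer("c", 50, TimeUnit.MILLISECONDS);

        String head = queue.poll();         // "a"
        // waits up to 50 ms for an element, then returns null
        String none = queue.poll(50, TimeUnit.MILLISECONDS);

        return first + " " + second + " " + third + " " + head + " " + none;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "true false false a null"
    }
}
```

The timed variants are handy when a thread should give up or do other work instead of waiting forever.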

In my wait/notify post, I have provided a simplistic implementation of a blocking queue.
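
For readers who don't have that post at hand, a monitor-based bounded buffer might look roughly like the sketch below. This is not the code from that post; the class name MonitorBuffer is invented here, and it is kept minimal on purpose. Both methods are synchronized on the buffer object, wait() in a loop while they cannot proceed, and call notifyAll() after changing the state.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical bounded buffer built on the intrinsic monitor (wait/notifyAll). */
class MonitorBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    MonitorBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        // loop, not if: re-check the condition after every wake-up
        while (items.size() == capacity) {
            wait();                 // releases the monitor while the buffer is full
        }
        items.add(item);
        notifyAll();                // wake consumers waiting for an item
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();                 // releases the monitor while the buffer is empty
        }
        T item = items.remove();
        notifyAll();                // wake producers waiting for a free slot
        return item;
    }
}
```

notifyAll() is used rather than notify() because producers and consumers wait on the same monitor, and waking only one arbitrary thread could wake the wrong kind.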

---
keep coding !
