Tuesday, December 30, 2014

Implementing Thread Pool in Java

In the previous post, I discussed thread pool fundamentals. I strongly believe such confusing concepts are best explained through code, so here is a dedicated post!

This post has a prototype thread pool implementation in Java. I have abstracted Runnable into a Task and Thread into a Worker to avoid confusion; this also clearly differentiates a task from a thread.

Implementation

The classes fall broadly into two categories:
Job Queue: Task, TaskQueue, and TaskQueueImpl
Worker/Thread Pool: Worker and WorkerPool

The job queue component deals with tasks and their storage in the queue. The worker pool manages the lifecycle of the threads/workers.


package pool.thread;

/**
 * Abstracts task; Notice it implements Runnable.
 * It finds all prime numbers between two values (i.e. start and end)
 * 
 */
public class Task implements Runnable {
 private static int counter = 1;
 private String taskId;
 private int start;
 private int end;

 public Task(int start, int end) {
  this.taskId = "task-" + counter++;
  this.start = start;
  this.end = end;
 }

 @Override
 public void run() {
  System.out.println(" \n Prime Numbers in range [" + this.start + "-"
    + this.end + "]");
  while (start <= end) {
   if (isPrime(start)) {
    System.out.print(" " + start);
   }
   start++;
  }
 }

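 private boolean isPrime(int num) {
  if (num < 2) {
   return false; // 0 and 1 are not prime
  }
  if (num == 2) {
   return true; // the only even prime
  }
  if (num % 2 == 0) {
   return false;
  }
  int sqrRoot = (int) Math.sqrt(num);
  // <= so that perfect squares such as 9 and 25 are rejected
  for (int i = 3; i <= sqrRoot; i += 2) {
   if (num % i == 0) {
    return false;
   }
  }
  return true;
 }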

 public String toString() {
  return "id:" + this.taskId;
 }
}

package pool.thread;

/**
 * Interfaces supported by Task queue
 */
public interface TaskQueue {
 public Task getNextJob();
 public void addJob(Task task);
 public boolean isEmpty();
}

package pool.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Provides TaskQueue implementation
 * It uses BlockingQueue to implement queue
 */
public class TaskQueueImpl implements TaskQueue {
 private BlockingQueue<Task> queue;

 public TaskQueueImpl(int size) {
  queue = new ArrayBlockingQueue<Task>(size);
 }

 @Override
 public Task getNextJob() {
  // poll() does not block; it returns null when the queue is empty
  return queue.poll();
 }

 @Override
 public void addJob(Task task) {
  try {
   queue.put(task);
  } catch (InterruptedException e) {
   // restore the interrupt flag so the caller can see the interruption
   Thread.currentThread().interrupt();
  }
 }

 @Override
 public boolean isEmpty() {
  return queue.isEmpty();
 }
}

package pool.thread;

/**
 * Abstracts the workers/threads of the thread pool
 * Once the queue is empty; the worker kills itself
 */
public class Worker extends Thread {
 private volatile boolean isStopped;
 private TaskQueue jobQueue;
 private String workerId;
 private static int counter = 1;

 public Worker(TaskQueue jobQueue) {
  isStopped = false;
  this.jobQueue = jobQueue;
  this.workerId = "worker-" + counter++;
 }

 @Override
 public void run() {
  while (!isStopped) {
   System.out.println();
   // fetch first, then check for null: another worker may empty the
   // queue between a separate isEmpty() check and getNextJob()
   Task task = this.jobQueue.getNextJob();
   if (task != null) {
    task.run();
    System.out.println("\n Above Task, " + task.toString()
      + "; finished by worker, " + this.toString());
   } else {
    this.kill();
   }
  }
 }

 public void kill() {
  this.isStopped = true;
 }

 public void startWorker() {
  this.start();
 }

 public String toString() {
  return "id :" + this.workerId;
 }

}

package pool.thread;

import java.util.ArrayList;
import java.util.List;

/**
 * Thread pool / worker pool implementation
 * Note that - methods are synchronized to make this class thread safe
 */
public class WorkerPool {
 private List<Worker> pool;
 private int maxThreads;

 public WorkerPool(int maxThreads) {
  this.maxThreads = maxThreads;
  this.pool = new ArrayList<>(maxThreads);
 }

 public synchronized void execute(TaskQueue jq) {
  for (int i = 0; i < maxThreads; i++) {
   Worker worker = new Worker(jq);
   pool.add(worker);
   worker.startWorker();
  }
 }

 public synchronized void stop() {
  for (Worker worker : pool) {
   worker.kill();
  }
  // drop the references to the stopped workers
  pool.clear();
 }
}

Client Code
        TaskQueue jq = new TaskQueueImpl(5);
        jq.addJob(new Task(1, 100));
        jq.addJob(new Task(101, 200));
        jq.addJob(new Task(201, 300));
        jq.addJob(new Task(301, 400));
        jq.addJob(new Task(401, 500));
 

        WorkerPool wp = new WorkerPool(2);
        wp.execute(jq);


Output (one sample run; which worker picks up each task can differ between runs)
Prime Numbers in range [1-100]
 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97
 Above Task, id:task-1; finished by worker, id :worker-2


 Prime Numbers in range [101-200]
 101 103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199
 Above Task, id:task-2; finished by worker, id :worker-2


 Prime Numbers in range [201-300]
 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 293
 Above Task, id:task-3; finished by worker, id :worker-2


 Prime Numbers in range [301-400]
 307 311 313 317 331 337 347 349 353 359 367 373 379 383 389 397
 Above Task, id:task-4; finished by worker, id :worker-2


 Prime Numbers in range [401-500]
 401 409 419 421 431 433 439 443 449 457 461 463 467 479 487 491 499
 Above Task, id:task-5; finished by worker, id :worker-2


---
keep coding !!!
do post your feedback; 
this post marks the end of a fruitful year, 2014 :)

Monday, December 29, 2014

Thread pool in Java

Thread pool literally means a pool of threads. But the literal meaning is a bit confusing, as just having a pool of threads is hardly of any value. So remember that a thread pool is not just another object pool (like the String pool). I will be talking about it in detail here!

What the literal meaning leaves out is that a thread pool also uses a queue to manage tasks.
The pool of threads plus the job queue completes the picture, so calling it just a thread pool is a bit confusing and even misleading. The term is not specific to Java; it is used in databases and other programming languages as well. It is not easy to rename it to something clearer (it existed even before Java was born and will definitely last beyond Java!).

The fundamental idea behind an object pool is that creating and destroying (finalizing or garbage collecting) any resource (read objects here, and even threads) is expensive. So, if you have a pool of readily available resources, they can be reused.

 

Task vs Thread

Many programmers use task and thread interchangeably, as if they meant the same thing; that's plain WRONG! A thread executes a task.

Tasks are independent activities that don't depend on the state or side effects of other tasks. This independence facilitates concurrency: tasks can be executed by multiple threads at the same time. Also, tasks are usually finite; they have a clear starting point and they eventually terminate. Tasks are logical units of work, and threads are the mechanism by which tasks can be run asynchronously.

Runnable is the default task abstraction in Java. Don't let the two ways of creating threads in Java (extending the Thread class or implementing Runnable) blur the distinction between tasks and threads.

        Runnable task = new Runnable() {
            public void run() {
                System.out.println("inside task");
            }
        };  //end of task definition

        Thread thread = new Thread(task);  //note that task goes as argument
        thread.start();


The above example shows the difference between a task and a thread: the thread takes the task as an argument and executes it. Calling start() on the thread makes the new thread call the run() method declared in the Runnable. The other task abstraction in Java is Callable, which was added in J2SE 5.0.

Along the same lines (task and thread), a thread pool has a task queue as well.

Thread Pool

A thread pool manages a homogeneous pool of workers or threads and is bound to a work queue holding tasks. Workers from the pool pick the next task from the queue and execute it independently. Once a task is finished, the worker fetches the next task from the queue if it has one; otherwise, the worker keeps waiting for the next task to arrive in the queue.
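The waiting behavior described above can be sketched with BlockingQueue.take(), which parks a worker until a task arrives. This is only an illustration under a few assumptions: the names BlockingWorkerSketch, POISON and runAll are made up for this example, and a sentinel task is just one possible way to tell idle workers to stop.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingWorkerSketch {
    // Hypothetical sentinel task: a worker that takes it leaves its loop.
    private static final Runnable POISON = new Runnable() {
        public void run() {
        }
    };

    /** Runs all tasks on workerCount threads and returns how many tasks were executed. */
    static int runAll(List<Runnable> tasks, int workerCount) {
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        final AtomicInteger executed = new AtomicInteger();

        Thread[] workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        Runnable task = queue.take(); // waits while the queue is empty
                        if (task == POISON) {
                            return;
                        }
                        task.run();
                        executed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // treat interruption as shutdown
                }
            });
            workers[i].start();
        }

        queue.addAll(tasks);
        for (int i = 0; i < workerCount; i++) {
            queue.add(POISON); // one sentinel per worker, queued after the real tasks
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return executed.get();
    }
}
```

Because the sentinels are queued after the real tasks and the queue is FIFO, every task is taken before any worker sees a sentinel.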

The Executor framework, added in Java 5.0, provides static factory methods in Executors to create thread pools (such as newFixedThreadPool(), newCachedThreadPool(), newSingleThreadExecutor(), and newScheduledThreadPool()). The Executor framework uses Runnable as its basic task representation. If a task needs to return a result or throw a checked exception, Callable is the better choice.
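As a small illustration of the framework, the sketch below submits Callable tasks to a pool created with Executors.newFixedThreadPool() and collects their results through Future. The class name ExecutorSketch and the sum-of-squares workload are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorSketch {
    /** Sums the squares of 1..n, computing each square as a separate Callable task. */
    static int sumOfSquares(int n, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                Callable<Integer> task = () -> value * value; // a Callable returns a result
                futures.add(pool.submit(task));
            }
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get(); // blocks until that task has finished
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown(); // lets the worker threads exit once queued tasks are done
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10, 4)); // 1 + 4 + ... + 100 = 385
    }
}
```

The pool owns both the threads and the internal task queue, so the client only deals with tasks and their Future results.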

Let's zoom into the interfaces provided by the two major components of a thread pool.
Task Queue: Holds tasks in a queue so that workers can pick and execute them. The queue can be implemented as a FIFO or priority queue. At a minimum, a task queue should support the methods below.

public interface TaskQueue {
    public Task getNextJob();
    public void addJob(Task task);
    public boolean isEmpty();
}

Worker Pool: Holds the threads. The pool should provide an interface to execute the tasks submitted to it, so the execute method takes the task queue as an argument.
public interface WorkerPool {
    public void execute(TaskQueue queue);
}

Time to wind up the fundamentals.
In the next post, I cover a thread pool implementation in Java.


Tuesday, December 16, 2014

Know Maven Repository Path

This post explains how to find the Maven local repository path on a host. This becomes even more important because in some cases the default location (${user.home}/.m2/repository) is overridden in settings.xml.

Checking whether Maven is installed is trivial. Just use the version argument.

$mvn -version
Apache Maven 3.0.3 (r1075438; 2011-02-28 23:01:09+0530)
Maven home: /bin/maven3
Java version: 1.8.0, vendor: Oracle Corporation
Java home: /bin/jdk8/jre
Default locale: en_IN, platform encoding: UTF-8
OS name: "linux", version: "3.2.0-4-amd64", arch: "amd64", family: "unix"


Printing local repository path

Maven needs to be properly installed before you can get the local repository path; the above command confirms it. It also prints the Java home path, which means Java home must be configured for Maven to work properly. Let's see how to print the Maven local repository.

$mvn -X 
or
$mvn --debug

The output of the above commands is verbose. We can use grep/egrep to limit the console output.

$mvn -X | grep "repository"
[DEBUG] Using local repository at /home/user/.m2/repository
[DEBUG] Using manager EnhancedLocalRepositoryManager with priority 10 for /home/user/.m2/repository

So the command prints the path of the local repository. Also, the Maven settings file, settings.xml, is usually kept inside the .m2 directory.

--
References:
http://maven.apache.org/guides/introduction/introduction-to-repositories.html 
 
 


Sunday, November 16, 2014

How Garbage Collection works in JVM

The Java runtime environment handles memory management (allocation and deallocation of objects on the heap) on behalf of programmers. This lets programmers focus on the real work and leave the lifecycle of objects to the Java Virtual Machine (JVM). The Garbage Collector is given the job of reclaiming (or collecting) memory from objects which are no longer referenced (i.e. it frees memory from objects which are no longer in use).

In this post, I will briefly cover the GC cycle and its implications for the running application.

GC overview

When GC kicks in, it starts from the root objects and visits all the live objects. Objects reachable through this chain are active; the remaining objects are candidates for garbage collection. Reference counting collectors and tracing collectors are two popular families of GC algorithms.
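The reachability idea above can be sketched as a toy mark phase over an object graph. This is not how any real JVM collector is implemented; the class name ReachabilitySketch, the string object ids and the map-based heap are assumptions made purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class ReachabilitySketch {
    /** Returns the ids of objects in the toy heap that cannot be reached from any root. */
    static Set<String> findGarbage(Map<String, List<String>> references, List<String> roots) {
        Set<String> marked = new HashSet<>();
        Deque<String> pending = new ArrayDeque<>(roots);
        while (!pending.isEmpty()) {
            String current = pending.pop();
            if (marked.add(current)) { // first visit: follow its outgoing references
                for (String target : references.getOrDefault(current, Collections.<String>emptyList())) {
                    pending.push(target);
                }
            }
        }
        Set<String> garbage = new TreeSet<>(references.keySet());
        garbage.removeAll(marked); // whatever was never marked is unreachable
        return garbage;
    }

    public static void main(String[] args) {
        Map<String, List<String>> heap = new HashMap<>();
        heap.put("root", Arrays.asList("a"));
        heap.put("a", Arrays.asList("b"));
        heap.put("b", Collections.<String>emptyList());
        heap.put("c", Arrays.asList("d")); // c and d reference each other,
        heap.put("d", Arrays.asList("c")); // but nothing live points at them
        System.out.println(findGarbage(heap, Arrays.asList("root"))); // prints [c, d]
    }
}
```

The c/d cycle shows one difference between the two families: a plain reference counting collector would never free those objects, because each still has one incoming reference, while a tracing collector finds them unreachable.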

GC has two mandates:
  1. Free up unreferenced memory so that the app's allocation requests for new objects can be honored without running out of memory
  2. Reclaim unreferenced memory with minimum impact on the performance (i.e. latency, throughput) of the running app
If the JVM doesn't run GC cycles often enough, the app will run out of memory. And if it runs GC cycles too frequently, the app will lose throughput and response time.

In short, GC is a double-edged sword.

 

What triggers a GC cycle

Predicting the exact time of GC is incredibly difficult. The JVM specification doesn't guarantee any behavior, so all vendors are free to implement it in their own way. The specification just says that if an object needs memory and there is not enough free memory in the heap for the allocation, the JVM should throw an OutOfMemoryError. Below are a few pointers on what leads to GC:

Steps which lead to GC:
  1. The allocating thread doesn't find a large enough block of contiguous memory for the object it wants to allocate
  2. The JVM thread notifies the GC of the above problem
  3. The GC determines if it's safe to start a GC cycle
  4. It is safe to start GC when all active application threads are at safe points (i.e. the GC cycle will not cause any issue to the running application)
  5. GC kicks in
Note that the real decision-making algorithm is not as simple as described above, but this gives a fair idea of what may lead to a GC cycle. As a programmer/administrator you can't do much (unless you use your own custom GC library). But Java gives a few techniques which can be used to request GC.
Within the application (or code):
You can call either of the methods below to request a full garbage collection cycle.
     System.gc() or 
     Runtime.getRuntime().gc()

Out of application: 
You can request a GC cycle through jconsole (it provides a button to request GC).

Both approaches are equivalent. They suggest that GC should try recycling unused objects in order to free the memory they currently occupy. This is just a hint or request to GC; GC has all rights to ignore the request.
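A small sketch of requesting GC from code follows. The class name GcHintSketch and the 8 MB array are arbitrary choices for this example, and no expected output is given on purpose: whether the weakly referenced array is actually collected depends on the JVM and its settings.

```java
import java.lang.ref.WeakReference;

class GcHintSketch {
    /** Heap currently in use, as reported by the running JVM. */
    static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    public static void main(String[] args) {
        // Only a weak reference points at the array, so it is eligible for collection.
        WeakReference<byte[]> ref = new WeakReference<>(new byte[8 * 1024 * 1024]);

        long before = usedHeapBytes();
        System.gc(); // a request only; the JVM may ignore it
        long after = usedHeapBytes();

        System.out.println("used before: " + before + " bytes, after: " + after + " bytes");
        System.out.println("array collected: " + (ref.get() == null)); // may print true or false
    }
}
```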

References:
http://www.oracle.com/technetwork/java/javase/tech/index-jsp-140228.html


Tuesday, November 4, 2014

Observer Design Pattern

On a poll result day, Election Commission keeps everyone posted on trends/results !!!

The Observer design pattern is a behavioral pattern. It models the relationship between a Subject object and a set of dependent Observer objects, like a publisher-subscriber model. When the subject's state changes, the observers get notified (and may also be updated with the new value). It models communication between dependent objects without tightly coupling them, so that they can be changed and reused independently. The pattern defines a one-to-many relationship between the subject and its observers, which makes both reusable and the object-oriented design more flexible.

Intent (as put in the Gang of Four book)
"Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically."
The Subject (or Publisher) maintains a list of dependent Observers (or Subscribers) and notifies them automatically of state changes by calling one of their methods.

Implementation

Let's briefly cover the major components:
Subject: Stores the list of observers and provides an interface for adding and removing observers
ConcreteSubject: Implements the Subject interface and sends notifications to the list of observers on state change
Observer: Object that should be notified of a change in state
ConcreteObserver: Implements the Observer interface to get notified of state changes

Class Diagram (from Gang of four book)

Let's take the case of the election governing body of a democratic country. On the day votes are counted, it keeps all stakeholders, such as parties and news channels, posted on trends and results. So in this case the election body becomes the subject, and all stakeholders interested in updates and news bites play the role of observers. The code below simulates this:

package observer.pattern;

public interface ElectionCommission {
 public void register(PollObservers observer);
 public void unregister(PollObservers observer);
 public void notifyObservers();
 public void setResult(Result result);
}

package observer.pattern;

import java.util.ArrayList;
import java.util.List;

public class NorthernElectionCommision implements ElectionCommission {
 private List<PollObservers> observers;
 private Result result;

 public NorthernElectionCommision() {
  observers = new ArrayList<>();
 }

 @Override
 public void register(PollObservers newObserver) {
  observers.add(newObserver);
 }

 @Override
 public void unregister(PollObservers deleteObserver) {
  int index = observers.indexOf(deleteObserver);
  if (index < 0) {
   return; // not registered; remove(-1) would throw
  }
  observers.remove(index);
  System.out.println(" Observer " + deleteObserver
    + " removed from index " + index);
 }

 @Override
 public void notifyObservers() {
  for (PollObservers observer : observers) {
   observer.update(result);
  }
 }

 @Override
 public void setResult(Result result) {
  this.result = result;
  this.notifyObservers();
 }
}

package observer.pattern;

public interface PollObservers {
 public void update(Result result);
}

package observer.pattern;

public class ChannelX implements PollObservers {

 @Override
 public void update(Result result) {
  System.out.println(" Bingo !!! ");
  System.out.println(" Breaking news by ChannelX");

  if (result.trending) {
   System.out.println("Candidate :" + result.candidateName
     + " ; from " + result.partyName + " is leading by "
     + result.numOfVotes + " votes");
  } else if (result.won) {
   System.out.println("Candidate :" + result.candidateName
     + " ; from " + result.partyName + " won !!! ");
  }
 }

}

package observer.pattern;

public class ChannelY implements PollObservers {

 @Override
 public void update(Result result) {
  System.out.println(" !!! --- !!! ");
  System.out.println(" Breaking news by ChannelY");

  if (result.trending) {
   System.out.println(result.candidateName + " is leading by "
     + result.numOfVotes + " votes");
  } else if (result.won) {
   System.out.println(result.candidateName + "  won !!! ");
  }
 }
}

package observer.pattern;

public class Result {
 String candidateName;
 String partyName;
 boolean trending;
 boolean won;
 int numOfVotes;
 public Result(String candidateName, String partyName, boolean trending,
   boolean won, int numOfVotes) {
  super();
  this.candidateName = candidateName;
  this.partyName = partyName;
  this.trending = trending;
  this.won = won;
  this.numOfVotes = numOfVotes;
 }
 
 //note that - it's not properly encapsulated
}

package observer.pattern;

public class Client {
 public static void main(String[] args) {
  ElectionCommission ec = new NorthernElectionCommision();
  PollObservers o1 = new ChannelX();
  PollObservers o2 = new ChannelY();

  Result r1 = new Result("Ram Thakkar", "Republican", true, false, 12345);
  Result r2 = new Result("Zen", "Democrat", false, true, 21);

  System.out.println(" with observer o1");
  ec.register(o1);
  ec.setResult(r1);
  System.out.println(" ####################");

  System.out.println(" with observer o1 and o2");
  ec.register(o2);
  ec.setResult(r2);
  System.out.println(" ####################");

  System.out.println(" deleted o1 observer");
  ec.unregister(o1);
  ec.setResult(r2);
  System.out.println(" ####################");

 }
}

Output:
 with observer o1
 Bingo !!!
 Breaking news by ChannelX
Candidate :Ram Thakkar ; from Republican is leading by 12345 votes
 ####################
 with observer o1 and o2
 Bingo !!!
 Breaking news by ChannelX
Candidate :Zen ; from Democrat won !!!
 !!! --- !!!
 Breaking news by ChannelY
Zen  won !!!
 ####################
 deleted o1 observer
 Observer observer.pattern.ChannelX@135fbaa4 removed from index 0
 !!! --- !!!
 Breaking news by ChannelY
Zen  won !!!
 ####################
 

 

Notes

  • The Subject doesn't need to know anything about the Observers other than their references and the interface to call to pass the information. We could also store the subject-observer mapping in a map.
  • The Subject might send irrelevant updates to an Observer (especially if there is more than one subject). To avoid this, the subject can pass some metadata, such as its own reference, so that observers can make decisions.
  • Make sure that the subject's state is consistent before calling the notify method.
  • At one extreme is the push model, where the subject sends all data to the observers, irrespective of whether they need it or not. At the other extreme is the pull model, where the subject sends a lightweight notification to all observers, and the observers then ask for details if they need them.
  • The architectural design pattern MVC uses the observer pattern. MVC's model class acts as the subject and the view as the observer. If the data in the model changes, the corresponding view is updated automatically.
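
The election example above follows the push model, since the whole Result is handed to update(..). For contrast, here is a minimal sketch of the pull model, where the subject passes only its own reference and the observer asks for what it needs. All names (PullModelSketch, Subject, Ticker, setVotes) are hypothetical and not part of the election code.

```java
import java.util.ArrayList;
import java.util.List;

class PullModelSketch {
    /** Observer in the pull model: it is told only which subject changed. */
    interface Observer {
        void changed(Subject source);
    }

    static class Subject {
        private final List<Observer> observers = new ArrayList<>();
        private final String name;
        private int votes;

        Subject(String name) {
            this.name = name;
        }

        void register(Observer observer) {
            observers.add(observer);
        }

        String getName() {
            return name;
        }

        int getVotes() {
            return votes;
        }

        void setVotes(int votes) {
            this.votes = votes; // state is consistent before anyone is notified
            for (Observer observer : observers) {
                observer.changed(this); // lightweight notification: just the source
            }
        }
    }

    /** Pulls only the details it cares about; the source tells subjects apart. */
    static class Ticker implements Observer {
        final List<String> lines = new ArrayList<>();

        @Override
        public void changed(Subject source) {
            lines.add(source.getName() + ": " + source.getVotes());
        }
    }
}
```

Passing the subject's reference also covers the multiple-subject case from the notes: one Ticker can be registered with several subjects and still know which one changed.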

References:
http://www.dcs.bbk.ac.uk/~oded/OODP13/Sessions/Session6/Observer.pdf

Sunday, November 2, 2014

State Design Pattern

Before exam only study; after exam only play !

The State pattern is a behavioral design pattern which enables an object to change its behavior when its state changes. The state transitions are encapsulated, so the client only sees them as a side effect in the form of changed behavior. Whenever the state changes, the state reference starts pointing to a different state object.

Intent (as put in the Gang of Four book)
"Allow an object to alter its behavior when its internal state changes. The object will appear to change its class."

Implementation  

Let's briefly cover components or actors of this pattern.

Context: Interface for the client. Maintains an instance of the current State (shown in the class diagram as an aggregation relationship). Delegates state-specific requests to that instance.
State: Declares an interface common to all subclasses. Subclasses provide different operational states.
ConcreteState: Implements a specific behavior

Who does the state transition/switching?
There are three players in this pattern: Context, State and ConcreteState. State provides the interface for operations (it's usually implemented as either an abstract class or an interface in Java). That leaves Context and ConcreteState. The pattern doesn't mandate which component should perform the state transition, so either the Context or the ConcreteState can decide which state succeeds another and under what conditions.

Class diagram (from Gang of four book)


package state.pattern;

/**
 * Context : Encapsulates behavior as well as state transition
 * 
 * @author Siddheshwar
 * 
 */
public class Student {
 private StudyState state;

 public Student() {
  this.state = new AfterExam(); // default state
 }

 /**
  * client calls this generic operation always
  */
 public void performAction() {
  state.study(this);
 }

 /**
  * called by implementing states
  */
 public void changeState(StudyState state) {
  this.state = state;
 }
}

package state.pattern;

/**
 * State : Provides base behavior interface
 *
 */
public interface StudyState {
 public void study(Student student);
}

package state.pattern;

/**
 * Concrete state 1 
 *
 */
public class BeforeExam implements StudyState {

 @Override
 public void study(Student student) {
  System.out.println(" Exam time; need to study hard !");

  //switch state
  student.changeState(new AfterExam());
 }
}

package state.pattern;

/**
 * Concrete State 2
 *
 */
public class AfterExam implements StudyState {

 @Override
 public void study(Student student) {
  System.out.println(" Semester has just begun; play and party time !");
  
  //switch state
  student.changeState(new BeforeExam());
 }
}

//sample client
public static void main(String[] args) {
  Student s = new Student();
  s.performAction();
  s.performAction();
  
  s.performAction();
  s.performAction();
 }

Output:
 Semester has just begun; play and party time !
 Exam time; need to study hard !
 Semester has just begun; play and party time !
 Exam time; need to study hard !


The client always calls the same operation but the behavior changes, as is evident from the above output. In this case the state gets updated by the concrete state classes, which is why the Context instance (i.e. Student) is passed to the behavioral method (i.e. study(..)).
The state can be updated in the Context class as well: the context can call changeState(..) itself based on some condition.
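
To illustrate the second option, here is a minimal sketch in which the context itself picks the state from a condition. The names (ContextDrivenStateSketch, Learner, Mode, setDaysToExam) and the seven-day threshold are invented for this example; they are not part of the Student code above.

```java
class ContextDrivenStateSketch {
    /** State: the behavior that varies. */
    interface Mode {
        String act();
    }

    static class Studying implements Mode {
        @Override
        public String act() {
            return "study";
        }
    }

    static class Playing implements Mode {
        @Override
        public String act() {
            return "play";
        }
    }

    // The states hold no data of their own, so one shared instance of each is enough.
    private static final Mode STUDYING = new Studying();
    private static final Mode PLAYING = new Playing();

    /** Context: decides the transition itself instead of leaving it to the states. */
    static class Learner {
        private Mode mode = PLAYING; // default state

        void setDaysToExam(int days) {
            // hypothetical rule: switch to studying in the last week before the exam
            mode = (days <= 7) ? STUDYING : PLAYING;
        }

        String performAction() {
            return mode.act();
        }
    }
}
```

Here the concrete states no longer need a reference to the context, at the cost of the context knowing every state and the rule that selects between them.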

Notes

  • You should use this pattern when the behavior of an object is influenced by its internal state.
  • It avoids inconsistent states, as every state change goes through one controlled path
  • State objects that hold no per-context data can be implemented as Singletons

References:
http://userpages.umbc.edu/~tarr/dp/lectures/StateStrategy.pdf

Monday, October 27, 2014

Solving The Producer Consumer Problem in Java

The producer-consumer problem is one of the standard multithreading (or concurrency) problems; solving it requires the threads to coordinate through a shared buffer. Let's try to understand the problem first.

Problem

There are two independent processes (or rather, threads): one produces something (the Producer) and the other consumes it (the Consumer). They hand items over through a fixed-size buffer or queue, and both work independently. The fixed size of the buffer is the obvious problem: the producer can't keep adding forever, and the consumer can't keep taking endlessly. So how do you ensure that the producer doesn't add data when the queue is full, and that the consumer doesn't remove data when it's empty?
Also, there could be multiple producers and consumers.

This is also known as the Bounded-Buffer problem; Wikipedia has an article on it.

Solution

If the buffer is full, the producer can't add more items, and if the buffer is empty, the consumer can't take items from it. So to synchronize the two, block the producer when the buffer is full and block the consumer when the buffer is empty.

This can be solved using semaphores or monitors. Java also provides a queue API which supports this blocking behavior: BlockingQueue.
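
Before turning to BlockingQueue, here is a minimal sketch of the semaphore approach, assuming java.util.concurrent.Semaphore. The class name SemaphoreBufferSketch and the transfer helper are invented for this example; it shows the idea and is not a replacement for BlockingQueue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

class SemaphoreBufferSketch<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final Semaphore freeSlots;                       // permits = empty slots
    private final Semaphore filledSlots = new Semaphore(0);  // permits = stored items
    private final Object lock = new Object();                // guards the queue itself

    SemaphoreBufferSketch(int capacity) {
        freeSlots = new Semaphore(capacity);
    }

    void put(T item) throws InterruptedException {
        freeSlots.acquire(); // blocks while the buffer is full
        synchronized (lock) {
            items.add(item);
        }
        filledSlots.release(); // signals that one more item is available
    }

    T take() throws InterruptedException {
        filledSlots.acquire(); // blocks while the buffer is empty
        T item;
        synchronized (lock) {
            item = items.remove();
        }
        freeSlots.release(); // signals that one more slot is free
        return item;
    }

    /** Demo helper: a producer thread sends 1..count; the caller consumes and sums them. */
    static long transfer(int count, int capacity) {
        SemaphoreBufferSketch<Integer> buffer = new SemaphoreBufferSketch<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        long sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

The two semaphores count free and filled slots, so the producer and the consumer each block on exactly the condition described above; the separate lock only protects the underlying queue.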


I am not covering the methods provided by BlockingQueue in detail; you can find them on its Javadoc page. Instead, let's take a BlockingQueue example to understand it.


package queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Blocking queue with 2 publishers and 1 subscriber
 * @author Sid
 *
 */
public class BlockingQueueDemo {
 private BlockingQueue<String> queue;
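 // note: volatile does not make count++ atomic, so the two publishers
 // can occasionally produce the same number
 private volatile static int count = 0;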

 public BlockingQueueDemo(int size) {
  queue = new ArrayBlockingQueue<>(size);
 }

 public Runnable publisher1() {
  return new Runnable() {
   public void run() {
    while (!Thread.interrupted()) {
     try {
      // blocks if queue is full
      queue.put("p1:" + count++);
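     } catch (InterruptedException e) {
      // put() clears the interrupt flag when it throws; set it again
      // so the loop condition sees the interrupt and the thread stops
      Thread.currentThread().interrupt();
     }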
    }
   }
  };
 }

 public Runnable publisher2() {
  return new Runnable() {
   public void run() {
    while (!Thread.interrupted()) {
     try {
      // blocks if queue is full
      queue.put("p2:" + count++);
      TimeUnit.MILLISECONDS.sleep(5);
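     } catch (InterruptedException e) {
      // put()/sleep() clear the interrupt flag when they throw; set it
      // again so the loop condition sees the interrupt and the thread stops
      Thread.currentThread().interrupt();
     }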
    }
   }
  };
 }

 public Runnable subscriber1() {
  return new Runnable() {
   public void run() {
    while (!Thread.interrupted()) {
     try {
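      // blocks while queue is empty
      System.out.println("Element: "+queue.take() + " ;Size: "+ queue.size());
     } catch (InterruptedException e) {
      // take() clears the interrupt flag when it throws; set it again
      // so the loop condition sees the interrupt and the thread stops
      Thread.currentThread().interrupt();
     }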
    }
   }
  };

 }

 public static void main(String[] args) {
  BlockingQueueDemo bqd = new BlockingQueueDemo(100);

  // though subscriber starts first; but waits until some element has been
  // put in the queue
  Thread sub = new Thread(bqd.subscriber1());
  sub.start();

  // start publisher
  Runnable p1 = bqd.publisher1();
  Runnable p2 = bqd.publisher2();
  Thread tp1 = new Thread(p1);
  Thread tp2 = new Thread(p2);
  tp1.start();
  tp2.start();

  try {
   TimeUnit.SECONDS.sleep(1);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  tp1.interrupt();
  tp2.interrupt();
  sub.interrupt();
  System.out.println(" elements in queue :" + bqd.queue.size());
 }

}

In the wait/notify post, I provided a simplistic implementation of a blocking queue.

---
keep coding !