Tuesday, December 30, 2014

Implementing Thread Pool in Java

In the previous post, I discussed Thread Pool fundamentals. I strongly believe such confusing concepts should be best explained through code; so here goes this dedicated post!

This post has a prototype thread pool implementation in Java. I have abstracted Runnable into a Task and Thread into worker to avoid the confusion. It also serves the purpose of clearly differentiating task and thread.

Implementation

Classes can be classified broadly in below two categories.
Job Queue: Task, TaskQueue, and TaskQueueImpl
Worker/Thread Pool: Worker and WorkerPool

Job queue component is about tasks and their storage in the queue. Worker pool manages lifecycle of threads/workers.


package pool.thread;

/**
 * Abstracts task; Notice it implements Runnable.
 * It finds all prime numbers between two values (i.e. start and end)
 * 
 */
public class Task implements Runnable {
 private static int counter = 1;
 private String taskId;
 private int start;
 private int end;

 public Task(int start, int end) {
  this.taskId = "task-" + counter++;
  this.start = start;
  this.end = end;
 }

 @Override
 public void run() {
  System.out.println(" \n Prime Numbers in range [" + this.start + "-"
    + this.end + "]");
  while (start <= end) {
   if (isPrime(start)) {
    System.out.print(" " + start);
   }
   start++;
  }
 }

 private boolean isPrime(int num) {
  if (num > 1 && num % 2 == 0) {
   return false;
  }
  int sqrRoot = (int) Math.sqrt(num);
  for (int i = 3; i < sqrRoot; i += 2) {
   if (num % i == 0) {
    return false;
   }
  }
  return true;
 }

 public String toString() {
  return "id:" + this.taskId;
 }
}

package pool.thread;

/**
 * Interfaces supported by Task queue
 */
public interface TaskQueue {
 public Task getNextJob();
 public void addJob(Task task);
 public boolean isEmpty();
}

package pool.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Provides TaskQueue implementation
 * It uses BlockingQueue to implement queue
 */
public class TaskQueueImpl implements TaskQueue {
 private BlockingQueue<Task> queue;

 public TaskQueueImpl(int size) {
  queue = new ArrayBlockingQueue<Task>(size);
 }

 public Task getNextJob() {
  return queue.poll();
 }

 @Override
 public void addJob(Task task) {
  try {
   queue.put(task);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }

 @Override
 public boolean isEmpty() {
  return queue.isEmpty();
 }
}

package pool.thread;

/**
 * Abstracts the workers/threads of the thread pool
 * Once the queue is empty; the worker kills itself
 */
public class Worker extends Thread {
 private volatile boolean isStopped;
 private TaskQueue jobQueue;
 private String workerId;
 private static int counter = 1;

 public Worker(TaskQueue jobQueue) {
  isStopped = false;
  this.jobQueue = jobQueue;
  this.workerId = "worker-" + counter++;
 }

 public void run() {
  while (!isStopped) {
   System.out.println();
   if (!this.jobQueue.isEmpty()) {
    Task task = (Task) this.jobQueue.getNextJob();
    task.run();
    System.out.println("\n Above Task, " + task.toString()
      + "; finished by worker, " + this.toString());
   }else{
     this.kill();
   }
  }
 }

 public void kill() {
  this.isStopped = true;
 }

 public void startWorker() {
  this.start();
 }

 public String toString() {
  return "id :" + this.workerId;
 }

}

package pool.thread;

import java.util.ArrayList;
import java.util.List;

/**
 * Thread pool / worker pool implementation
 * Note that - methods are synchronized to make this class thread safe
 */
public class WorkerPool {
 private List<Worker> pool;
 private int maxThreads;

 public WorkerPool(int maxThreads) {
  this.maxThreads = maxThreads;
  this.pool = new ArrayList<>(maxThreads);
 }

 public synchronized void execute(TaskQueue jq) {
  for (int i = 0; i < maxThreads; i++) {
   Worker worker = new Worker(jq);
   pool.add(worker);
   worker.startWorker();
  }
 }

 public synchronized void stop() {
  while(!pool.isEmpty()){
   for (Worker worker : pool) {
    worker.kill();
   }   
  }
 }
}

Client Code
        TaskQueue jq = new TaskQueueImpl(5);
        jq.addJob(new Task(1, 100));
        jq.addJob(new Task(101, 200));
        jq.addJob(new Task(201, 300));
        jq.addJob(new Task(301, 400));
        jq.addJob(new Task(401, 500));
 

        WorkerPool wp = new WorkerPool(2);
        wp.execute(jq);


Output
Prime Numbers in range [1-100]
 1 3 5 7 9 11 13 15 17 19 23 25 29 31 35 37 41 43 47 49 53 59 61 67 71 73 79 83 89 97
 Above Task, id:task-1; finished by worker, id :worker-2


 Prime Numbers in range [101-200]
 101 103 107 109 113 121 127 131 137 139 143 149 151 157 163 167 169 173 179 181 191 193 197 199
 Above Task, id:task-2; finished by worker, id :worker-2


 Prime Numbers in range [201-300]
 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 289 293
 Above Task, id:task-3; finished by worker, id :worker-2


 Prime Numbers in range [301-400]
 307 311 313 317 323 331 337 347 349 353 359 361 367 373 379 383 389 397
 Above Task, id:task-4; finished by worker, id :worker-2


 Prime Numbers in range [401-500]
 401 409 419 421 431 433 439 443 449 457 461 463 467 479 487 491 499
 Above Task, id:task-5; finished by worker, id :worker-2


---
keep coding !!!
do post your feedback; 
this post marks the end of a fruitful year, 2014 :)

No comments:

Post a Comment