Tuesday, December 30, 2014

Implementing Thread Pool in Java

In the previous post, I discussed thread pool fundamentals. I strongly believe such confusing concepts are best explained through code, so here is a dedicated post!

This post has a prototype thread pool implementation in Java. I have abstracted Runnable into a Task and Thread into a Worker to avoid confusion and to clearly differentiate tasks from threads.

Implementation

The classes fall broadly into the two categories below.
Job Queue: Task, TaskQueue, and TaskQueueImpl
Worker/Thread Pool: Worker and WorkerPool

The job queue component deals with tasks and their storage in the queue. The worker pool manages the lifecycle of threads/workers.


package pool.thread;

/**
 * Abstracts task; Notice it implements Runnable.
 * It finds all prime numbers between two values (i.e. start and end)
 * 
 */
public class Task implements Runnable {
 private static int counter = 1;
 private String taskId;
 private int start;
 private int end;

 public Task(int start, int end) {
  this.taskId = "task-" + counter++;
  this.start = start;
  this.end = end;
 }

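 @Override
 public void run() {
  System.out.println(" \n Prime Numbers in range [" + this.start + "-"
    + this.end + "]");
  // iterate with a local variable so the task's own fields are not mutated
  for (int num = start; num <= end; num++) {
   if (isPrime(num)) {
    System.out.print(" " + num);
   }
  }
 }

 private boolean isPrime(int num) {
  if (num < 2) {
   return false; // 0 and 1 are not prime
  }
  if (num == 2) {
   return true; // the only even prime
  }
  if (num % 2 == 0) {
   return false;
  }
  int sqrRoot = (int) Math.sqrt(num);
  // <= so that perfect squares such as 9 and 25 are rejected
  for (int i = 3; i <= sqrRoot; i += 2) {
   if (num % i == 0) {
    return false;
   }
  }
  return true;
 }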

 public String toString() {
  return "id:" + this.taskId;
 }
}

package pool.thread;

/**
 * Interfaces supported by Task queue
 */
public interface TaskQueue {
 public Task getNextJob();
 public void addJob(Task task);
 public boolean isEmpty();
}

package pool.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Provides TaskQueue implementation
 * It uses BlockingQueue to implement queue
 */
public class TaskQueueImpl implements TaskQueue {
 private BlockingQueue<Task> queue;

 public TaskQueueImpl(int size) {
  queue = new ArrayBlockingQueue<Task>(size);
 }

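 @Override
 public Task getNextJob() {
  // poll() does not block; it returns null when the queue is empty
  return queue.poll();
 }

 @Override
 public void addJob(Task task) {
  try {
   // put() blocks while the queue is full
   queue.put(task);
  } catch (InterruptedException e) {
   // restore the interrupt flag so callers can see the interruption
   Thread.currentThread().interrupt();
  }
 }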

 @Override
 public boolean isEmpty() {
  return queue.isEmpty();
 }
}

package pool.thread;

/**
 * Abstracts the workers/threads of the thread pool
 * Once the queue is empty; the worker kills itself
 */
public class Worker extends Thread {
 private volatile boolean isStopped;
 private TaskQueue jobQueue;
 private String workerId;
 private static int counter = 1;

 public Worker(TaskQueue jobQueue) {
  isStopped = false;
  this.jobQueue = jobQueue;
  this.workerId = "worker-" + counter++;
 }

 public void run() {
  while (!isStopped) {
   System.out.println();
   // fetch with a single call; checking isEmpty() first and fetching later
   // can hand a null task to this worker when another worker gets there first
   Task task = this.jobQueue.getNextJob();
   if (task != null) {
    task.run();
    System.out.println("\n Above Task, " + task.toString()
      + "; finished by worker, " + this.toString());
   } else {
    this.kill();
   }
  }
 }

 public void kill() {
  this.isStopped = true;
 }

 public void startWorker() {
  this.start();
 }

 public String toString() {
  return "id :" + this.workerId;
 }

}

package pool.thread;

import java.util.ArrayList;
import java.util.List;

/**
 * Thread pool / worker pool implementation
 * Note that - methods are synchronized to make this class thread safe
 */
public class WorkerPool {
 private List<Worker> pool;
 private int maxThreads;

 public WorkerPool(int maxThreads) {
  this.maxThreads = maxThreads;
  this.pool = new ArrayList<>(maxThreads);
 }

 public synchronized void execute(TaskQueue jq) {
  for (int i = 0; i < maxThreads; i++) {
   Worker worker = new Worker(jq);
   pool.add(worker);
   worker.startWorker();
  }
 }

 public synchronized void stop() {
  // signal every worker to stop, then drop the references
  for (Worker worker : pool) {
   worker.kill();
  }
  pool.clear();
 }
}

Client Code
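package pool.thread;

public class Client {
 public static void main(String[] args) {
  // capacity 5 matches the number of tasks, so addJob() never blocks here
  TaskQueue jq = new TaskQueueImpl(5);
  jq.addJob(new Task(1, 100));
  jq.addJob(new Task(101, 200));
  jq.addJob(new Task(201, 300));
  jq.addJob(new Task(301, 400));
  jq.addJob(new Task(401, 500));

  // two workers share the five queued tasks
  WorkerPool wp = new WorkerPool(2);
  wp.execute(jq);
 }
}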


Output
Prime Numbers in range [1-100]
 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97
 Above Task, id:task-1; finished by worker, id :worker-2


 Prime Numbers in range [101-200]
 101 103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199
 Above Task, id:task-2; finished by worker, id :worker-2


 Prime Numbers in range [201-300]
 211 223 227 229 233 239 241 251 257 263 269 271 277 281 283 293
 Above Task, id:task-3; finished by worker, id :worker-2


 Prime Numbers in range [301-400]
 307 311 313 317 331 337 347 349 353 359 367 373 379 383 389 397
 Above Task, id:task-4; finished by worker, id :worker-2


 Prime Numbers in range [401-500]
 401 409 419 421 431 433 439 443 449 457 461 463 467 479 487 491 499
 Above Task, id:task-5; finished by worker, id :worker-2


---
keep coding !!!
do post your feedback; 
this post marks the end of a fruitful year, 2014 :)

Monday, December 29, 2014

Thread pool in Java

A thread pool literally means a pool of threads. But the literal meaning is a bit confusing, as just having a pool of threads is hardly of any value. So remember that a thread pool is not just another object pool (like the String pool). I will talk about it in detail here!

What the literal meaning leaves out is that a thread pool also uses a queue to manage tasks.
A pool of threads plus a job queue makes the story complete, so calling it just a thread pool is a bit confusing and even misleading. The term is not specific to Java; it is used in databases and other programming languages as well. It's not easy to change its intended meaning or rename it to make it clearer (it existed even before Java was born and will definitely outlast it!).

The fundamental idea behind an object pool is that creating and destroying (finalizing or garbage collecting) a resource (read objects here, and even threads) is expensive. So, if you have a pool of readily available resources, they can be reused.

 

Task vs Thread

Many programmers use task and thread interchangeably, as if they meant the same thing; that's plain WRONG! A thread executes a task.

Tasks are independent activities and don't depend on the state or side effects of other tasks. Because tasks are independent, they can be executed concurrently by multiple threads. Also, tasks are usually finite; they have a clear starting point and they eventually terminate. Tasks are logical units of work, and threads are the mechanism by which tasks can be run asynchronously.

Runnable is the default task abstraction in Java. Don't confuse the task/thread distinction with the two ways of creating threads in Java: extending the Thread class or implementing Runnable.

        Runnable task = new Runnable(){
            public void run(){
                System.out.println("inside task");
            }
        };  //end of task definition

        Thread thread = new Thread(task);  //note that task goes as argument
        thread.start();


The example above shows the difference between a task and a thread: the thread takes the task as an argument and executes it. Calling start() on the thread internally calls the run() method declared in the Runnable (in a separate thread, of course). The other task abstraction in Java is Callable, which was added in J2SE 5.0.

Just as a thread needs a task to run, a thread pool needs a task queue to draw from.
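
To make the Callable remark concrete, here is a minimal sketch of the same task/thread split with a task that returns a value. The class name CallableDemo and the helper runOnThread are made up for this illustration; FutureTask is the standard JDK adapter that lets a plain Thread run a Callable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class CallableDemo {

    // Hypothetical helper: runs a Callable on a plain thread and returns its result.
    public static int runOnThread(Callable<Integer> task) throws Exception {
        // FutureTask is both a Runnable (so a Thread can run it) and a Future
        FutureTask<Integer> future = new FutureTask<>(task);
        Thread thread = new Thread(future); // the task still goes in as an argument
        thread.start();
        return future.get(); // blocks until call() has finished
    }

    public static void main(String[] args) throws Exception {
        Callable<Integer> task = new Callable<Integer>() {
            @Override
            public Integer call() {
                return 6 * 7; // unlike run(), call() can return a value
            }
        };
        System.out.println("result = " + runOnThread(task)); // prints "result = 42"
    }
}
```

The thread is still only the mechanism; the Callable is the unit of work, and the Future is how the result travels back to the caller.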

Thread Pool

A thread pool manages a homogeneous pool of workers or threads and is bound to a work queue holding tasks. Workers from the pool pick the next task from the queue and execute it independently. Once a task is finished, the worker fetches the next task from the queue if there is one; otherwise, it keeps waiting for the next task to arrive.

The Executor framework (link), added in Java 5.0, provides static factory methods in Executors to create thread pools (such as newFixedThreadPool(), newCachedThreadPool(), newSingleThreadExecutor(), and newScheduledThreadPool()). The Executor framework uses Runnable as its basic task representation. If a task needs to return a result or throw a checked exception, Callable is the better choice.
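
The waiting behaviour described above can be sketched with BlockingQueue.take(), which blocks while the queue is empty. This is only an illustration under assumptions: WaitingWorkerDemo, WaitingWorker and runTasks are hypothetical names, and using interrupt() as the stop signal is one possible design, not the only one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitingWorkerDemo {

    // Hypothetical worker: waits in take() while the queue is empty.
    static class WaitingWorker extends Thread {
        private final BlockingQueue<Runnable> queue;

        WaitingWorker(BlockingQueue<Runnable> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Runnable task = queue.take(); // blocks until a task arrives
                    task.run();
                }
            } catch (InterruptedException e) {
                // interrupt() is the stop signal here; let the thread end
            }
        }
    }

    // Runs taskCount trivial tasks on workerCount workers; returns how many completed.
    public static int runTasks(int workerCount, int taskCount) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);

        List<WaitingWorker> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            WaitingWorker worker = new WaitingWorker(queue);
            workers.add(worker);
            worker.start(); // started before any task exists; the worker simply waits
        }
        for (int i = 0; i < taskCount; i++) {
            queue.put(() -> {
                completed.incrementAndGet();
                done.countDown();
            });
        }
        done.await(); // wait until every task has run
        for (WaitingWorker worker : workers) {
            worker.interrupt(); // wake idle workers out of take()
            worker.join();
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed = " + runTasks(2, 5)); // prints "completed = 5"
    }
}
```

Because the workers wait instead of exiting on an empty queue, tasks can be added after the pool has started.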
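
For comparison, here is a rough sketch of a similar prime-range workload on top of Executors.newFixedThreadPool(). The class ExecutorDemo and the helpers countPrimes and countPrimesInParallel are invented for this example; only the java.util.concurrent calls come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorDemo {

    // Hypothetical helper: counts primes in [start, end] by trial division.
    static int countPrimes(int start, int end) {
        int count = 0;
        for (int n = Math.max(start, 2); n <= end; n++) {
            boolean prime = true;
            for (int i = 2; (long) i * i <= n; i++) {
                if (n % i == 0) {
                    prime = false;
                    break;
                }
            }
            if (prime) {
                count++;
            }
        }
        return count;
    }

    // Splits [1, limit] into chunks of size step and counts primes on a fixed pool.
    public static int countPrimesInParallel(int threads, int limit, int step) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int start = 1; start <= limit; start += step) {
                final int from = start;
                final int to = Math.min(start + step - 1, limit);
                Callable<Integer> job = () -> countPrimes(from, to); // task returning a result
                futures.add(pool.submit(job)); // queued internally by the executor
            }
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get(); // waits for that chunk to finish
            }
            return total;
        } finally {
            pool.shutdown(); // let the worker threads exit
        }
    }

    public static void main(String[] args) throws Exception {
        // five chunks of 100 numbers handled by two threads
        System.out.println("primes up to 500: " + countPrimesInParallel(2, 500, 100));
    }
}
```

The executor owns both the worker threads and the task queue, so the caller only submits tasks and collects results.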

Let's zoom into the interfaces provided by the two major components of a thread pool.
Task Queue: Holds tasks in a queue so that workers can pick them up and execute them. The queue can be implemented as a FIFO or a priority queue. At a minimum, a task queue should support the methods below.

public interface TaskQueue {
    public Task getNextJob();
    public void addJob(Task task);
    public boolean isEmpty();
}

Worker Pool: Holds the threads. The pool should provide an interface to execute the tasks submitted to it, so the execute method takes a task queue as an argument.
public interface WorkerPool {
    public void execute(TaskQueue queue);
}
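
As noted for the task queue, it can be FIFO or priority based. Below is a small sketch of the priority variant built on the JDK's PriorityBlockingQueue. PriorityQueueDemo, PrioritizedTask and PriorityTaskQueue are hypothetical names for this illustration, and the convention that a lower number means higher priority is an assumption of the sketch.

```java
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityQueueDemo {

    // Hypothetical task type: a lower priority value is served first.
    static class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final String name;
        final int priority;

        PrioritizedTask(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        @Override
        public void run() {
            System.out.println("running " + name);
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(this.priority, other.priority);
        }
    }

    // Hypothetical queue offering the same three operations as TaskQueue.
    static class PriorityTaskQueue {
        private final PriorityBlockingQueue<PrioritizedTask> queue = new PriorityBlockingQueue<>();

        public PrioritizedTask getNextJob() {
            return queue.poll(); // smallest priority value first, null when empty
        }

        public void addJob(PrioritizedTask task) {
            queue.offer(task); // the queue is unbounded, so this never blocks
        }

        public boolean isEmpty() {
            return queue.isEmpty();
        }
    }

    // Adds three tasks out of order and returns the names in the order they are served.
    public static String drainOrder() {
        PriorityTaskQueue jq = new PriorityTaskQueue();
        jq.addJob(new PrioritizedTask("report", 3));
        jq.addJob(new PrioritizedTask("backup", 5));
        jq.addJob(new PrioritizedTask("alert", 1));

        StringBuilder order = new StringBuilder();
        while (!jq.isEmpty()) {
            order.append(jq.getNextJob().name).append(' ');
        }
        return order.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints "alert report backup"
    }
}
```

Workers do not need to know which ordering is in use; they keep calling getNextJob() and the queue decides what comes next.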

Time to wind up the fundamentals.
In the next post (link), I cover a thread pool implementation in Java.


Tuesday, December 16, 2014

Know Maven Repository Path

This post explains how to find the Maven local repository path on a host. This matters all the more because the default path (${user.home}/.m2/repository) is sometimes overridden in settings.xml.

Checking whether Maven is installed is trivial. Just use the version argument.

$mvn -version
Apache Maven 3.0.3 (r1075438; 2011-02-28 23:01:09+0530)
Maven home: /bin/maven3
Java version: 1.8.0, vendor: Oracle Corporation
Java home: /bin/jdk8/jre
Default locale: en_IN, platform encoding: UTF-8
OS name: "linux", version: "3.2.0-4-amd64", arch: "amd64", family: "unix"


Printing local repository path

Maven needs to be properly installed to get the local repository path, and the command above confirms that it is. It also prints the Java home path, which means Java home must be configured for Maven to work properly. Let's see how to print the Maven local repository.

$mvn -X 
or
$mvn --debug

The output of these commands is verbose. We can use the grep/egrep utility to limit the console output.

$mvn -X | grep "repository"
[DEBUG] Using local repository at /home/user/.m2/repository
[DEBUG] Using manager EnhancedLocalRepositoryManager with priority 10 for /home/user/.m2/repository

So the command prints the path of the local repository. Also, the Maven settings file, settings.xml, is usually kept inside the .m2 directory.

--
References:
http://maven.apache.org/guides/introduction/introduction-to-repositories.html