Saturday, July 6, 2013

Executor Framework in Java

Java 5 introduced a new framework to abstract thread management from the rest of the application. These set of API/objects are popularly known as Executors or Executor framework. Managing thread means; controlling its life cycle, its usage, and its scheduling etc. So through Executors, you don't need to explicitly create threads. Thread creation and management will be taken care of by Executors. As a developer, if you are scared of threads; then Bingo; Executors will make your life simpler and cooler!

Executor APIs

Below class diagram shows major classes which are part of Executor Framework. I have removed attributes/operations from class diagram to make it look neater and lighter. I want to stress more on their relationship; you can always look for details through Java doc.  Let's go briefly through important classes : 

Class Diagram

Executors: This is a utility class which acts as a factory for creating different types of executors which are shown in the above class diagram. Using this you can create an instance of Executor, ExecutorService, ScheduledExecutorService, ThreadFactory and Callable classes.

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class ExecutorSample {

 public static void main(String[] args) {

  Runnable task = new Runnable() {
   public void run() {

   }
  };

  Callable<Object> c = Executors.callable(task);

  Executor ex = Executors.newSingleThreadExecutor();

  ExecutorService exs = Executors.newSingleThreadExecutor();

  ScheduledExecutorService sexs = Executors
    .newSingleThreadScheduledExecutor();
 }
}
   
Executor: This interface has a single method (execute) to execute a Runnable task. So it decouples task submission and execution. As shown below, execute method is equivalent to running a thread in the more traditional way by creating an instance of Thread class.

 //Executor ex = Executors.newSingleThreadExecutor();

import java.util.concurrent.Executor;

public class ExecutorSample {

 public static void main(String[] args) {
  Runnable task = new Runnable() {
   public void run() {
    System.out.println("inside task");
   }
  };

  Executor es = new Executor() {
   @Override
   public void execute(Runnable task) {
    (new Thread(task)).start();
    task.run();
   }
  };
  es.execute(task);
 }
}


ExecutorService: It supplements base class (Executor) by adding more methods to control life cycle. It provides a more versatile submit method which can take Runnable as well as Callable task type. Submit method returns a Future object, which can be used to return the value of Callable task and it can also be used to manage the status. ExecutorService could be in either of three states: running, shutting down, and terminated. The shutdown method initiates a graceful shutdown: no new tasks are accepted but previously submitted tasks are allowed to complete (including those that have not yet begun execution). The shutdownNow method initiates an abrupt shutdown: it attempts to cancel outstanding task and doesn't start any task that is queued but not begun.
ExecuterService es = Executors.xx();
while(!es.isShutdown()){
      //task execution
}
Below class shows invokeAll method example:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Demo of invokeAll method; It can any collection of callable tasks.
 * 
 * @author Sid
 * 
 */
public class InvokeAllDemo {
 public static void main(String[] args) {
  ExecutorService executorService = Executors.newSingleThreadExecutor();
  Set<Callable<String>> callables = new HashSet<Callable<String>>();

  callables.add(new Callable<String>() {
   public String call() throws Exception {
    return "Task 1";
   }
  });
  callables.add(new Callable<String>() {
   public String call() throws Exception {
    return "Task 2";
   }
  });

  try {
   List<Future<String>> futures = executorService.invokeAll(callables);
   executorService.shutdown();

   for (Future<String> future : futures) {
    System.out.println("Response = " + future.get());
   }

  } catch (InterruptedException e) {
   e.printStackTrace();
  } catch (ExecutionException e) {
   e.printStackTrace();
  }
  executorService.shutdown();
 }
}

Output:
Response = Task 1
Response = Task 2


ScheduledExecutorService: This interface extends further and adds methods related to scheduling. With this, you can run your Runnable/Callable task after a fixed delay. So it can be used to execute a specified task repeatedly at the fixed interval.

Thread Pools

Managing the thread life cycle is expensive so one of the reason behind this framework is to abstract it. In complicated or real-life applications, allocating and deallocating memory to multiple threads become even more complicated (compared to a single thread). So to handle this, Java introduced the concept of thread pools. 

Thread pool consists of worker threads. Tasks are submitted to a thread pool via a queue. Worker threads have a simple life : request new task from a work queue, execute it, and go back to pool for assignment to next task. Executors class provides the factory method to create thread pools. Let's cover them :

Executors.newFixedThreadPool(10): This is one of the most common types of thread pool. This type of pool always has a specified number of threads running. If a thread from the pool is terminated due to some reason then it automatically gets replaced with a new thread. 

Executors.newCachedThreadPool(): Creates an expandable thread pool which reuses previously created threads if available. These pools will improve the performance of the program that creates many short-lived asynchronous tasks. Threads that have been used for 60 seconds are terminated and removed from the cache. Thus if pool remains idle for longer time will not consume any resource.

Executors.newSingleThreadExecutor() : Creates a single-threaded executor. It's a single worker thread to process tasks, and replaces it if it dies due to some reason/error. So using this, tasks will be executed sequentially. So if you have a queue having 10 tasks, then tasks will get executed one after another depending on the order.


Executors.newScheduledThreadPool() : Creates a fixed size thread pool that supports delayed and periodic task execution. 

Executors also provides dedicated classes for creating thread pools specific to your needs if methods provided through Executors are not enough. These classes are ThreadPoolExecutor, ScheduledThreadPoolExecutor  and ForkJoinPool as shown in above class diagram.

--
do post your comments/questions !!!