Tuesday, November 19, 2013

Fun with Java's ExecutorService

It's surprisingly difficult to find a decent example of the Java Executor framework that also explains some of the API's "gotchas". It took me a while to figure it out when I first started using it, so I decided to post a hopefully useful example. In the code below, I use an executor service to execute some work in parallel. This is the easiest way to exploit simple parallelism in Java 7 (Java 8 introduces parallel streams, which make things even simpler). The executor service is created as a fixed thread pool using a convenient static factory method in the Executors class. It's very instructive to read the javadoc for this and the other static factory methods in that class, and even the source code if you are feeling adventurous, to understand how the thread pools are created and which options are available to customize them.
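To give an idea of what the factory method hides, here is a minimal sketch of a fixed pool built directly with the ThreadPoolExecutor constructor. As far as I can tell from the OpenJDK source, this is roughly what Executors.newFixedThreadPool() does, except that I pass my own ThreadFactory to name the threads. The class name CustomPoolSketch, the helper newNamedFixedPool() and the "worker" prefix are made up for this illustration.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolSketch {
  // Hypothetical helper: a fixed-size pool whose threads are named prefix-1, prefix-2, ...
  static ExecutorService newNamedFixedPool(int nThreads, final String prefix) {
    final AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, prefix + "-" + counter.incrementAndGet());
      }
    };
    return new ThreadPoolExecutor(
        nThreads, nThreads,                   // core size == maximum size, so the pool is "fixed"
        0L, TimeUnit.MILLISECONDS,            // keep-alive only matters for threads above core size
        new LinkedBlockingQueue<Runnable>(),  // unbounded queue: extra tasks wait here
        factory);
  }

  // Runs one task and reports the name of the pool thread that executed it.
  static String nameOfWorkerThread() throws Exception {
    ExecutorService service = newNamedFixedPool(2, "worker");
    try {
      return service.submit(new Callable<String>() {
        @Override
        public String call() {
          return Thread.currentThread().getName();
        }
      }).get();
    } finally {
      service.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(nameOfWorkerThread());
  }
}
```

Named threads are a small thing, but they make thread dumps and log output much easier to read once several pools are running in the same JVM.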

The testSomeWorkers() method creates a few tasks (instances of Callable), submits them to the executor service and retrieves their results through the corresponding Futures. The helper method addSomeTasks() simply calls submit() on the executor service once per task and collects the Future that each call returns. Calling get() on a Future blocks until the task has finished and then returns the value returned by the Callable.
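Stripped of the logging and the sleeping, the submit-then-get pattern looks roughly like the sketch below. The class SubmitAndGetSketch and the squares() method are hypothetical names used only here; the tasks return i * i instead of a random number so that the result is predictable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class SubmitAndGetSketch {
  // Submits n tasks to a fixed pool and collects their results in submission order.
  static List<Integer> squares(int n, int nThreads)
      throws InterruptedException, ExecutionException {
    ExecutorService service = Executors.newFixedThreadPool(nThreads);
    try {
      List<Future<Integer>> futures = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        final int k = i;
        // submit() returns immediately with a Future for the pending result
        futures.add(service.submit(new Callable<Integer>() {
          @Override
          public Integer call() {
            return k * k;
          }
        }));
      }
      List<Integer> results = new ArrayList<>();
      // get() blocks until the corresponding task is done
      for (Future<Integer> f : futures) results.add(f.get());
      return results;
    } finally {
      service.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(squares(5, 3)); // prints [0, 1, 4, 9, 16]
  }
}
```

The tasks may run in any order, but the results come back in the order the futures were stored, because each get() waits for its own task.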

The testJobCanceling() method works pretty much like testSomeWorkers(), but it shows how tasks can be cancelled with the Future.cancel() method. The method's only argument specifies whether the thread running the task should be interrupted, and its return value is false if the task could not be cancelled, typically because it has already completed. In the call() method of each Callable, Thread.sleep() throws an InterruptedException if the thread is interrupted. That is what happens when we call cancel(true) on the Future of a running task, and the task handles the exception by returning immediately. Handling interruptions in this fashion is a common idiom in Java for keeping some control over concurrent workers.
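Here is a minimal sketch of a single cancellation, assuming a task that does nothing but sleep. The class CancelSketch and the method demo() are hypothetical names; the latches are only there to make the example deterministic. It also shows one more gotcha: calling get() on a cancelled Future throws a CancellationException, which is why the full example below never calls get() on the futures it managed to cancel.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelSketch {
  static String demo() throws Exception {
    ExecutorService service = Executors.newSingleThreadExecutor();
    final CountDownLatch started = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(1);
    final AtomicBoolean sawInterrupt = new AtomicBoolean(false);
    try {
      Future<Integer> future = service.submit(new Callable<Integer>() {
        @Override
        public Integer call() {
          started.countDown();
          try {
            Thread.sleep(60_000); // stands in for long-running work
            return 42;
          } catch (InterruptedException e) {
            sawInterrupt.set(true); // cancel(true) interrupted the worker thread
            return null;
          } finally {
            finished.countDown();
          }
        }
      });
      started.await();                         // make sure the task is actually running
      boolean cancelled = future.cancel(true); // true: interrupt the running thread
      finished.await();                        // wait for the task to notice and return
      String getOutcome;
      try {
        getOutcome = "value " + future.get();
      } catch (CancellationException e) {
        getOutcome = "CancellationException";
      }
      return "cancel=" + cancelled + ", interrupted=" + sawInterrupt.get()
          + ", get=" + getOutcome;
    } finally {
      service.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(demo());
  }
}
```

Note that the value the task returns after being interrupted is never seen by anyone: once cancel() has succeeded, the Future reports cancellation no matter what call() does afterwards.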

Enjoy the code:

 import java.util.*;  
 import java.util.concurrent.*;  
 class ExecutorServiceExample {  
   public static void main(String[] args) {  
     Scanner in = new Scanner(System.in);  
     final int nThreads = in.nextInt();  
     final int n = in.nextInt();  
     System.out.println("Using " + nThreads + " threads");  
     ExecutorService service = Executors.newFixedThreadPool(nThreads);  
     try {  
       testSomeWorkers(service, n);  
     } catch (InterruptedException | ExecutionException e) {  
       e.printStackTrace();  
     }  
     try {  
       testJobCanceling(service, n);  
     } catch (InterruptedException | ExecutionException e) {  
       e.printStackTrace();  
     }  
     // necessary or the thread pool will keep the JVM up and running!  
     service.shutdown();  
   }  
   public static void testSomeWorkers(ExecutorService service, int n) throws InterruptedException, ExecutionException {  
     // create and invoke some "tasks" on this executor service  
     Collection<Future<Integer>> taskFutures = addSomeTasks(service, n);  
     System.out.println("Waiting for all tasks to complete...");  
     List<Integer> ret = new ArrayList<>();  
     // retrieve the result of the tasks' computation  
     for (Future<Integer> f : taskFutures) ret.add(f.get());  
     System.out.println("Values returned from computations: " + ret);  
     System.out.println("All done.");  
   }  
   public static void testJobCanceling(ExecutorService service, int n) throws InterruptedException, ExecutionException {  
     Collection<Future<Integer>> taskFutures = addSomeTasks(service, n);  
     Thread.sleep(1000);  
     System.out.println("Actually nevermind!");  
     List<Integer> completed = new ArrayList<>();  
     List<Future<Integer>> cancelled = new ArrayList<>();  
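     // try to cancel every task; cancel(true) interrupts the ones that are already running  
     for (Future<Integer> f : taskFutures) {  
       if (f.cancel(true)) {  
         // cancelled before completion: get() on this future would throw CancellationException  
         cancelled.add(f);  
       } else {  
         // the task had already completed, so get() returns its result without blocking  
         completed.add(f.get());  
       }  
     }  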
     System.out.println("" + cancelled.size() + " tasks were successfully cancelled");  
     if (!completed.isEmpty()) System.out.println("Values returned from computations: " + completed);  
     System.out.println("All done.");  
   }  
   private static Collection<Future<Integer>> addSomeTasks(ExecutorService service, int howMany) {  
     System.out.println("Enqueuing " + howMany + " tasks...");  
     List<Future<Integer>> ret = new ArrayList<>();  
     for (int i = 0; i < howMany; i++) {  
       final int n = i;  
       ret.add(service.submit(new Callable<Integer>() {  
         @Override  
         public Integer call() {  
           try {  
             try {  
               System.out.println("Task " + n + ": Doing some very important work...");  
               Thread.sleep(200 + rnd.nextInt(200));  
             } catch (InterruptedException e) {  
               System.out.println("Task " + n + " interrupted while doing very important work");  
               return null;  
             }  
             try {  
               System.out.println("Task " + n + ": Doing more important work...");  
               Thread.sleep(200 + rnd.nextInt(200));  
             } catch (InterruptedException e) {  
               System.out.println("Task " + n + " interrupted while doing important work");  
               return null;  
             }  
             try {  
               System.out.println("Task " + n + ": Doing slightly less important work...");  
               Thread.sleep(200 + rnd.nextInt(200));  
             } catch (InterruptedException e) {  
               System.out.println("Task " + n + " interrupted while doing slightly less important work");  
               return null;  
             }  
             return rnd.nextInt();  
           } finally {  
             System.out.println("Cleaning up after task " + n);  
           }  
         }  
       }));  
     }  
     return ret;  
   }  
   private final static Random rnd = new Random();  
 }  
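One last gotcha concerns the shutdown() call in main(): it stops the pool from accepting new tasks but returns immediately, without waiting for the tasks that are still queued or running. If you need to wait, a sketch along the lines of the pattern suggested in the ExecutorService javadoc could look like this. The class ShutdownSketch, the helper shutdownAndWait() and the timeout parameter are my own names and choices for illustration.

```java
import java.util.concurrent.*;

class ShutdownSketch {
  // Returns true if the pool terminated within the given timeouts.
  static boolean shutdownAndWait(ExecutorService service, long timeoutSeconds) {
    service.shutdown(); // no new tasks accepted; already submitted tasks still run
    try {
      if (service.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
        return true;
      }
      service.shutdownNow(); // interrupt running tasks and discard the queued ones
      return service.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      service.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(2);
    service.submit(new Runnable() {
      @Override
      public void run() {
        System.out.println("Doing some very important work...");
      }
    });
    System.out.println("Terminated: " + shutdownAndWait(service, 5));
  }
}
```

Tasks that swallow interruptions cannot be stopped by shutdownNow(), which is one more reason to handle InterruptedException the way the tasks above do.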
