Tuesday, November 26, 2013

Fun with Guava's ListeningExecutorService

As a follow up to my previous blog post, I decided to rewrite the code sample using the more advance Google Guava ListeningExecutorService and ListenableFuture API, so here it is:

 import com.google.common.util.concurrent.Futures;  
 import com.google.common.util.concurrent.ListenableFuture;  
 import com.google.common.util.concurrent.ListeningExecutorService;  
 import com.google.common.util.concurrent.MoreExecutors;  
 import java.util.ArrayList;  
 import java.util.List;  
 import java.util.Random;  
 import java.util.Scanner;  
 import java.util.concurrent.Callable;  
 import java.util.concurrent.ExecutionException;  
 import java.util.concurrent.Executors;  

 class ListenableFutureExample {  
   public static void main(String[] args) {  
     Scanner in = new Scanner(System.in);  
     final int nThreads = in.nextInt();  
     final int n = in.nextInt();  
     System.out.println("Using " + nThreads + " threads");  
     ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(nThreads));  
     try {  
       try {  
         testSomeWorkers(service, n);  
       } catch (InterruptedException | ExecutionException e) {  
         e.printStackTrace();  
       }  
       try {  
         testJobCanceling(service, n);  
       } catch (InterruptedException | ExecutionException e) {  
         e.printStackTrace();  
       }  
     } finally {  
       // necessary or the thread pool will keep the JVM up and running!  
       service.shutdown();  
     }  
   }  

   public static void testSomeWorkers(ListeningExecutorService service, int n) throws InterruptedException, ExecutionException {  
     // using the Guava's utility method allAsList will return all the results in a future list  
     // enormously simplifying the code:  
     ListenableFuture<List<Integer>> ret = Futures.successfulAsList(addSomeTasks(service, n));  
     // the call to get() is now the blocking piece of code  
     System.out.println("Values returned from computations: " + ret.get());  
     System.out.println("All done.");  
   }  

   public static void testJobCanceling(ListeningExecutorService service, int n) throws InterruptedException, ExecutionException {  
     List<ListenableFuture<Integer>> tasks = addSomeTasks(service, n);  
     ListenableFuture<List<Integer>> ret = Futures.allAsList(tasks);  
     Thread.sleep(1000);  
     System.out.println("Actually nevermind!");  
     ret.cancel(true);  
     // let's see how many tasks were actually cancelled by asking the original futures:  
     List<Integer> completed = new ArrayList<>();  
     for (ListenableFuture<Integer> f : tasks) if (!f.isCancelled()) completed.add(f.get());  
     System.out.println("There were " + (n - completed.size()) + " cancelled tasks and " + completed.size() + " completed tasks: " + completed);  
     System.out.println("All done.");  
   }  

   private static List<ListenableFuture<Integer>> addSomeTasks(ListeningExecutorService service, int howMany) {  
     System.out.println("Enqueuing " + howMany + " tasks...");  
     List<ListenableFuture<Integer>> ret = new ArrayList<>();  
     for (int i = 1; i <= howMany; i++) {  
       final int n = i;  
       ret.add(service.submit(new Callable<Integer>() {  
         @Override  
         public Integer call() {  
           try {  
             try {  
               System.out.println("Task " + n + ": Doing some very important work...");  
               Thread.sleep(200 + rnd.nextInt(200));  
             } catch (InterruptedException e) {  
               System.out.println("Task " + n + " interrupted while doing very important work");  
               return null;  
             }  
             try {  
               System.out.println("Task " + n + ": Doing more important work...");  
               Thread.sleep(200 + rnd.nextInt(200));  
             } catch (InterruptedException e) {  
               System.out.println("Task " + n + " interrupted while doing important work");  
               return null;  
             }  
             try {  
               System.out.println("Task " + n + ": Doing slightly less important work...");  
               Thread.sleep(200 + rnd.nextInt(200));  
             } catch (InterruptedException e) {  
               System.out.println("Task " + n + " interrupted while doing slightly less important work");  
               return null;  
             }  
             int ret = rnd.nextInt();  
             System.out.println("Task " + n + ": about to return " + ret);  
             return ret;  
           } finally {  
             System.out.println("Task " + n + ": cleaning up");  
           }  
         }  
       }));  
     }  
     return ret;  
   }  
   private final static Random rnd = new Random();  
 }  


The Guava API has the advantage to allow one to register a callback to a ListenableFuture and apply transformations to futures, resulting in a chain of non-blocking operations, very much like Scala's Futures. Non-blocking concurrency will greatly limit resource usage, if used properly since there will be no idle threads blocking while waiting on the results of other threads' computations.

Just for illustration purposes, an example of using Guava's future transformation capabilities to implement fully non-blocking asynchronous computations follows. Enjoy!


 import com.google.common.base.Function;  
 import com.google.common.base.Optional;  
 import com.google.common.util.concurrent.Futures;  
 import com.google.common.util.concurrent.ListenableFuture;  
 import com.google.common.util.concurrent.ListeningExecutorService;  
 import com.google.common.util.concurrent.MoreExecutors;  
 import java.math.BigInteger;  
 import java.util.ArrayList;  
 import java.util.List;  
 import java.util.Random;  
 import java.util.concurrent.Callable;  
 import java.util.concurrent.ExecutionException;  
 import java.util.concurrent.Executors;  

 class ListenableFutureChain {  
   public static void main(String[] args) {  
     ListenableFutureChain chain = new ListenableFutureChain(4);  
     // let's find a few 512 bit prime numbers for our awesome encryption algorithm!  
     ListenableFuture<List<BigInteger>> probablePrimes = chain.findSomePrimeNumbers(20, 512);  
     // now finally do something with the future prime list  
     try {  
       // WARNING: this call is blocking, for illustration purposes only.  
       // It's recommended to design so that you don't need to do this,  
       // as in the function findSomePrimeNumbers()  
       for (BigInteger i : probablePrimes.get()) System.out.println(i);  
     } catch (InterruptedException | ExecutionException e) {  
       e.printStackTrace();  
     }  
     // remember to call this or the executor service will keep the JVM "awake"!  
     chain.dispose();  
   }  

   public ListenableFutureChain(final int nThreads) {  
     executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(nThreads));  
   }  

   public void dispose() {  
     executorService.shutdown();  
   }  

   private final ListeningExecutorService executorService;  
   private final Random random = new Random();  

   public ListenableFuture<List<BigInteger>> findSomePrimeNumbers(final int nAttempts, final int nBits) {  
     List<ListenableFuture<Optional<BigInteger>>> probablePrimes = new ArrayList<>(nAttempts);  
     for (int i = 0; i < nAttempts; i++) {  
       // submit a task for execution and retrieve the ListenableFuture  
       ListenableFuture<BigInteger> probablePrimeFuture = executorService.submit(new Callable<BigInteger>() {  
         @Override  
         public BigInteger call() throws Exception {  
           // I'm going to find a probable prime number  
           return BigInteger.probablePrime(nBits, random);  
         }  
       });  
       // transform the previous ListenableFuture using a function; returns another ListenableFuture (non blocking operation)  
       ListenableFuture<Optional<BigInteger>> primeOrNot = Futures.transform(probablePrimeFuture, new Function<BigInteger, Optional<BigInteger>>() {  
         @Override  
         public Optional<BigInteger> apply(BigInteger p) {  
           // I'm going to return only the probable primes that are actually prime  
           if (isPrime(p)) return Optional.of(p);  
           return Optional.absent();  
         }  
       }, executorService);  
       // add the second future to a list  
       probablePrimes.add(primeOrNot);  
     }  
     // transform the list of futures to a future of list (Guava magic!), only retain successful futures (again, non blocking!)  
     ListenableFuture<List<Optional<BigInteger>>> primesOrNoValues = Futures.successfulAsList(probablePrimes);  
     // transform the future list to a future list containing only the prime numbers in question and return this future (still non blocking)  
     return Futures.transform(primesOrNoValues, new Function<List<Optional<BigInteger>>, List<BigInteger>>() {  
       @Override  
       public List<BigInteger> apply(List<Optional<BigInteger>> primes) {  
         List<BigInteger> ret = new ArrayList<>(primes.size());  
         //  
         for (Optional<BigInteger> optional : primes)  
           if (optional != null && optional.isPresent()) ret.add(optional.get());  
         return ret;  
       }  
     }, executorService);  
     // Note that this whole function is non blocking; you can tell by the fact that there's no InterruptedException being thrown anywhere.  
   }  

   private boolean isPrime(final BigInteger p) {  
     // TODO: do some fancy primality test! (note that this would take a while in real life)  
     // let's just return true or false randomly for now... ;)  
     return random.nextBoolean();  
   }  
 }  

1 comment:

  1. thanks for your post, I am gonna study more deeply your code. Today I need my ExecutorService with 10 threads and ~1000 tasks wait to submit more tasks every time the pool is full.

    ReplyDelete