The testSomeWorkers() method creates a few tasks (instances of Callable), invokes them using the executor service and retrieves their results using the corresponding Futures. The helper method addSomeTasks() simply calls the submit() method on the executor service to submit the n tasks and retrieve the corresponding future. The future will return the value returned by the Callable once the execution is over.
The testJobCanceling() method works pretty much like testSomeWorkers() but it shows how the tasks can be cancelled by using the Future.cancel() method. The cancel() method's only argument specifies whether or not the thread in which the job is running should be interrupted. In the Callable.call() function, invoking Thread.sleep() can potentially throw an InterruptedException. This will happen when we call cancel() on the future associated with this callable and thus the exception is handled by returning immediately. It's a common idiom in java to handle interruptions in this fashion to achieve some control over concurrent workers.
Enjoy the code:
import java.util.*;
import java.util.concurrent.*;
class ExecutorServiceExample {
public static void main(String[] args) {
Scanner in = new Scanner(System.in);
final int nThreads = in.nextInt();
final int n = in.nextInt();
System.out.println("Using " + nThreads + " threads");
ExecutorService service = Executors.newFixedThreadPool(nThreads);
try {
testSomeWorkers(service, n);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
try {
testJobCanceling(service, n);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// necessary or the thread pool will keep the JVM up and running!
service.shutdown();
}
public static void testSomeWorkers(ExecutorService service, int n) throws InterruptedException, ExecutionException {
// create and invoke some "tasks" on this executor service
Collection<Future<Integer>> taskFutures = addSomeTasks(service, n);
System.out.println("Waiting for all tasks to complete...");
List<Integer> ret = new ArrayList<>();
// retrieve the result of the tasks' computation
for (Future<Integer> f : taskFutures) ret.add(f.get());
System.out.println("Values returned from computations: " + ret);
System.out.println("All done.");
}
public static void testJobCanceling(ExecutorService service, int n) throws InterruptedException, ExecutionException {
Collection<Future<Integer>> taskFutures = addSomeTasks(service, n);
Thread.sleep(1000);
System.out.println("Actually nevermind!");
List<Integer> completed = new ArrayList<>();
List<Future<Integer>> cancelled = new ArrayList<>();
// try to cancel the tasks that are running
for (Future<Integer> f : taskFutures) {
// if successfully cancel add to the cancelled list
if (f.cancel(true)) cancelled.add(f);
// otherwise get the result
else completed.add(f.get());
}
System.out.println("" + cancelled.size() + " tasks were successfully cancelled");
if (!completed.isEmpty()) System.out.println("Values returned from computations: " + completed);
System.out.println("All done.");
}
private static Collection<Future<Integer>> addSomeTasks(ExecutorService service, int howMany) {
System.out.println("Enqueuing " + howMany + " tasks...");
List<Future<Integer>> ret = new ArrayList<>();
for (int i = 0; i < howMany; i++) {
final int n = i;
ret.add(service.submit(new Callable<Integer>() {
@Override
public Integer call() {
try {
try {
System.out.println("Task " + n + ": Doing some very important work...");
Thread.sleep(200 + rnd.nextInt(200));
} catch (InterruptedException e) {
System.out.println("Task " + n + " interrupted while doing very important work");
return null;
}
try {
System.out.println("Task " + n + ": Doing more important work...");
Thread.sleep(200 + rnd.nextInt(200));
} catch (InterruptedException e) {
System.out.println("Task " + n + " interrupted while doing important work");
return null;
}
try {
System.out.println("Task " + n + ": Doing slightly less important work...");
Thread.sleep(200 + rnd.nextInt(200));
} catch (InterruptedException e) {
System.out.println("Task " + n + " interrupted while doing slightly less important work");
return null;
}
return rnd.nextInt();
} finally {
System.out.println("Cleaning up after task " + n);
}
}
}));
}
return ret;
}
private final static Random rnd = new Random();
}
No comments:
Post a Comment