4/14/09

Java Concurrency Utilities
Based on a JavaOne talk given by David Holmes & Brian Goetz

Overview
• Rationale and goals for JSR 166
  – Java Community Process
  – Concurrency utilities
• Executors: thread pools and scheduling
• Futures
• Concurrent collections
• Locks, conditions, and synchronizers
• Atomic variables

Why Concurrency Utilities
• Java’s built-in concurrency primitives (wait(), notify(), and synchronized):
  – Are hard to use correctly
  – Are easy to use incorrectly
  – Are too low level for many applications
  – Can lead to poor performance if used incorrectly
  – Leave out many useful concurrency constructs

Goals
• Provide efficient, correct, and reusable concurrency building blocks
• Enhance scalability, performance, readability, maintainability, and thread safety of concurrent Java applications

Background
• Utilities are in the java.util.concurrent package
  – Based on Doug Lea’s EDU.oswego.cs.dl.util.concurrent package
• APIs take advantage of native JVM constructs and the Java Memory Model guarantees specified in JSR 133

Building Blocks
• Executors, thread pools, and Futures
• Concurrent collections:
  – BlockingQueue, ConcurrentHashMap, CopyOnWriteArrayList
• Locks and Conditions
• Synchronizers: semaphores, barriers, etc.
• Atomic variables
  – Low-level compare-and-set operation

Executor
• Standardizes asynchronous invocation
• Separates job submission from execution policy
  – anExecutor.execute(aRunnable)
  – not new Thread(aRunnable).start()
• Two code styles supported:
  – Actions: Runnables
  – Functions: Callables
  – Also has lifecycle management: cancellation, shutdown, etc.
• Executor usually created via Executors factory class
  – Configures ThreadPoolExecutor
  – Customizes shutdown methods, before/after hooks, saturation policies, queuing

Executor & ExecutorService
• ExecutorService adds lifecycle management to Executor

    public interface Executor {
        void execute(Runnable command);
    }

    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        // other convenience methods for submitting tasks
    }

Creating Executors
• Executors factory methods

    public class Executors {
        static ExecutorService newSingleThreadExecutor();
        static ExecutorService newFixedThreadPool(int n);
        static ExecutorService newCachedThreadPool();
        static ScheduledExecutorService newScheduledThreadPool(int n);
        // additional versions & utility methods
    }

(Not) Executor Example
• Thread-per-message web server (no limit on thread creation)

    class WebServer {
        public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                final Socket connection = socket.accept();
                Runnable r = new Runnable() {
                    public void run() { handleRequest(connection); }
                };
                new Thread(r).start();
            }
        }
    }

Executor Example
• Thread pool web server: better resource management

    class WebServer {
        // static so that main() can reach it
        static Executor pool = Executors.newFixedThreadPool(7);

        public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                final Socket connection = socket.accept();
                Runnable r = new Runnable() {
                    public void run() { handleRequest(connection); }
                };
                pool.execute(r);
            }
        }
    }

Future and Callable
• Callable is the functional analog of Runnable

    interface Callable<V> {
        V call() throws Exception;
    }

• Future represents an asynchronous task
• Future holds the result of an asynchronous call, normally of a Callable

    interface Future<V> {
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
        boolean cancel(boolean mayInterrupt);
        boolean isCancelled();
        boolean isDone();
    }

Future Example
• See: FutureTaskStringReverser.java

Another Future Example
• Implementing a cache with Future

    public class Cache<K, V> {
        ConcurrentMap<K, Future<V>> map = new ConcurrentHashMap<K, Future<V>>();
        Executor executor = Executors.newFixedThreadPool(8);

        public V get(final K key) throws InterruptedException, ExecutionException {
            Future<V> f = map.get(key);   // null if key not found
            if (f == null) {
                Callable<V> c = new Callable<V>() {
                    public V call() {
                        // compute value associated with key
                    }
                };
                FutureTask<V> task = new FutureTask<V>(c);
                // putIfAbsent: if key not found, put(key, task) and return null;
                // otherwise return get(key)
                f = map.putIfAbsent(key, task);
                if (f == null) {
                    f = task;
                    executor.execute(task);   // only the thread that won the race runs it
                }
            }
            return f.get();
        }
    }

ScheduledExecutorService
• For deferred and recurring tasks, can schedule:
  – A Callable or Runnable to run once with a fixed delay after submission
  – A Runnable to run periodically at a fixed rate
  – A Runnable to run periodically with a fixed delay between executions
• Submission returns a ScheduledFuture handle which can be used to cancel the task
• Like Timer, but supports pooling and is more robust

Concurrent Collections
• Pre-1.5 Java class libraries had few concurrent (vs. synchronized) classes
  – Synchronized collections: Hashtable, Vector, and Collections.synchronizedXXX
• Often required locking during iteration
• Locking becomes a source of contention
• Java 1.5 concurrent collections:
  – Allow multiple operations to overlap
• Some differences in semantics

Queues
• Queue interface added to java.util

    interface Queue<E> extends Collection<E> {
        boolean offer(E x);   // try to insert
        E poll();             // return null if empty
        E remove() throws NoSuchElementException;
        E peek();             // return null if empty
        E element() throws NoSuchElementException;
    }

• Thread-safe and non-thread-safe implementations
  – Non-thread-safe: LinkedList
  – Non-thread-safe: PriorityQueue
  – Thread-safe, non-blocking: ConcurrentLinkedQueue

Blocking Queues
• Extends Queue to provide blocking operations
  – Retrieval: wait for queue to become nonempty
  – Insertion: wait for capacity to be available
• Common in producer-consumer designs
• Can support multiple producers and consumers
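
The producer-consumer use of a blocking queue described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class name ProducerConsumerSketch, the run method, and the POISON sentinel are hypothetical and not from the talk, and the "poison pill" shutdown is just one common way to stop consumers. ArrayBlockingQueue is used as one bounded BlockingQueue implementation from java.util.concurrent.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; all names here are illustrative assumptions.
class ProducerConsumerSketch {
    private static final int POISON = -1; // sentinel telling a consumer to stop

    // Starts the given number of producers (each puts `itemsEach` items of
    // value 1) and consumers (each sums what it takes). Returns the total.
    static int run(final int producers, final int consumers, final int itemsEach)
            throws InterruptedException {
        // Small capacity so that producers really do block when the queue is full.
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(4);
        final AtomicInteger total = new AtomicInteger();
        ExecutorService producerPool = Executors.newFixedThreadPool(producers);
        ExecutorService consumerPool = Executors.newFixedThreadPool(consumers);

        for (int c = 0; c < consumers; c++) {
            consumerPool.execute(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            int item = queue.take();   // waits while the queue is empty
                            if (item == POISON) return;
                            total.addAndGet(item);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        for (int p = 0; p < producers; p++) {
            producerPool.execute(new Runnable() {
                public void run() {
                    try {
                        for (int i = 0; i < itemsEach; i++) {
                            queue.put(1);              // waits while the queue is full
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        // Wait for all producers, then enqueue one stop marker per consumer.
        producerPool.shutdown();
        producerPool.awaitTermination(10, TimeUnit.SECONDS);
        for (int c = 0; c < consumers; c++) {
            queue.put(POISON);
        }
        consumerPool.shutdown();
        consumerPool.awaitTermination(10, TimeUnit.SECONDS);
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 2, 100));
    }
}
```

Because the stop markers are enqueued only after every producer has finished, and the queue is FIFO, every real item is taken before any consumer sees a marker. No explicit synchronized, wait(), or notify() is needed: the queue handles the blocking and hand-off between threads.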