Concurrent Collections
Table of contents
Concurrency
Not all collections are thread safe and using a non-thread safe collection with multiple threads may yield unexpected results. Consider the following example.
package demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.function.IntConsumer;
public class App {
public static void main( final String[] args ) throws Exception {
final int size = 100;
final List<String> list = new ArrayList<>( 3 );
final IntConsumer task = index -> list.add( String.format( "Element %d", index ) );
runUsingMultipleThreads( task, size );
System.out.printf( "Expecting a list of %d elements but found %d%n", size, list.size() );
}
private static void runUsingMultipleThreads( final IntConsumer consumer, final int size ) throws Exception {
final List<Thread> threads = createAndStartThreads( size, consumer );
waitAllThreadsToFinish( threads );
}
private static List<Thread> createAndStartThreads( final int size, final IntConsumer consumer ) {
final CyclicBarrier barrier = new CyclicBarrier( size );
final List<Thread> threads = new ArrayList<>( size );
for ( int i = 1; i <= size; i++ ) {
final int index = i;
final Thread thread = new Thread(
awaitAllThreadsAndRun( barrier, () -> consumer.accept( index ) ),
String.format( "THREAD-%d", i )
);
thread.start();
threads.add( thread );
}
return threads;
}
private static Runnable awaitAllThreadsAndRun( final CyclicBarrier barrier, final Runnable runnable ) {
return () -> {
try {
barrier.await();
runnable.run();
} catch ( Exception e ) {/* Suppress all errors */}
};
}
private static void waitAllThreadsToFinish( final List<Thread> threads ) throws InterruptedException {
for ( final Thread thread : threads ) {
thread.join();
}
}
}
The example shown above populates an ArrayList
using multiple threads without applying any concurrent safe-guards. The outcome may vary between runs.
Expecting a list of 100 elements but found 95
The example may be overwhelming and merits further explanation. Let’s break the example down and describe what each method is doing.
The example starts by creating an
ArrayList
and populates it using 100 threads, each thread adding one element, using therunUsingMultipleThreads()
. It then prints the actual list size.package demo; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CyclicBarrier; import java.util.function.IntConsumer; public class App { public static void main( final String[] args ) throws Exception { final int size = 100; final List<String> list = new ArrayList<>( 3 ); final IntConsumer task = index -> list.add( String.format( "Element %d", index ) ); runUsingMultipleThreads( task, size ); System.out.printf( "Expecting a list of %d elements but found %d%n", size, list.size() ); } private static void runUsingMultipleThreads( final IntConsumer consumer, final int size ) throws Exception { /* ... */ } private static List<Thread> createAndStartThreads( final int size, final IntConsumer consumer ) { /* ... */ } private static Runnable awaitAllThreadsAndRun( final CyclicBarrier barrier, final Runnable runnable ) { /* ... */ } private static void waitAllThreadsToFinish( final List<Thread> threads ) throws InterruptedException { /* ... */ } }
The
runUsingMultipleThreads()
method will run the given task using the given number of threads and waits for all threads to finish.The
runUsingMultipleThreads()
method is a high level method that invokes thecreateAndStartThreads()
method to create the threads and then waits for these threads to finish by invoking thewaitAllThreadsToFinish()
method.private static void runUsingMultipleThreads( final IntConsumer consumer, final int size ) throws Exception { final List<Thread> threads = createAndStartThreads( size, consumer ); waitAllThreadsToFinish( threads ); }
The
createAndStartThreads()
method creates and starts the threads. Each thread will invoke the givenconsumer
at approximatelly the same time, using aCyclicBarrier
to coordinates this.private static List<Thread> createAndStartThreads( final int size, final IntConsumer consumer ) { final CyclicBarrier barrier = new CyclicBarrier( size ); final List<Thread> threads = new ArrayList<>( size ); for ( int i = 1; i <= size; i++ ) { final int index = i; final Thread thread = new Thread( awaitAllThreadsAndRun( barrier, () -> consumer.accept( index ) ), String.format( "THREAD-%d", i ) ); thread.start(); threads.add( thread ); } return threads; }
The given task is a
IntConsumer
that takes an integer. This integer is the thread number. The variablei
is modified within the loop, therefore cannot be passed directly. Instead a constant,index
needed to be created and used instead.The
awaitAllThreadsAndRun()
method wraps the givenRunnable
in another one that waits for all threads to arrive at the same point, using the givenCyclicBarrier
. This forces all threads to arrive at the same point before proceeding and maximises the effect of concurrency on the object under test.⚠ Not recommended!!The following fragment is suppressing all types ofException
s.private static Runnable awaitAllThreadsAndRun( final CyclicBarrier barrier, final Runnable runnable ) { return () -> { try { barrier.await(); runnable.run(); } catch ( Exception e ) {/* Suppress all errors */} }; }
The
waitAllThreadsToFinish()
method waits for all threads to finish, using theThread.join()
method.private static void waitAllThreadsToFinish( final List<Thread> threads ) throws InterruptedException { for ( final Thread thread : threads ) { thread.join(); } }
The above example can be used to modify different collections, such as sets or maps, and observe how these behave when accessed by multiple threads. The following example shows a similar example using a TreeSet
.
package demo;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CyclicBarrier;
public class App {
public static void main( final String[] args ) throws Exception {
final int size = 100;
final Set<String> list = new TreeSet<>();
final IntConsumer task = index -> list.add( String.format( "Element %d", index ) );
runUsingMultipleThreads( task, size );
System.out.printf( "Expecting a set of %d elements but found %d%n", size, list.size() );
}
private static void runUsingMultipleThreads( final IntConsumer consumer, final int size ) throws Exception { /* ... */ }
private static List<Thread> createAndStartThreads( final int size, final IntConsumer consumer ) { /* ... */ }
private static Runnable awaitAllThreadsAndRun( final CyclicBarrier barrier, final Runnable runnable ) { /* ... */ }
private static void waitAllThreadsToFinish( final List<Thread> threads ) throws InterruptedException { /* ... */ }
}
The size of the set will vary between different runs.
Hash based collections, such as HashSet
, may seem immune to concurrency problems, but that’s incorrect. Any non-thread safe object should not be used by multiple thread without any concurrent protection.
Concurrent List
CopyOnWriteArrayList
A thread-safe list, ideal when the number of reads is much higher to the number of writes. Java provides several concurrent lists, such as the CopyOnWriteArrayList
, shown next.
package demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.function.IntConsumer;
public class App {
public static void main( final String[] args ) throws Exception {
final int size = 100;
final List<String> list = new CopyOnWriteArrayList<>();
final IntConsumer task = index -> list.add( String.format( "Element %d", index ) );
runUsingMultipleThreads( task, size );
System.out.printf( "Expecting a list of %d elements but found %d%n", size, list.size() );
}
private static void runUsingMultipleThreads( final IntConsumer consumer, final int size ) throws Exception { /* ... */ }
private static List<Thread> createAndStartThreads( final int size, final IntConsumer consumer ) { /* ... */ }
private static Runnable awaitAllThreadsAndRun( final CyclicBarrier barrier, final Runnable runnable ) { /* ... */ }
private static void waitAllThreadsToFinish( final List<Thread> threads ) throws InterruptedException { /* ... */ }
}
The above example will always yield the same result. The CopyOnWriteArrayList
is a thread-safe variant of ArrayList
in which all mutative operations (add()
, set()
, and so on) are implemented by creating a fresh copy of the underlying array.
“This is ordinarily too costly but may be more efficient than alternatives when traversal operations vastly outnumber mutations and is useful when you cannot or don’t want to synchronize traversals, yet need to preclude interference among concurrent threads. The “snapshot” style iterator method uses a reference to the state of the array at the point that the iterator was created. This array never changes during the lifetime of the iterator, so interference is impossible and the iterator is guaranteed not to throw ConcurrentModificationException
. The iterator will not reflect additions, removals, or changes to the list since the iterator was created. Element-changing operations on iterators themselves (remove, set, and add) are not supported. These methods throw UnsupportedOperationException
.“
(Reference)
Vector
The Vector
class is an old collection, which all methods are synchronized
. Only one thread can access one method at a given point in time.
package demo;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CyclicBarrier;
import java.util.function.IntConsumer;
public class App {
public static void main( final String[] args ) throws Exception {
final int size = 100;
final List<String> list = new Vector<>();
final IntConsumer task = index -> list.add( String.format( "Element %d", index ) );
runUsingMultipleThreads( task, size );
System.out.printf( "Expecting a list of %d elements but found %d%n", size, list.size() );
}
private static void runUsingMultipleThreads( final IntConsumer consumer, final int size ) throws Exception { /* ... */ }
private static List<Thread> createAndStartThreads( final int size, final IntConsumer consumer ) { /* ... */ }
private static Runnable awaitAllThreadsAndRun( final CyclicBarrier barrier, final Runnable runnable ) { /* ... */ }
private static void waitAllThreadsToFinish( final List<Thread> threads ) throws InterruptedException { /* ... */ }
}
Being thread-safe, the Vector
will always ends up with 100 elements.
Collections.synchronizedList(List)
There are cases when we cannot control the type of list being used. In such cases we can warp the list with a synchronized list, using the Collections.synchronizedList(List)
method. Using the wrapped list instead, we can safely access the wrapped list from multiple threads as all access is synchronized. This means that only one thread can access any synchronized method at any point in time.
package demo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.function.IntConsumer;
public class App {
public static void main( final String[] args ) throws Exception {
final int size = 100;
final List<String> list = new ArrayList<>();
final List<String> synchronizedList = Collections.synchronizedList( list );
final IntConsumer task = index -> synchronizedList.add( String.format( "Element %d", index ) );
runUsingMultipleThreads( task, size );
System.out.printf( "Expecting a list of %d elements but found %d%n", size, list.size() );
}
private static void runUsingMultipleThreads( final IntConsumer consumer, final int size ) throws Exception { /* ... */ }
private static List<Thread> createAndStartThreads( final int size, final IntConsumer consumer ) { /* ... */ }
private static Runnable awaitAllThreadsAndRun( final CyclicBarrier barrier, final Runnable runnable ) { /* ... */ }
private static void waitAllThreadsToFinish( final List<Thread> threads ) throws InterruptedException { /* ... */ }
}
In the above example we are using a non-thread safe list, then ArrayList
.
final List<String> list = new ArrayList<>();
Instead of adding new elements to the ArrayList
, we wrapped the non-thread safe list into a synchronized list, using the Collections.synchronizedList()
method.
final List<String> synchronizedList = Collections.synchronizedList( list );
final IntConsumer task = index -> synchronizedList.add( String.format( "Element %d", index ) );
This guarantees that only one thread modifies the list at any given point in time and thus ensures that all 100 elements are added.