Testing concurrent objects can be challenging. One pattern that is useful for objects used in producer-consumer designs is to verify that everything producers put into a shared concurrent queue is correctly processed by consumers.
Consider the example of a registry used to hold references to objects that need to be notified of activity via a callback mechanism. The interface could look something like this example:
public interface Registry {
    /**
     * Register a callback with an identifier.
     */
    void register(Long identifier, AsyncCallback callback) throws Exception;

    /**
     * Unregister a callback.
     */
    void unregister(Long identifier);
}
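For concreteness, an implementation of this interface might look like the minimal sketch below. It is only an illustration. The `AsyncCallback` stand-in with its `onEvent` method, the `ConcurrentHashMap` backing store, the duplicate-identifier check, and the `size()` helper are all assumptions and are not details given in this article.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the callback type; the real AsyncCallback is not shown here.
interface AsyncCallback {
    void onEvent(String message);
}

// Same shape as the interface above, repeated so the sketch compiles on its own.
interface Registry {
    void register(Long identifier, AsyncCallback callback) throws Exception;

    void unregister(Long identifier);
}

// One possible thread-safe implementation, backed by a ConcurrentHashMap (an assumption).
class ThreadSafeRegistry implements Registry {
    private final ConcurrentMap<Long, AsyncCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void register(Long identifier, AsyncCallback callback) throws Exception {
        // putIfAbsent is atomic, so two threads cannot both claim the same identifier
        if (callbacks.putIfAbsent(identifier, callback) != null) {
            throw new IllegalStateException("Identifier already registered: " + identifier);
        }
    }

    @Override
    public void unregister(Long identifier) {
        // remove is atomic and is a no-op when the identifier is unknown
        callbacks.remove(identifier);
    }

    // Test helper (hypothetical, not part of the Registry interface)
    int size() {
        return callbacks.size();
    }
}
```

Delegating to a concurrent map keeps each individual operation atomic without explicit locking. Whether that is enough depends on what else the registry has to do, which is exactly what a concurrency test is meant to probe.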
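Now suppose you have developed an implementation of this interface that you wish to verify as thread-safe. How can you test it? With this example, you can set up your test to register and unregister a sequence of callbacks concurrently, then check that the sum of the registered identifiers equals the sum of the unregistered identifiers. Comparing sums is a cheap, order-independent check: it does not prove correctness, but a lost or duplicated operation will almost always change one of the sums.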
Such a test may look something like the following example:
/**
 * Run multiple producers with multiple consumers.
 * All data put into the registry must be correctly consumed to pass the test.
 */
@Test
public void testRegistryIsThreadsafe() throws Exception {
    final ExecutorService pool = Executors.newCachedThreadPool();
    // At test completion, the sums of registered and unregistered identifiers must match
    final AtomicLong registerSum = new AtomicLong(0);
    final AtomicLong unregisterSum = new AtomicLong(0);
    final int nProducers = 100; // Number of concurrent producers
    final int nConsumers = 100; // Number of concurrent consumers
    final int nRegistrations = 1000; // Number of registrations each producer makes
    final CyclicBarrier barrier = new CyclicBarrier(nConsumers + nProducers + 1); // + 1 for the main thread
    final BlockingQueue<Long> opIds = new ArrayBlockingQueue<>(nProducers * nRegistrations); // Holds every registered identifier
    final Registry registry = new ThreadSafeRegistry(); // Implementation to test
    /**
     * The producer adds registrations.
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                barrier.await(); // wait for all threads to be ready
                for (int i = nRegistrations; i > 0; i--) {
                    Long id = ThreadLocalRandom.current().nextLong();
                    // MyCallback is assumed to be an AsyncCallback implementation defined elsewhere
                    registry.register(id, new MyCallback());
                    registerSum.getAndAdd(id);
                    opIds.add(id); // hand the identifier to a consumer
                }
                barrier.await(); // signal completion
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
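    /**
     * The consumer removes registrations.
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                barrier.await(); // wait for all threads to be ready
                // Each consumer takes an equal share so that every registration is unregistered
                for (int i = nProducers * nRegistrations / nConsumers; i > 0; i--) {
                    Long opId = opIds.take();
                    registry.unregister(opId);
                    unregisterSum.getAndAdd(opId);
                }
                barrier.await(); // signal completion
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }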
    try {
        IntStream.range(0, nProducers).forEach(i -> pool.execute(new Producer()));
        IntStream.range(0, nConsumers).forEach(i -> pool.execute(new Consumer()));
        barrier.await(); // wait for all threads to be ready
        barrier.await(); // wait for all threads to finish
        // verify that the registered and unregistered identifier sums are equal
        assertEquals(registerSum.get(), unregisterSum.get());
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        pool.shutdownNow(); // release the worker threads
    }
}
This pattern takes advantage of a CyclicBarrier to coordinate the starting and stopping of producers and consumers. As each producer and consumer starts up, it calls await() on the barrier. Once every party, including the main test thread, has arrived, the barrier trips and releases them all together. The main test thread then waits at the barrier a second time until every producer and consumer has finished. Finally, once all threads have completed their work, we can verify that the amount of work produced and consumed is equal.
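The two-phase use of the barrier can be seen in isolation in the following sketch, which strips away the registry and keeps only the coordination. The names `BarrierDemo` and `runWorkers` are hypothetical, and the 10-second timeouts on the coordinating thread are an added assumption so that a failed worker produces an exception rather than a hang.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {
    // Starts nWorkers threads together and returns how many completed their work.
    static int runWorkers(int nWorkers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(nWorkers);
        // nWorkers parties plus one for the coordinating thread
        CyclicBarrier barrier = new CyclicBarrier(nWorkers + 1);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < nWorkers; i++) {
            pool.execute(() -> {
                try {
                    barrier.await();             // first trip: wait until every party is ready
                    completed.incrementAndGet(); // stand-in for the real work
                    barrier.await();             // second trip: report completion
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        try {
            barrier.await(10, TimeUnit.SECONDS); // trips once all workers are ready
            barrier.await(10, TimeUnit.SECONDS); // trips once all workers have finished
            return completed.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWorkers(8)); // prints 8
    }
}
```

Because a CyclicBarrier resets itself after each trip, the same instance serves as both the start gate and the finish line. The party count must include the coordinating thread, or the workers would be released before it arrives.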