Testing a Producer-Consumer Design using a CyclicBarrier

Testing concurrent objects can be challenging. One particular pattern that is useful for objects used in producer-consumer designs is to ensure that everything put in to a shared concurrent queue by a producer is correctly executed by consumers.

Consider the example of a registry used to hold references to objects that need to be notified of activity via a callback mechanism. The interface could look something like this example:

public interface Registry {

    /**
     * Register a callback with an identifier.
     */
    void register(Long identifier, AsyncCallback callback) throws Exception;

    /**
     * Unregister a callback.
     */
    void unregister(Long identifier);
}

Now suppose you have developed an implementation of this interface that you wish to verify as thread safe. How can you test it? With this example, you can set up your test to register and unregister a sequence of callbacks and check to ensure that the registrations are equal to the unregistrations.

Such a test may look something like the following example:

/**
 * Run a producer with multiple consumers.
 * All data put in to the registry must correctly be consumed to pass the test.
 */
@Test
public void testRegistryIsThreadsafe() throws TException {
    final ExecutorService pool = Executors.newCachedThreadPool();

    // At test completion, values registered, unregistered, and executed must match
    final AtomicLong registerSum = new AtomicLong(0);
    final AtomicLong unregisterSum = new AtomicLong(0);

    final int nProducers = 100; // Number of concurrent producers
    final int nConsumers = 100; // Number of concurrent consumers
    final int nRegistrations = 1000; // Number of registrations to make to the registry per producer
    final CyclicBarrier barrier = new CyclicBarrier(nConsumers + nProducers + 1); // + 1 for main thread;
    final BlockingQueue<Long> opIds = new ArrayBlockingQueue<>(nProducers * nRegistrations); // Store all operations registered

    final Registry registry = new ThreadSafeRegistry(); // Implementation to test

    /**
     * The producer adds registrations.
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                barrier.await();

                for (int i = nRegistrations; i > 0; i--) {
                    Long id = ThreadLocalRandom.current().nextLong();
                    registry.register(id, MyCallback);

                    registerSum.getAndAdd(context.getOpId());
                    opIds.add(context.getOpId());
                }

                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * The consumer removes registrations.
     */
    class Consumer implements Runnable {

        @Override
        public void run() {
            try {
                barrier.await();

                Long opId = opIds.take();

                registry.unregister(opId);
                unregisterSum.getAndAdd(opId);

                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }

        }

    }

    try {
        IntStream.range(0, nProducers).forEach(i -> pool.execute(new Producer()));
        IntStream.range(0, nConsumers).forEach(i -> pool.execute(new Consumer()));

        barrier.await(); // wait for all threads to be ready
        barrier.await(); // wait for all threads to finish

        // verify that the registrations and unregistrations are equal
        assertEquals(registerSum.get(), unregisterSum.get());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

}

This pattern takes advantage of a CyclicBarrier to coordinate the starting and stopping of producers and consumers. As each producer and consumer starts up, it “counts down” the barrier. Once all are ready, the barrier is released and the main test thread can continue (only to stop again waiting for all threads to finish). Finally, once all threads have completed work, we can verify that the amount of work produced and consumed is equal.

See also

comments powered by Disqus