Friday, December 2, 2011

A Comparison of CountDownLatch, CyclicBarrier and Semaphore

Recently I have been developing some multithreaded applications in Java, and I needed the parent thread to wait for all of its child threads to finish before continuing. After some research I found that, since Java 1.5, the java.util.concurrent package has included several classes that support the scenario I was trying to build. The classes are:

  • CountDownLatch
  • CyclicBarrier
  • Semaphore
CountDownLatch and CyclicBarrier seem at first glance to do the same task, and I read some blogs that tried to explain the difference between the two. The most common differences they mentioned were:
  1. CountDownLatch cannot be reused after its count reaches zero.
  2. CountDownLatch cannot be used to wait for parallel threads to finish.
  3. CyclicBarrier can be reset and thus reused.
  4. CyclicBarrier can be used to wait for parallel threads to finish.
From these explanations, the picture I got was this:

If a main thread creates 5 different threads, the main thread can use a CountDownLatch to wait for the child threads, whereas a CyclicBarrier can be used to make the threads wait for each other.
Thus CountDownLatch provides top-down waiting, whereas CyclicBarrier provides waiting across the threads.

But this wasn't convincing enough for me, because no blog clearly explained the practical usage of the two classes. Furthermore, there wasn't much discussion of the powerful Semaphore class either. So I did some testing on that too, and this is what I found.

I will try to explain the theoretical as well as the practical use of the three classes. OK, here goes.

CountDownLatch can be used to monitor the completion of child threads if the number of children is known beforehand. A CountDownLatch lets one or more threads wait for the child threads to complete, but the children do not wait for each other: they execute independently, and each one counts the latch down when its work is done and then exits.

Practical Example : The main thread creates 10 database connections and 10 different threads, and assigns one connection to each thread. The main thread must wait until all 10 threads finish their DB operations before closing the connections. The children exit after performing their DB operations. A CountDownLatch can be used in this scenario.




import java.util.concurrent.*;
import java.util.*;
import java.text.*;
/**
 * @author Shazin Sadakath
 *
 */
public class CountDownLatchTest {
    private static final int MAX_THREADS = 5;

    public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(MAX_THREADS);
    
        System.out.println("Spawning Threads");
        for(int i=0;i<MAX_THREADS;i++) {
            Thread t = new Thread(new WorkerThread(countDownLatch, String.format("Thread-%d", i)));
            t.start();
        }
        System.out.println("Spawning Finished");
        System.out.println("Waiting All Threads to Finish");
        countDownLatch.await(); // Await is void
        System.out.println("All Threads are Finished");
    }
    
    private static class WorkerThread implements Runnable {
        private CountDownLatch countDownLatch;
        
        private String name;
        
        public WorkerThread(CountDownLatch countDownLatch, String name) {
            this.name = name;
            this.countDownLatch = countDownLatch;
        }
        
        public void run() {
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
            try {
                System.out.printf("%s : Doing Some Work on %s\n", getFormattedDate(sdf), name);
                Thread.sleep(getRandomWaitTime());
                System.out.printf("%s : Doing Some more work on %s\n", getFormattedDate(sdf), name);
                Thread.sleep(getRandomWaitTime());
                System.out.printf("%s : Finished work on %s\n", getFormattedDate(sdf), name);
            } catch(Exception e) {
                e.printStackTrace();
            } finally {
                // Count down in a finally block so the main thread is not left
                // waiting forever if this worker throws part-way through its work.
                countDownLatch.countDown();
                System.out.printf("%s : Count Down Latch count on %s is %d\n", getFormattedDate(sdf), name, countDownLatch.getCount());
            }
        }
        
        private String getFormattedDate(SimpleDateFormat sdf) {
            return sdf.format(new Date());
        }
        
        private int getRandomWaitTime() {
            return (int) ((Math.random() + 1) * 1000);
        }
        
    }
}
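The database scenario described above can also be sketched with a thread pool. This is a minimal sketch assuming Java 8 or later. FakeConnection is a hypothetical stand-in for a real database connection, not a JDBC class. The point it illustrates is that the connections are closed only after every worker has counted the latch down, and that the count down sits in a finally block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDbSketch {
    // Hypothetical stand-in for a DB connection; NOT a real JDBC class.
    static class FakeConnection implements AutoCloseable {
        final AtomicInteger queries = new AtomicInteger();
        volatile boolean closed = false;

        void query() {
            if (closed) throw new IllegalStateException("connection already closed");
            queries.incrementAndGet();
        }

        @Override
        public void close() { closed = true; }
    }

    /** Runs one "DB operation" per connection in parallel, then closes every connection. */
    static List<FakeConnection> runAll(int n) throws InterruptedException {
        List<FakeConnection> connections = new ArrayList<>();
        for (int i = 0; i < n; i++) connections.add(new FakeConnection());

        CountDownLatch done = new CountDownLatch(n);
        ExecutorService pool = Executors.newFixedThreadPool(n);
        for (FakeConnection c : connections) {
            pool.execute(() -> {
                try {
                    c.query();          // the child's DB operation
                } finally {
                    done.countDown();   // count down even if the operation fails
                }
            });
        }
        done.await();                   // main thread blocks until all n children counted down
        for (FakeConnection c : connections) c.close(); // now it is safe to close them
        pool.shutdown();
        return connections;
    }

    public static void main(String[] args) throws InterruptedException {
        List<FakeConnection> cs = runAll(10);
        System.out.println(cs.stream().allMatch(c -> c.closed && c.queries.get() == 1));
    }
}
```

Running main should print true: every connection was used exactly once and closed only afterwards.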

CyclicBarrier can be used to coordinate a set of child threads when the number of threads is known beforehand. It lets the child threads wait for each other until all of them reach a common point. This is useful when parallel threads need to perform a job in sequential steps. For example, 10 threads each do steps 1, 2 and 3, but all 10 threads must finish step 1 before any of them starts step 2. A CyclicBarrier can be reused once all the waiting threads have been released. This is a distinguishing feature from a CountDownLatch, which can only count down once. Additionally, a CyclicBarrier can be given a barrier action: a Runnable that is executed (by the last thread to arrive) each time all the child threads reach the barrier.
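The "steps 1, 2, 3" idea above can be sketched as follows, assuming Java 8 or later. The same barrier is reused for every step without calling reset(), because it returns to its initial state on its own each time it trips. The barrier action counts the trips, and each thread checks that every party finished a step before anyone moved past it. The counters are illustration only.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierStepsSketch {
    /**
     * Runs `parties` threads through `steps` sequential steps using one barrier.
     * Returns how many times the barrier tripped, or -1 if any thread saw a step
     * that not every party had finished yet.
     */
    static int run(int parties, int steps) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        AtomicInteger violations = new AtomicInteger();
        AtomicInteger[] finished = new AtomicInteger[steps];
        for (int i = 0; i < steps; i++) finished[i] = new AtomicInteger();

        // Barrier action: executed once per trip by the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int p = 0; p < parties; p++) {
            workers[p] = new Thread(() -> {
                try {
                    for (int step = 0; step < steps; step++) {
                        finished[step].incrementAndGet(); // this thread's work for the step
                        barrier.await();                  // same barrier every step, no reset()
                        if (finished[step].get() != parties) violations.incrementAndGet();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    violations.incrementAndGet();
                }
            });
            workers[p].start();
        }
        for (Thread t : workers) t.join();
        return violations.get() == 0 ? trips.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 3)); // one trip per step
    }
}
```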

Practical Example : Processing an image's pixel matrix row by row in the first step and, in the second step, saving the pixel values to a file row by row. If 10 threads process the matrix rows simultaneously, all 10 must wait until every row has been processed before any of them moves on to the next step, which is saving those rows to the file.


import java.util.concurrent.*;
import java.util.*;
import java.text.*;
/**
 * @author Shazin Sadakath
 *
 */
public class CyclicBarrierTest {
    private static final int MAX_THREADS = 5;

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(MAX_THREADS, new Runnable() {
            private int count = 1;
        
            public void run() {
                System.out.printf("Cyclic Barrier Finished %d\n", count++);
            }
        });
    
        System.out.println("Spawning Threads");
        for(int i=0;i<MAX_THREADS;i++) {
            Thread t = new Thread(new WorkerThread(cyclicBarrier, String.format("Thread-%d", i)));
            t.start();
        }
        System.out.println("Spawning Finished");
    }
    
    private static class WorkerThread implements Runnable {
        private CyclicBarrier cyclicBarrier;
        
        private String name;
        
        public WorkerThread(CyclicBarrier cyclicBarrier, String name) {
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }
        
        public void run() {
            try {
                SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");                
                System.out.printf("%s : Doing Step 1 Work on %s\n", getFormattedDate(sdf), name);
                Thread.sleep(getRandomWaitTime());
                System.out.printf("%s : Doing Step 1 more work on %s\n", getFormattedDate(sdf), name);
                Thread.sleep(getRandomWaitTime());
                System.out.printf("%s : Finished Step 1 work on %s\n", getFormattedDate(sdf), name);
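                // await() returns the arrival index: getParties() - 1 for the first thread to arrive, 0 for the last
                int count = cyclicBarrier.await();
                System.out.printf("%s : Cyclic Barrier count on %s is %d\n", getFormattedDate(sdf), name, count);
                if(count == 0) {
                    // Note: the barrier already returns to its initial state on its own once it trips,
                    // so this reset() is not strictly needed. If another thread were already waiting
                    // at the barrier for Step 2, reset() would make it throw a BrokenBarrierException.
                    cyclicBarrier.reset();
                }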
                System.out.printf("%s : Doing Step 2 Batch of Work on %s\n", getFormattedDate(sdf), name);
                Thread.sleep(getRandomWaitTime());
                System.out.printf("%s : Doing Some more Step 2 Batch of work on %s\n", getFormattedDate(sdf), name);
                Thread.sleep(getRandomWaitTime());
                System.out.printf("%s : Finished Step 2 Batch of work on %s\n", getFormattedDate(sdf), name);
                count = cyclicBarrier.await();
                System.out.printf("%s : Cyclic Barrier count end of Step 2 Batch of work on %s is %d\n", getFormattedDate(sdf), name, count);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
        
        private String getFormattedDate(SimpleDateFormat sdf) {
            return sdf.format(new Date());
        }
        
        private int getRandomWaitTime() {
            return (int) ((Math.random() + 1) * 1000);
        }
        
    }
}
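The pixel-matrix example described earlier can be sketched with one thread per row, assuming Java 8 or later. Inverting 8-bit grayscale values stands in for "processing", and the fileLines array is a hypothetical stand-in for the output file. The barrier between the two steps guarantees that no row is saved until every row has been processed.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class PixelRowsSketch {
    /**
     * Step 1: each thread inverts one row of an 8-bit grayscale matrix in place.
     * Step 2: each thread "saves" its row as a line of text (fileLines stands in for a file).
     */
    static String process(int[][] pixels) throws InterruptedException {
        int rows = pixels.length;
        String[] fileLines = new String[rows];
        CyclicBarrier barrier = new CyclicBarrier(rows);   // one party per row
        Thread[] workers = new Thread[rows];
        for (int r = 0; r < rows; r++) {
            final int row = r;
            workers[r] = new Thread(() -> {
                try {
                    int[] line = pixels[row];
                    for (int c = 0; c < line.length; c++) {
                        line[c] = 255 - line[c];           // step 1: process this row
                    }
                    barrier.await();                       // wait until all rows are processed
                    StringBuilder sb = new StringBuilder();
                    for (int c = 0; c < line.length; c++) {
                        if (c > 0) sb.append(' ');
                        sb.append(line[c]);
                    }
                    fileLines[row] = sb.toString();        // step 2: save this row
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another worker failed before the barrier; this row is not saved
                }
            });
            workers[r].start();
        }
        for (Thread t : workers) t.join();
        return String.join("\n", fileLines);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(new int[][] {{0, 10}, {255, 100}}));
    }
}
```

For the 2x2 matrix in main, the saved lines are "255 245" and "0 155".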



A Semaphore can be used to wait for a set of child threads even when the number of threads to be created is not known beforehand. This works because the Semaphore can be created with zero permits, and the waiting thread can later call acquire(n) to block until n releases have been made. The number n only has to be known when acquire() is called, not when the Semaphore is initialized. Semaphores can also be used in other scenarios, such as synchronizing producer and consumer (publisher and subscriber) threads.
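The producer-consumer use mentioned above is classically built from counting semaphores. This minimal bounded buffer is a sketch assuming Java 8 or later, not a production queue. One semaphore counts free slots, one counts filled slots, and a binary semaphore guards the underlying deque.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Semaphore;

public class BoundedBufferSketch<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final Semaphore freeSlots;                      // permits = empty slots
    private final Semaphore filledSlots = new Semaphore(0); // permits = items available
    private final Semaphore mutex = new Semaphore(1);       // binary semaphore guarding `items`

    BoundedBufferSketch(int capacity) { freeSlots = new Semaphore(capacity); }

    void put(T item) throws InterruptedException {
        freeSlots.acquire();              // block while the buffer is full
        mutex.acquireUninterruptibly();
        try { items.addLast(item); } finally { mutex.release(); }
        filledSlots.release();            // wake one waiting consumer
    }

    T take() throws InterruptedException {
        filledSlots.acquire();            // block while the buffer is empty
        T item;
        mutex.acquireUninterruptibly();
        try { item = items.removeFirst(); } finally { mutex.release(); }
        freeSlots.release();              // wake one waiting producer
        return item;
    }

    /** One producer pushes 1..n through a buffer of the given capacity; one consumer collects them. */
    static List<Integer> demo(int n, int capacity) throws InterruptedException {
        BoundedBufferSketch<Integer> buffer = new BoundedBufferSketch<>(capacity);
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) received.add(buffer.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(10, 2));
    }
}
```

In real code, java.util.concurrent.ArrayBlockingQueue already provides a bounded buffer. The sketch only shows how semaphores can express the same waiting.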

Practical Example : Traversing a folder that contains nested subfolders, and if JPEG files are found, moving them to a destination directory and then zipping them. The folder structure is traversed recursively, and whenever a JPEG file is found a thread is started to move it to the destination directory. Zipping, however, must wait until all JPEG files have been moved. The number of JPEG files in the folder structure is not known in advance, yet zipping has to wait until every file has been moved successfully. This makes it an ideal scenario for Semaphore-based waiting.

import java.util.concurrent.*;
import java.util.*;
import java.text.*;
/**
 * @author Shazin Sadakath
 *
 */
public class SemaphoreTest {
    private static final int MAX_THREADS = 5;

    public static void main(String[] args) throws Exception {
        Semaphore semaphore = new Semaphore(0);

        System.out.println("Spawning Threads");
        int threadCount = 0;
        Random random = new Random();
        for(int i=0;i<MAX_THREADS;i++) {
            // Threads created will not always be MAX_THREADS
            // Because Threads are created only if Random no is Even.
            // Thus the No of Threads unknown at Semaphore Initialization
            if(random.nextInt(9999) % 2 == 0) {
                Thread t = new Thread(new WorkerThread(semaphore, String.format("Thread-%d", i)));
                t.start();
                threadCount++;
            }
        }
        System.out.println("Spawning Finished");
        System.out.println("Waiting All Threads to Finish");
        semaphore.acquire(threadCount); 
        System.out.println("All Threads are Finished");
    }
    
    private static class WorkerThread implements Runnable {
        private Semaphore semaphore;
        
        private String name;
        
        public WorkerThread(Semaphore semaphore, String name) {
            this.name = name;
            this.semaphore = semaphore;
        }
        
        public void run() {
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
            try {
                System.out.printf("%s : Doing Some Work on %s\n", getFormattedDate(sdf), name);
                Thread.sleep(getRandomWaitTime());
                System.out.printf("%s : Doing Some more work on %s\n", getFormattedDate(sdf), name);
                Thread.sleep(getRandomWaitTime());
                System.out.printf("%s : Finished work on %s\n", getFormattedDate(sdf), name);
            } catch(Exception e) {
                e.printStackTrace();
            } finally {
                // Release in a finally block so main's acquire(threadCount)
                // cannot block forever if this worker throws part-way through.
                semaphore.release();
            }
        }
        
        private String getFormattedDate(SimpleDateFormat sdf) {
            return sdf.format(new Date());
        }
        
        private int getRandomWaitTime() {
            return (int) ((Math.random() + 1) * 1000);
        }
        
    }
}
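The JPEG scenario described earlier can be sketched in the same way, assuming Java 8 or later. Dir is a hypothetical in-memory stand-in for the file system. "Moving" a file only records its name, and "zipping" only returns the sorted names, so the sketch runs without touching disk. A real version would walk java.nio.file paths and use Files.move. The key point is that the number of threads is only known after the traversal, and that number is passed to acquire().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

public class JpegMoveSketch {
    /** Hypothetical in-memory folder: file names plus subfolders. */
    static class Dir {
        final List<String> files;
        final List<Dir> subDirs;
        Dir(List<String> files, Dir... subDirs) {
            this.files = files;
            this.subDirs = Arrays.asList(subDirs);
        }
    }

    private final Semaphore moved = new Semaphore(0);                  // no permits up front
    private final Queue<String> destination = new ConcurrentLinkedQueue<>();

    /** Recursively walks the tree, starting one "move" thread per JPEG; returns threads started. */
    private int traverse(Dir dir) {
        int started = 0;
        for (String name : dir.files) {
            String lower = name.toLowerCase(Locale.ROOT);
            if (lower.endsWith(".jpg") || lower.endsWith(".jpeg")) {
                new Thread(() -> {
                    try {
                        destination.add(name);   // stands in for moving the file
                    } finally {
                        moved.release();         // release even if the move fails
                    }
                }).start();
                started++;
            }
        }
        for (Dir sub : dir.subDirs) started += traverse(sub);
        return started;
    }

    /** Moves all JPEGs, waits for every move to finish, then "zips" (returns the sorted names). */
    static List<String> moveThenZip(Dir root) throws InterruptedException {
        JpegMoveSketch job = new JpegMoveSketch();
        int count = job.traverse(root);  // only known once the traversal is complete
        job.moved.acquire(count);        // blocks until `count` releases have been made
        List<String> zipped = new ArrayList<>(job.destination);
        Collections.sort(zipped);
        return zipped;
    }

    public static void main(String[] args) throws InterruptedException {
        Dir root = new Dir(Arrays.asList("a.jpg", "notes.txt"),
                new Dir(Arrays.asList("B.JPEG"),
                        new Dir(Arrays.asList("c.jpg", "d.png"))));
        System.out.println(moveThenZip(root));
    }
}
```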
In conclusion, each class has its own distinct use, and it is the software engineer's responsibility to understand which one best suits the problem at hand. In my case, CountDownLatch was the ideal choice, and I used it quite successfully. It is advisable to use these classes rather than implementing similar behavior ourselves, because they were designed and thoroughly tested by concurrency experts (through JSR 166) and ship with the JDK.

6 comments:

shongololo said...

This statement is false:

"CountDownLatch cannot be used to wait for parallel threads to finish."

See this link for a good explanation.

John said...

I understand cyclicBarrier.reset() differently:

The purpose of calling cyclicBarrier.reset() is to allow the cyclicBarrier to continue to be used after an Exception has been caught during the running of a thread, and to make other threads waiting in the cyclicBarrier.await() method throw a BrokenBarrierException.

These other threads may have completed their processing without errors before reaching the cyclicBarrier.await() method; however, calling cyclicBarrier.reset() in another thread could cause their cyclicBarrier.await() call to throw a BrokenBarrierException. The reason for this is that if a certain thread has a processing error, you need a way of telling the other threads that an error has occurred (maybe their processing needs to be rolled back).

cyclicBarrier.reset() is therefore normally called inside the catch block of a try - catch statement and not in the try block as was done by the author.

Note that the act of calling cyclicBarrier.reset() will itself cause other threads to throw a BrokenBarrierException if they are themselves blocking on cyclicBarrier.await().

Therefore even the very act of calling the cyclicBarrier.reset() method in the above example could cause the example to fail. The chance of failure could be increased by increasing the number of threads running.

Please correct me if I am wrong - my understanding comes from reading the jdk 1.7 docs.
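John's point matches the documented behavior of CyclicBarrier.reset(): any parties currently waiting at the barrier return with a BrokenBarrierException. Here is a minimal sketch, assuming Java 8 or later, that leaves one thread waiting on a two-party barrier and then calls reset() from the main thread.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

public class ResetBreaksWaitersSketch {
    /** Returns true if a thread blocked in await() got BrokenBarrierException because of reset(). */
    static boolean waiterSeesBrokenBarrier() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2);  // two parties, but only one ever arrives
        AtomicBoolean broken = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();                       // blocks: the second party never arrives
            } catch (BrokenBarrierException e) {
                broken.set(true);                      // woken up by reset()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        while (barrier.getNumberWaiting() == 0) {      // wait until the waiter is blocked in await()
            Thread.sleep(10);
        }
        barrier.reset();                               // breaks the waiter, then starts a fresh generation
        waiter.join();
        return broken.get() && !barrier.isBroken();    // the barrier itself is usable again
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waiterSeesBrokenBarrier());
    }
}
```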

Shazin Sadakath said...

Hi John,

According to the JDK 1.5 Documentation the definition of CyclicBarrier is as below.

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

This clearly states that a CyclicBarrier can be reused after all the threads are released. The resetting, of course, is done by cyclicBarrier.reset().

Thanks for your enlightening thoughts. Yes, you are right that cyclicBarrier.reset() can cause a BrokenBarrierException. In an exceptional scenario this can be used to find out whether a failure occurred in an executing thread. But consider the context of this example: the reset only takes place after cyclicBarrier.await() returns 0, which means all executing threads have finished.

Plus, this statement is not correct:

cyclicBarrier.reset() is therefore normally called inside the catch block of a try - catch statement and not in the try block as was done by the author.

I have in fact called both await() and reset() from within a try-catch block. The aim of this example was to make the parent thread wait for all child threads to finish before continuing, so I composed the example with that in mind. For instance, the obvious use of a Semaphore is to act as a mutex between threads, but I have explained how it can also be used in this problem scenario.

I ran all three examples before posting, and they behaved according to my requirements.

John said...

Hi,

In response to the comment above, which says:

"This clearly states that a CyclicBarrier can be reused after all the threads are released. The resetting, of course, is done by cyclicBarrier.reset()".

The way I understand it, a CyclicBarrier can be reused, but you do not need to call reset() to reuse it. In fact, calling reset() can have undesired effects unless you catch BrokenBarrierExceptions and handle them appropriately.

I have an example which shows reusing the CyclicBarrier without calling reset(). Calling reset() is not necessary to reuse a CyclicBarrier:

https://sites.google.com/site/javaduka/code-examples/concurrency/lock/cyclicbarrier-1

As mentioned above, I understand reset() to be a mechanism for throwing a BrokenBarrierException in other threads that are in the await state, so that logic can be implemented in the catch block to roll back the state of those threads. It is a mechanism to ensure that either everything passes or everything is rolled back; reset() is called to roll everything back.

Shazin Sadakath said...

Hi John,

I read your post, and it is quite informative. But you haven't used either a reset() call or a Thread (barrier action) at the end of each cycle.

If you read the documentation for reset(), it clearly says that it "Resets the barrier to its initial state."
For the scenario explained in this example, I use a Thread that is invoked at the end of each iteration. For that to be invoked, the cyclic barrier needs to be brought back to its initial state, and this is where reset() is required. Please run my code and look at the output. I ran yours, but I saw a small bug: you try to throw a Throwable from a method that is declared to throw an Exception. Apparently this won't work, since Throwable is the superclass of Exception.

shadab anwar said...

John

I have a small doubt.

You say the catch block should call reset(),

which will make the waiting threads throw a BrokenBarrierException so they can do their cleanup.

But tell me: if 5 threads are waiting on the barrier and, suppose, the 5th one gets interrupted, control goes to the catch block of the interrupted thread, and the other 4 threads waiting on the barrier will automatically throw a BrokenBarrierException. Do you agree?

If so, then what is the point of calling reset() in order to throw the BrokenBarrierException?