
Wednesday, December 25, 2013

Thread safe classes - CachedThreadPool, ConcurrentHashMap, Atomic classes, Synchronized/Notification queues

Q1. Lock free containers/Synchronized collections
Ans.
1.) CopyOnWriteArrayList: In CopyOnWriteArrayList, a write causes a copy of the entire underlying array to be created. The original array is left in place so that reads can safely occur while the copied array is being modified. When the modification is complete, an atomic operation swaps the new array in so that new reads see the new information. One of the benefits of CopyOnWriteArrayList is that it does not throw ConcurrentModificationException when multiple iterators are traversing and modifying the list, so you don’t have to write special code to protect against such exceptions (a short sketch appears after this list).

2.) CopyOnWriteArraySet: This uses CopyOnWriteArrayList to achieve its lock-free behavior.

3.) ConcurrentHashMap: An implementation of the interface ConcurrentMap, which defines useful atomic operations. These operations remove or replace a key-value pair only if the key is present, or add a key-value pair only if the key is absent. This collection doesn’t throw ConcurrentModificationException.

4.) ConcurrentSkipListMap: An implementation of ConcurrentNavigableMap, which is a subinterface of ConcurrentMap that supports approximate (closest-match) searches. It is the concurrent analog of TreeMap.

5.) ConcurrentLinkedQueue: ConcurrentHashMap and ConcurrentLinkedQueue use similar techniques to allow concurrent reads and writes, but only portions of the container are copied and modified rather than the entire container. However, readers will still not see any modifications before they are complete.

As long as you are primarily reading from a lock-free container, it will be much faster than its synchronized counterpart because the overhead of acquiring and releasing locks is eliminated. This is still true for a small number of writes to a lock-free container, but it would be interesting to get an idea of what "small" means.
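As an illustration, here is a minimal sketch (the class name CopyOnWriteDemo is made up for the example) showing that an iterator over a CopyOnWriteArrayList works on the array snapshot taken at its creation, so a concurrent write does not break it:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo
{
      public static void main(String[] args)
      {
            List<String> list = new CopyOnWriteArrayList<String>();
            list.add("a");
            list.add("b");

            // The iterator traverses the snapshot taken at its creation,
            // so this write does not throw ConcurrentModificationException.
            for (String s : list)
            {
                  list.add("c");         // triggers a copy of the backing array
                  System.out.println(s); // prints only "a" and "b"
            }
            System.out.println(list.size()); // 4: two "c" elements were added
      }
}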

NOTE: There is no ConcurrentHashSet analogous to ConcurrentHashMap, though CopyOnWriteArraySet exists as the set counterpart of CopyOnWriteArrayList.
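A minimal sketch of the ConcurrentMap atomic operations, plus the usual workaround for the missing ConcurrentHashSet (Collections.newSetFromMap, available since Java 6); the key and values here are illustrative:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentMapDemo
{
      public static void main(String[] args)
      {
            ConcurrentMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
            map.putIfAbsent("hits", 1); // add only if the key is absent
            map.replace("hits", 1, 2);  // replace only if the current value matches
            map.remove("hits", 2);      // remove only if the current value matches

            // Workaround for the missing ConcurrentHashSet:
            Set<String> concurrentSet =
                  Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
            concurrentSet.add("x");
      }
}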

Q2. How does a ConcurrentHashMap work.
Ans. This class obeys the same functional specification as Hashtable and includes versions of methods corresponding to each method of Hashtable. However, a Hashtable offers concurrent access to its entries by locking the entire map to perform any operation.

However, ConcurrentHashMap maintains a list of 16 locks by default, each of which guards one segment (a fixed group of buckets) of the map. Thus, effectively, 16 threads can modify the collection at a time (as long as they are all working on different segments). In fact, no operation performed by this collection locks the entire map. The concurrency level of the collection can be increased; however, a higher value means more overhead for maintaining this list of locks.

Ideally, you should choose a value that accommodates as many threads as will ever concurrently modify the table. Using a significantly higher value than you need wastes space and time, and a significantly lower value leads to thread contention. But overestimates and underestimates within an order of magnitude do not usually have much noticeable impact. A value of one is appropriate when it is known that only one thread will modify the map and all others will only read. Also, resizing this or any other kind of hash table is a relatively slow operation, so, when possible, it is a good idea to provide an estimate of the expected table size to the constructor, as sketched below.
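For instance, a map expected to be written by roughly 32 threads could be constructed as follows (a fragment; the sizing numbers are illustrative assumptions):

import java.util.concurrent.ConcurrentHashMap;

ConcurrentHashMap<String, Integer> map =
      new ConcurrentHashMap<String, Integer>(
            1024,   // initial capacity: expected table size, to avoid resizing
            0.75f,  // load factor
            32);    // concurrency level: estimated number of concurrent writers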

Retrieval Operation: Retrieval operations don’t block unless the entry is not found in the bucket or the value of the entry is null. In that case the map synchronizes on the bucket and looks for the entry again, in case the entry was put or removed just after the unsynchronized get. Retrievals reflect the results of the most recently completed update operations holding upon their onset. For aggregate operations such as putAll and clear, concurrent retrievals may reflect insertion or removal of only some entries.

Removal Operation: Removal operations do require a bit of overhead: the chain of elements before the removed element must be cloned and joined to the chain after it, without the removed element. Since the value field of the inner Entry class is volatile, a thread that is already traversing the bucket from which a value is removed sees a null value when it reaches the removed element and knows to ignore it.

Traversal Operation: Traversal doesn’t synchronize on the entire map either. In fact, traversal does not synchronize at all except under one condition: the internal linked-list implementation is aware of changes to the underlying collection, and if it detects any such changes during traversal, it synchronizes on the bucket it is traversing and re-reads the values. This ensures that the values received are always fresh, with minimal locking, if any.

Iterators over a ConcurrentHashMap are a little different from those offered by other collections. They are *NOT* fail-fast, in the sense that they don’t throw a ConcurrentModificationException. They also do not guarantee that elements added after the iterator’s creation will be listed/shown. The iterators do, however, guarantee that any updates or removals of items will be reflected correctly in their behavior, and that no element will be returned more than once during traversal. However, iterators are designed to be used by only one thread at a time.

NOTE: Like Hashtable but unlike HashMap, this class does not allow null to be used as a key or value.

Q3. Compare Hashtable, Collections.synchronizedMap and ConcurrentHashMap 
Ans.
Hashtable and Collections.synchronizedMap: Both lock the entire Map. With the former, you start paying for synchronization from the very beginning, whereas with the latter, you pay only when synchronization is required.

ConcurrentHashMap: Never locks the entire Map, but just acquires a lock on the affected bucket (segment).


Q4. How to use collections in a thread safe manner.
Ans.
1. By using thread-safe collections: This works for methods that need to access the data in the collection and contain only a single operation.
Ex: Using Vector in place of ArrayList, Hashtable in place of HashMap.

2. Synchronization: Using thread-unsafe classes and synchronizing access to them via synchronized methods and blocks. This is needed for methods containing more than a single operation.
Ex: Retrieving a value from a Hashtable and replacing an element in a Hashtable are each thread-safe operations, but when both operations are contained in one method, the overall operation is not thread-safe (see the sketch after this list).

Thus, using a thread-safe collection does not guarantee the correctness of your program.

3. Use the synchronized version of the thread-unsafe collection class: Most thread-unsafe collection classes have a synchronized counterpart that is thread-safe. These thread-safe collections are constructed by calling one of the static methods of the Collections class:

Set s = Collections.synchronizedSet(new HashSet(...));
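As mentioned in point 2, here is a sketch of a compound (check-then-act) operation that must be guarded by an explicit lock even though Hashtable itself is thread-safe (the class and method names are made up for the example):

import java.util.Hashtable;

public class SafeCompound
{
      private final Hashtable<String, Integer> table = new Hashtable<String, Integer>();

      // get() and put() are each thread-safe on their own, but the
      // check-then-act sequence below is not; locking the table
      // (Hashtable synchronizes on itself) makes the compound operation atomic.
      public void incrementCount(String key)
      {
            synchronized (table)
            {
                  Integer old = table.get(key);
                  table.put(key, old == null ? 1 : old + 1);
            }
      }
}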

Q5. Are collection classes thread-aware.
Ans. Many collection classes are what we would term "thread-aware." They have many internal and subtle features that were designed specifically for threads:
1. Some collections have an implementation that minimizes the need for synchronization by segmenting the collection. It is possible for threads to modify the collection simultaneously, without any synchronization, when they are operating on different segments.
For example, ConcurrentHashMap has the ability to add a key only if the key is not already in the map; this simple enhancement removes the need for explicit synchronization for parallel writes of new elements.

2. Some provide special services—such as iterator handling—that are specifically designed for multi-threaded environments. The main reason for copy-on-write iterators is to balance the performance issues of many simultaneous threads iterating through the collection against a few updates to the collection.

3. Interfaces have been enhanced to handle issues related to threads better.


Q6. What do you mean by atomicity and word-tearing
Ans. Atomicity applies to "simple operations" on primitive types, except for longs and doubles. However, you do get atomicity for simple assignments and returns if you use the volatile keyword when defining a long or double variable (note that volatile did not work properly before Java SE5).

Reading and writing primitive variables other than long and double is guaranteed to go to and from memory as indivisible (atomic) operations. However, the JVM is allowed to perform reads and writes of 64-bit quantities (long and double variables) as two separate 32-bit operations, raising the possibility that a context switch could happen in the middle of a read or write, so that different tasks see incorrect results. This is called word tearing, because a task might see the value after only part of it has been changed.
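A minimal sketch of the volatile fix (the field names are illustrative):

public class SharedCounter
{
      // Without volatile, a 64-bit write may be performed as two 32-bit
      // halves, so a concurrent reader could observe a "torn" value.
      private long unsafeValue;

      // volatile guarantees atomic reads and writes for long and double
      // (and also makes writes visible to other threads).
      private volatile long safeValue;

      public void set(long v) { safeValue = v; } // atomic assignment
      public long get() { return safeValue; }    // atomic read
}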

Q7. How are atomic operations helpful in synchronization?
Ans. Atomic operations are not interruptible by the threading mechanism. Expert programmers can take advantage of this to write lock-free code, which does not need to be synchronized. But even this is an oversimplification: sometimes, even when it seems like an atomic operation should be safe, it may not be.

Trying to remove synchronization is usually a sign of premature optimization, and will cause you a lot of trouble, probably without gaining much, or anything.


Q8. Are atomic operations always visible across the application.
Ans. On multiprocessor systems, visibility rather than atomicity is much more of an issue than on single-processor systems. Changes made by one task, even if they’re atomic in the sense of not being interruptible, might not be visible to other tasks (the changes might be temporarily stored in a local processor cache), so different tasks will have a different view of the application’s state. The synchronization mechanism, on the other hand, forces changes by one task on a multiprocessor system to be visible across the application. Without synchronization, it’s indeterminate when changes become visible.

Q9. Atomic Classes.
Ans. Java SE5 introduces special atomic variable classes such as
1. AtomicInteger
2. AtomicLong 
3. AtomicReference 

Ex:
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounter // illustrative wrapper class
{
      // Declaration
      private AtomicInteger i = new AtomicInteger(0);

      // Getting the value
      public int getValue()
      {
            return i.get();
      }

      // Adding 2 atomically
      private void evenIncrement()
      {
            i.addAndGet(2);
      }
}

Atomic classes were designed to build the classes in java.util.concurrent, and you should use them in your own code only under special circumstances, and even then only when you can ensure that there are no other possible problems. It’s generally safer to rely on locks (either the synchronized keyword or explicit Lock objects).

Q10. What are the various methods available with Atomic classes.
Ans.
1. get(): Return the current value.
2. getAndSet(): Set to the new value and return the old value.
3. compareAndSet(): Accepts two arguments, an expected value and a new value. If the current value equals the expected value, it is set to the new value and true is returned; otherwise false is returned without changing the value.
4. weakCompareAndSet(): Similar to #3, except that a return value of false could be due either to the existing value not equalling the expected value, or to a spurious failure to update the value.
5. incrementAndGet()/decrementAndGet(): Pre-increment/decrement the value by one, i.e. the updated value is returned.
6. getAndIncrement()/getAndDecrement(): Post-increment/decrement, i.e. the old value is returned.
7. addAndGet()/getAndAdd(): Pre and post versions of addition. Passing a negative value leads to subtraction, hence no separate method is required.
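A sketch of the classic compareAndSet() retry loop (the max-tracking example is made up): the current value is re-read and the update retried until no other thread has modified the value in between:

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicMax
{
      private final AtomicInteger max = new AtomicInteger(0);

      public void update(int candidate)
      {
            int current;
            do
            {
                  current = max.get();
                  if (candidate <= current)
                        return; // nothing to do
            } while (!max.compareAndSet(current, candidate));
      }
}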

Q11. Does Atomic package support complex types.
Ans. No. However, the behavior can be achieved by using an AtomicReference and making the encapsulated data/object type read-only (immutable), so that values within the object can never be changed. Any change to the data object is accomplished by creating a new object and assigning it to the AtomicReference, which is an atomic operation.

Disadvantage: It leads to many intermediate data objects. Thus, if object construction is costly, this approach shouldn't be used.
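A sketch of this immutable-object-behind-AtomicReference idiom (the Point class is made up for the example):

import java.util.concurrent.atomic.AtomicReference;

public class AtomicPoint
{
      // Immutable value object: fields never change after construction.
      static final class Point
      {
            final int x, y;
            Point(int x, int y) { this.x = x; this.y = y; }
      }

      private final AtomicReference<Point> point =
            new AtomicReference<Point>(new Point(0, 0));

      // "Modification" = build a new immutable object and swap it in atomically.
      public void move(int dx, int dy)
      {
            Point old, updated;
            do
            {
                  old = point.get();
                  updated = new Point(old.x + dx, old.y + dy);
            } while (!point.compareAndSet(old, updated));
      }
}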

Q12. Synchronized queues/Notification queues
Ans. Helpful in producer-consumer problems. While using these synchronized queues, you can get away without explicit synchronization. Such a queue blocks or times out when you attempt to add to a full queue, or retrieve from an empty queue.

interface: java.util.concurrent.BlockingQueue

Implementations:
1. LinkedBlockingQueue: A FIFO queue that can be either bounded or unbounded. This collection supports the blocking interface.
       Ex: new LinkedBlockingQueue<LiftOff>() // Unlimited size

2. ArrayBlockingQueue: A bounded FIFO queue which has a fixed size. This collection supports the blocking interface, an interface that allows threads to wait either for space to be available (while storing data) or data to be available (while retrieving data).
       Ex: new ArrayBlockingQueue<LiftOff>(3); // size of 3

3. SynchronousQueue: A blocking queue with no internal capacity: no elements are actually held in the collection, so each insert must wait for a corresponding remove, and the two threads operate on it synchronously.
        Ex: new SynchronousQueue<LiftOff>(); // no capacity

4. PriorityBlockingQueue: A priority queue that implements the BlockingQueue interface, i.e. a priority queue with blocking retrieval operations. As an example, the objects in the priority queue could be tasks that emerge from the queue in priority order, where a PrioritizedTask is given a priority number to provide this order.

5. DelayQueue: This is an unbounded BlockingQueue of objects that implement the Delayed interface. An object can only be taken from the queue when its delay has expired. The queue is sorted so that the object at the head has a delay that has expired for the longest time. If no delay has expired, then there is no head element and poll() will return null (because of this, you cannot place null elements in the queue).

NOTE: DelayQueue is thus a variation of a priority queue.
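A minimal producer-consumer sketch using an ArrayBlockingQueue; no explicit synchronization is needed, and the capacity and element counts are illustrative:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumer
{
      public static void main(String[] args)
      {
            final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);

            new Thread(new Runnable() { // producer
                  public void run()
                  {
                        try
                        {
                              for (int i = 0; i < 10; i++)
                                    queue.put(i); // blocks when the queue is full
                        }
                        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                  }
            }).start();

            new Thread(new Runnable() { // consumer
                  public void run()
                  {
                        try
                        {
                              for (int i = 0; i < 10; i++)
                                    System.out.println(queue.take()); // blocks when empty
                        }
                        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                  }
            }).start();
      }
}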

Q13. Using Pipes for I/O between tasks
Ans. It’s often useful for tasks to communicate with each other using I/O. Threading libraries may provide support for inter-task I/O in the form of pipes. These exist in the Java I/O library as the classes PipedWriter (which allows a task to write into a pipe) and PipedReader (which allows a different task to read from the same pipe). This can be thought of as a variation of the producer-consumer problem, where the pipe is the canned solution. The pipe is basically a blocking queue, which existed in versions of Java before BlockingQueue was introduced.

Sender and Receiver represent tasks that need to communicate with each other. Sender creates a PipedWriter, which is a standalone object, but inside Receiver the creation of PipedReader must be associated with a PipedWriter in the constructor (as demonstrated below).

import java.io.*;

class Sender implements Runnable
{
     private PipedWriter out = new PipedWriter();

     public PipedWriter getPipedWriter() { return out; }

     public void run() { /* write to out */ }
}

class Receiver implements Runnable
{
     private PipedReader in;

     public Receiver(Sender sender) throws IOException
     {
          in = new PipedReader(sender.getPipedWriter());
     }

     public void run() { /* read from in */ }
}

When the Receiver does a read(), the pipe automatically blocks if there is no more data.

NOTE: BlockingQueues are more robust and easier to use.

An important difference between a PipedReader and normal I/O is that the former is interruptible whereas the latter is not.


Q14. New library components for concurrency problems/Synchronization classes
Ans.
1. CountDownLatch: This is used to synchronize one or more tasks by forcing them to wait for the completion of a set of operations being performed by other tasks. You give an initial count to a CountDownLatch object, and any task that calls await() on that object will block until the count reaches zero.

Other tasks may call countDown() on the object to reduce the count, presumably when a task finishes its job. The count can be decremented multiple times by the same thread, or even by a thread that is not waiting, i.e. any thread can decrement the count any number of times. Tasks that call countDown() are not blocked when they make that call; only the call to await() blocks until the count reaches zero.
A typical use is to divide a problem into n independently solvable tasks and create a CountDownLatch with a value of n. When each task is finished it calls countDown() on the latch. Tasks waiting for the problem to be solved call await() on the latch to hold themselves back until it is completed. Ex:

// All must share a single CountDownLatch object:
CountDownLatch latch = new CountDownLatch(SIZE);

A CountDownLatch is designed to be used in a one-shot fashion; the count cannot be reset. If you need a version that resets the count, you can use a CyclicBarrier instead.

2. CyclicBarrier: A CyclicBarrier is used in situations where you want to create a group of tasks to perform work in parallel, and then wait until they are all finished before moving on to the next step. It brings all the parallel tasks into alignment at the barrier so you can move forward in unison. This is very similar to the CountDownLatch, except that a CountDownLatch is a one-shot event, whereas a CyclicBarrier can be reused over and over.

Here, the developer specifies the count of threads using the barrier. This count is used to trip the barrier: all waiting threads are released when the number of threads waiting equals the count specified at the beginning. Thus here, unlike CountDownLatch, the count doesn’t go down to zero, but up to the number of threads participating.

A CyclicBarrier can also be given a "barrier action", which is a Runnable that is automatically executed when the barrier trips but prior to releasing the threads. Here, we could perform some cleanup operations for the previous cycle or some setup operations for the next one. This is another distinction between CyclicBarrier and CountDownLatch. Ex:
         CyclicBarrier barrier = new CyclicBarrier(SIZE, new Runnable() {..} );
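A runnable sketch (SIZE and the printed messages are illustrative):

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierDemo
{
      public static void main(String[] args)
      {
            final int SIZE = 3;
            // The barrier action runs once per trip, before the waiting threads are released.
            final CyclicBarrier barrier = new CyclicBarrier(SIZE, new Runnable() {
                  public void run() { System.out.println("All parties arrived"); }
            });

            for (int i = 0; i < SIZE; i++)
            {
                  new Thread(new Runnable() {
                        public void run()
                        {
                              try
                              {
                                    barrier.await(); // blocks until SIZE threads have arrived
                              }
                              catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                              catch (BrokenBarrierException e) { /* barrier broken by another thread */ }
                        }
                  }).start();
            }
      }
}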

3. Semaphore: A normal lock (from concurrent.locks or the built-in synchronized lock) only allows one task at a time to access a resource. A counting semaphore allows n tasks to access the resource at the same time. As an example, consider the concept of the object pool, which manages a limited number of objects by allowing them to be checked out for use, and then checked back in again when the user is finished. This functionality can be encapsulated in a generic class known as Semaphore.

// Hypothetical number of permits, for illustration
private final Semaphore available = new Semaphore(100);

// Acquiring a resource (blocks until a permit is available)
available.acquire();

// Releasing the resource
available.release();

Unlike locks, a semaphore provides neither condition variables nor a nesting (reentrancy) mechanism: multiple acquisitions by the same thread take multiple permits from the semaphore.

NOTE: If the fair flag is set to true while initializing a semaphore, it grants permits in FIFO order. The downside of this is speed.

4. Exchanger: An Exchanger is a barrier that swaps objects between two tasks. When the tasks enter the barrier, they have one object and when they leave, they have the object that was formerly held by the other task. Exchangers are typically used when one task is creating objects that are expensive to produce and another task is consuming those objects; this way, more objects can be created at the same time as they are being consumed.

For example, to exercise the Exchanger class, we’ll create producer and consumer tasks. ExchangerProducer and ExchangerConsumer use a List<T> as the object to be exchanged; each one contains an Exchanger for this List<T>. When you call Exchanger.exchange() method, it blocks until the partner task calls its exchange() method, and when both exchange() methods have completed, the List<T> has been swapped.
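A minimal sketch of such an exchange, using List<Integer> buffers (the class and variable names are made up):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class ExchangerDemo
{
      public static void main(String[] args)
      {
            final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();

            new Thread(new Runnable() { // producer: fills a list, trades it for an empty one
                  public void run()
                  {
                        List<Integer> buffer = new ArrayList<Integer>();
                        try
                        {
                              for (int i = 0; i < 3; i++)
                                    buffer.add(i);
                              buffer = exchanger.exchange(buffer); // blocks until the consumer arrives
                        }
                        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                  }
            }).start();

            new Thread(new Runnable() { // consumer: trades its empty list for the full one
                  public void run()
                  {
                        List<Integer> buffer = new ArrayList<Integer>();
                        try
                        {
                              buffer = exchanger.exchange(buffer);
                              System.out.println("Received: " + buffer);
                        }
                        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                  }
            }).start();
      }
}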

5. ReadWrite Locks: ReadWriteLocks optimize the situation where you write to a data structure relatively infrequently, but multiple tasks read from it often. The ReadWriteLock allows you to have many readers at one time as long as no one is attempting to write. If the write lock is held, then no readers are allowed until the write lock is released. It’s completely uncertain whether a ReadWriteLock will improve the performance of your program, and it depends on issues like how often data is being read compared to how often it is being modified, the time of the read and write operations (the lock is more complex, so short operations will not see the benefits), how much thread contention there is, and whether you are running on a multiprocessor machine.

If some threads hold read locks while others are waiting for the write lock and still others are waiting for read locks, new read locks are not granted, since that could lead to starvation of the writers. Instead, as soon as the existing read locks are released, the write lock is granted. Then, once the writing thread is done, further granting of read/write locks is done in FIFO order.
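A sketch of the read/write-lock idiom using ReentrantReadWriteLock (the class name and field are made up):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadMostlyCache
{
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private int value;

      public int read()
      {
            lock.readLock().lock(); // many readers may hold this at once
            try
            {
                  return value;
            }
            finally
            {
                  lock.readLock().unlock();
            }
      }

      public void write(int v)
      {
            lock.writeLock().lock(); // exclusive: no readers or writers allowed
            try
            {
                  value = v;
            }
            finally
            {
                  lock.writeLock().unlock();
            }
      }
}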

Q15. Is it possible to downgrade/upgrade locks.
Ans. Downgrading of locks is possible, i.e. a write lock can be converted to a read lock. This is done by acquiring the read lock before releasing the write lock. However, upgrading of locks, i.e. converting a read lock to a write lock, is not possible.
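A sketch of lock downgrading with ReentrantReadWriteLock, following the pattern documented for that class (the class name and data field are made up):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Downgrade
{
      private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
      private Object data;

      public Object updateAndRead(Object newData)
      {
            rwl.writeLock().lock();
            try
            {
                  data = newData;
                  // Downgrade: acquire the read lock while still holding the write lock...
                  rwl.readLock().lock();
            }
            finally
            {
                  rwl.writeLock().unlock(); // ...then release the write lock
            }
            try
            {
                  return data; // still protected by the read lock
            }
            finally
            {
                  rwl.readLock().unlock();
            }
      }
}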

Q16. How would you implement and use CountDownLatch.
Ans. 

Implementing CountDownLatch
public class CountDownLatch
{
      private final Object synchObj = new Object();
      private int count;

      public CountDownLatch(int noThreads)
      {
            synchronized (synchObj)
            {
                  this.count = noThreads;
            }
      }

      public void await() throws InterruptedException
      {
            synchronized (synchObj)
            {
                  while (count > 0)
                  {
                        synchObj.wait();
                  }
            }
      }

      public void countDown()
      {
            synchronized (synchObj)
            {
                  if (--count <= 0)
                  {
                        synchObj.notifyAll();
                  }
            }
      }
}

Usage of CountDownLatch
public class Driver
{
      static final int N = 10; // illustrative number of workers

      public static void main(String[] args) throws InterruptedException
      {
            CountDownLatch startSignal = new CountDownLatch(1);
            CountDownLatch doneSignal = new CountDownLatch(N);

            for (int i = 0; i < N; ++i) // create and start threads
                  new Thread(new Worker(startSignal, doneSignal)).start();

            doSomethingElse(); // don't let the workers run yet
            System.out.println("Go at " + System.currentTimeMillis());
            startSignal.countDown(); // let all threads proceed
            doSomethingElse();
            doneSignal.await(); // wait for all to finish
            System.out.println("Done at " + System.currentTimeMillis());
      }

      static void doSomethingElse() { /* ... */ }
}

class Worker implements Runnable
{
      private final CountDownLatch startSignal;
      private final CountDownLatch doneSignal;
      Worker(CountDownLatch startSignal, CountDownLatch doneSignal)
      {
            this.startSignal = startSignal;
            this.doneSignal = doneSignal; 
      }

      public void run()
      {
            try
            {
                  startSignal.await();    // wait for the "go" signal
                  doWork();
                  doneSignal.countDown(); // report completion
            }
            catch (InterruptedException ex) {} // return
      }

      void doWork() { /* the actual work goes here */ }
}

