Synchronized Queue

Until Java 5 (JDK 1.5), developers had to take care of synchronizing a queue themselves in a multi-threaded application. Java 5 introduced a thread-safe queue interface, java.util.concurrent.BlockingQueue. It extends java.util.Queue and adds a few new methods such as put and take.

BlockingQueue supports additional operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element. The following implementations are available:

ArrayBlockingQueue : A simple bounded BlockingQueue implementation backed by an array.

DelayQueue : An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. It uses elements that implement the java.util.concurrent.Delayed interface.

PriorityBlockingQueue : An unbounded queue that orders its elements by their natural ordering or by a specified Comparator; the element returned by any take() call is the smallest element under this ordering.

LinkedBlockingQueue : An optionally bounded BlockingQueue implementation backed by a linked list. If no capacity is given, the capacity defaults to Integer.MAX_VALUE.

SynchronousQueue : This queue has a size of zero (yes, you read that correctly). It blocks put() calls until another thread calls take(), and blocks take() calls until another thread calls put(). Essentially, elements can only go directly from a producer to a consumer, and nothing is stored in the queue itself (other than for the handoff).
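
The behaviour described for some of these implementations can be checked with a small single-threaded sketch. The class and method names here (QueueFlavoursSketch and its helpers) are made up for illustration; only the java.util.concurrent calls come from the JDK. It mostly uses the non-blocking offer() instead of put() so that nothing can hang.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original program.
class QueueFlavoursSketch {

    // A bounded queue refuses a non-blocking offer() once it is full.
    static boolean offerToFullArrayQueue() {
        BlockingQueue<String> q = new ArrayBlockingQueue<String>(1);
        q.offer("first");          // fills the single slot
        return q.offer("second");  // no space left, so this returns false
    }

    // take() hands back the smallest element, not the oldest one.
    static int firstTakenFromPriorityQueue() {
        BlockingQueue<Integer> q = new PriorityBlockingQueue<Integer>();
        q.offer(30);
        q.offer(10);
        q.offer(20);
        try {
            return q.take();       // does not block: the queue is non-empty
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // With no consumer waiting in take(), a SynchronousQueue cannot accept an element.
    static boolean offerToIdleSynchronousQueue() {
        BlockingQueue<String> q = new SynchronousQueue<String>();
        return q.offer("nobody is waiting");
    }

    public static void main(String[] args) {
        System.out.println("offer to full ArrayBlockingQueue : " + offerToFullArrayQueue());
        System.out.println("first take from PriorityBlockingQueue : " + firstTakenFromPriorityQueue());
        System.out.println("offer to idle SynchronousQueue : " + offerToIdleSynchronousQueue());
    }
}
```

Run as is, the three results should be false, 10 and false. With put() in place of offer(), the first and third cases would block instead of returning false.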

Running Example :

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SynchronizedQueueTest {
    public static void main(String[] args) {
        // Queue shared by the producer and the consumer
        BlockingQueue<String> q = new LinkedBlockingQueue<String>();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q, "C-1");
        p.start();
        c1.start();
    }
}

class Producer extends Thread {
    private final BlockingQueue<String> q;

    public Producer(BlockingQueue<String> q) {
        this.q = q;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // Put the current timestamp on the queue (MM = month, mm = minutes)
                q.put(new SimpleDateFormat("dd/MM/yy hh:mm:ss:SSSS")
                        .format(new Date()));
                // Sleep for 10 seconds
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

class Consumer extends Thread {
    private final BlockingQueue<String> q;
    private final String name;

    public Consumer(BlockingQueue<String> q, String name) {
        this.q = q;
        this.name = name;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // take() blocks until an element is available, then we print it
                System.out.println("Consumer " + name + " read : " + q.take());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}


 

In this program, java.util.concurrent.LinkedBlockingQueue is used as the synchronized queue. The Producer puts an element on the queue every 10 seconds, and the Consumer takes elements from the same queue.

 

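Output of this program (these timestamps come from a run whose format pattern had mm and MM swapped, so the second field shows minutes and the fifth shows the month) :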


Consumer C-1 read : 14/55/11 02:05:23:0482
Consumer C-1 read : 14/55/11 02:05:33:0484
Consumer C-1 read : 14/55/11 02:05:43:0484
Consumer C-1 read : 14/55/11 02:05:53:0486
Consumer C-1 read : 14/56/11 02:05:03:0487
Consumer C-1 read : 14/56/11 02:05:13:0487
Consumer C-1 read : 14/56/11 02:05:23:0488
Consumer C-1 read : 14/56/11 02:05:33:0488
Consumer C-1 read : 14/56/11 02:05:43:0489
Consumer C-1 read : 14/56/11 02:05:53:0489
Consumer C-1 read : 14/57/11 02:05:03:0490


This output shows that the Consumer blocks in take() until there is an element in the queue to consume: the lines appear about 10 seconds apart, matching the Producer's sleep interval.
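
To see the blocking more directly, the wait inside take() can be timed. The following is only a sketch under stated assumptions: the class BlockingTakeSketch and the method millisBlockedInTake are hypothetical names, and the delay is shortened to a few hundred milliseconds so the run finishes quickly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original program.
class BlockingTakeSketch {

    // Returns roughly how many milliseconds take() had to wait for an element.
    static long millisBlockedInTake(final long producerDelayMillis) {
        final BlockingQueue<String> q = new LinkedBlockingQueue<String>();

        // The producer stays silent for a while, then puts a single element.
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(producerDelayMillis);
                    q.put("late element");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        long start = System.nanoTime();
        producer.start();
        try {
            q.take(); // the queue is empty, so this waits for the producer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("take() waited about " + millisBlockedInTake(300) + " ms");
    }
}
```

The reported wait should be close to the producer's delay, because take() cannot return before the element has been put on the queue.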

Comments

  1. Hey Yogesh,

    What's the difference between these two?

    TimeUnit.SECONDS.sleep(10);

    and

    Thread.sleep(10);

    --
    Bhaskar
