ConcurrentLinkedQueue in Java implements the Queue interface. It was added in Java 5 along with other concurrent utilities such as CyclicBarrier, CountDownLatch, Semaphore, and ConcurrentHashMap.
ConcurrentLinkedQueue in Java is an unbounded thread-safe queue which stores its elements as linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is the element that has been on the queue the longest time. The tail of the queue is the element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
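To make the FIFO ordering concrete, here is a minimal single-threaded sketch. The class name FifoOrderDemo and the helper method drain() are made up for this illustration; only offer(), peek() and poll() come from the Queue API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; the name and the drain() helper are illustrative only.
class FifoOrderDemo {

    // Removes elements from the head until the queue is empty
    // and returns them in the order they were removed.
    static List<String> drain(Queue<String> queue) {
        List<String> removed = new ArrayList<>();
        String head;
        while ((head = queue.poll()) != null) {
            removed.add(head);
        }
        return removed;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        // Each offer() inserts at the tail of the queue
        queue.offer("first");
        queue.offer("second");
        queue.offer("third");

        // peek() reads the head without removing it
        System.out.println("Head: " + queue.peek());      // Head: first
        // poll() removes from the head, so elements come out in insertion order
        System.out.println("Drained: " + drain(queue));   // Drained: [first, second, third]
    }
}
```

The element that was offered first is the one that peek() and poll() see first, which is the head/tail behaviour described above.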
Like most other concurrent collection implementations, the ConcurrentLinkedQueue class does not permit null elements.
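A short sketch of what the null restriction looks like in practice. The class NullElementDemo and the method rejectsNull() are hypothetical names used only for this example. One practical reason for the restriction is that poll() returns null to signal an empty queue, so a stored null would be ambiguous.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; names are illustrative only.
class NullElementDemo {

    // Returns true if the queue refused the null element with a NullPointerException.
    static boolean rejectsNull(Queue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        System.out.println("Null rejected: " + rejectsNull(queue));   // Null rejected: true
        // The failed insert leaves the queue unchanged
        System.out.println("Size: " + queue.size());                  // Size: 0
    }
}
```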
Usage of ConcurrentLinkedQueue in Java
A ConcurrentLinkedQueue is an appropriate choice when many threads share access to a common collection. Note that its operations do not block, unlike those of BlockingQueue implementations such as ArrayBlockingQueue and LinkedBlockingQueue. So there are no put() or take() methods that wait if required.
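The non-blocking behaviour can be sketched as follows: poll() on an empty ConcurrentLinkedQueue returns null straight away instead of waiting for an element. The class NonBlockingPollDemo and the helper pollOrDefault() are assumptions made up for this illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; names are illustrative only.
class NonBlockingPollDemo {

    // Tries to remove the head; returns the fallback instead of waiting
    // when the queue is empty.
    static String pollOrDefault(Queue<String> queue, String fallback) {
        String head = queue.poll();   // returns null immediately if the queue is empty
        return head != null ? head : fallback;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();

        // Empty queue: poll() does not wait, it just returns null
        System.out.println(queue.poll());                   // null
        System.out.println(pollOrDefault(queue, "none"));   // none

        // The queue is unbounded, so offer() never has to wait for space
        queue.offer("job-1");
        System.out.println(pollOrDefault(queue, "none"));   // job-1
    }
}
```

If a consumer has to wait until an element arrives, a BlockingQueue implementation with take() is the better fit. With ConcurrentLinkedQueue the caller has to handle the null result itself.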
Java ConcurrentLinkedQueue constructors
- ConcurrentLinkedQueue() - Creates a ConcurrentLinkedQueue that is initially empty.
- ConcurrentLinkedQueue(Collection<? extends E> c) - Creates a ConcurrentLinkedQueue initially containing the elements of the given collection, added in traversal order of the collection's iterator.
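Both constructors can be tried out with a small sketch like the one below. The class ConstructorDemo and the helper copyOf() are hypothetical names chosen for this example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; names are illustrative only.
class ConstructorDemo {

    // Builds a queue from an existing collection using the second constructor.
    static ConcurrentLinkedQueue<Integer> copyOf(Collection<Integer> source) {
        return new ConcurrentLinkedQueue<>(source);
    }

    public static void main(String[] args) {
        // No-arg constructor: the queue starts out empty
        ConcurrentLinkedQueue<Integer> empty = new ConcurrentLinkedQueue<>();
        System.out.println(empty.isEmpty());   // true

        // Collection constructor: elements are added in the list's iteration order
        List<Integer> source = Arrays.asList(10, 20, 30);
        ConcurrentLinkedQueue<Integer> copy = copyOf(source);
        System.out.println(copy.peek());       // 10
        System.out.println(copy.size());       // 3
    }
}
```

Since null elements are not permitted, the collection passed to the second constructor must not contain null either.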
Java ConcurrentLinkedQueue Iterator
Iterators in ConcurrentLinkedQueue are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
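The following sketch modifies a queue while iterating over it, to contrast the weakly consistent iterator with a fail-fast one. The class WeakIteratorDemo and the method iterateWhileAdding() are made-up names, and LinkedList is used here only as a familiar fail-fast comparison. Whether the element added during the traversal is also returned by the ConcurrentLinkedQueue iterator is not guaranteed, so the example makes no claim about it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; names are illustrative only.
class WeakIteratorDemo {

    // Iterates over the queue and adds one extra element during the first step.
    // Returns the elements the iterator handed out.
    static List<Integer> iterateWhileAdding(Queue<Integer> queue, int extra) {
        List<Integer> seen = new ArrayList<>();
        boolean added = false;
        for (Integer value : queue) {
            seen.add(value);
            if (!added) {
                queue.add(extra);   // structural modification in the middle of the traversal
                added = true;
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Weakly consistent iterator: no exception, the original elements are returned once each
        Queue<Integer> concurrent = new ConcurrentLinkedQueue<>(Arrays.asList(1, 2, 3));
        System.out.println("Iterator saw: " + iterateWhileAdding(concurrent, 4));
        System.out.println("Queue size afterwards: " + concurrent.size());   // Queue size afterwards: 4

        // Fail-fast iterator: the same modification is detected on the next step
        Queue<Integer> linked = new LinkedList<>(Arrays.asList(1, 2, 3));
        try {
            iterateWhileAdding(linked, 4);
        } catch (ConcurrentModificationException e) {
            System.out.println("LinkedList iterator threw ConcurrentModificationException");
        }
    }
}
```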
ConcurrentLinkedQueue Java Example
Let's create a producer-consumer Java program using ConcurrentLinkedQueue. In this code there will be one producer thread putting elements into the queue and three consumer threads retrieving elements from the queue. Note that the producer thread puts only 3 elements.
You can run the program multiple times and observe that no two consumer threads retrieve the same element from the queue.
Later, ConcurrentLinkedQueue is replaced with PriorityQueue, which is not synchronized. Running that version can lead to a race condition in which consumer threads step on each other and retrieve the same element. Because the outcome depends on thread scheduling, some runs may still produce correct output, but over multiple runs you are likely to see incorrect results.
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ConcurrentLQDemo {
        public static void main(String[] args) {
            Buffer buffer = new Buffer();
            ExecutorService executor = Executors.newFixedThreadPool(4);
            // Calling one producer
            executor.execute(new ProdTask(buffer));
            // Calling three consumers
            executor.execute(new ConTask(buffer));
            executor.execute(new ConTask(buffer));
            executor.execute(new ConTask(buffer));
            executor.shutdown();
        }
    }

    /**
     * Producer task that puts three elements into the shared buffer.
     */
    class ProdTask implements Runnable {
        Buffer buffer;
        ProdTask(Buffer buffer) {
            this.buffer = buffer;
        }
        @Override
        public void run() {
            // putting just three elements
            for (int i = 0; i < 3; i++) {
                buffer.put(i);
            }
        }
    }

    /**
     * Consumer task that retrieves one element from the shared buffer.
     */
    class ConTask implements Runnable {
        Buffer buffer;
        ConTask(Buffer buffer) {
            this.buffer = buffer;
        }
        @Override
        public void run() {
            try {
                // delay to make sure producer starts first
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            buffer.get();
        }
    }

    // Shared class used by threads
    class Buffer {
        int i;
        Queue<Integer> clQueue = new ConcurrentLinkedQueue<Integer>();
        // Retrieving from the queue
        public void get() {
            System.out.println("Consumer recd - " + clQueue.poll());
        }
        // Putting in the queue
        public void put(int i) {
            this.i = i;
            clQueue.add(i);
            System.out.println("Putting - " + i);
        }
    }
Output
    Putting - 0
    Putting - 1
    Putting - 2
    Consumer recd - 0
    Consumer recd - 2
    Consumer recd - 1
Now let's replace the ConcurrentLinkedQueue with PriorityQueue, which is not synchronized.
    import java.util.PriorityQueue;
    import java.util.Queue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ConcurrentLQDemo {
        public static void main(String[] args) {
            Buffer buffer = new Buffer();
            ExecutorService executor = Executors.newFixedThreadPool(4);
            // Calling one producer
            executor.execute(new ProdTask(buffer));
            // Calling three consumers
            executor.execute(new ConTask(buffer));
            executor.execute(new ConTask(buffer));
            executor.execute(new ConTask(buffer));
            executor.shutdown();
        }
    }

    /**
     * Producer task that puts three elements into the shared buffer.
     */
    class ProdTask implements Runnable {
        Buffer buffer;
        ProdTask(Buffer buffer) {
            this.buffer = buffer;
        }
        @Override
        public void run() {
            // putting just three elements
            for (int i = 0; i < 3; i++) {
                buffer.put(i);
            }
        }
    }

    /**
     * Consumer task that retrieves one element from the shared buffer.
     */
    class ConTask implements Runnable {
        Buffer buffer;
        ConTask(Buffer buffer) {
            this.buffer = buffer;
        }
        @Override
        public void run() {
            try {
                // delay to make sure producer starts first
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            buffer.get();
        }
    }

    // Shared class used by threads
    class Buffer {
        int i;
        //Queue<Integer> clQueue = new ConcurrentLinkedQueue<Integer>();
        // PriorityQueue is not thread-safe
        Queue<Integer> clQueue = new PriorityQueue<Integer>();
        // Retrieving from the queue
        public void get() {
            System.out.println("Consumer recd - " + clQueue.poll());
        }
        // Putting in the queue
        public void put(int i) {
            this.i = i;
            clQueue.add(i);
            System.out.println("Putting - " + i);
        }
    }
Output
    Putting - 0
    Putting - 1
    Putting - 2
    Consumer recd - 0
    Consumer recd - 1
    Consumer recd - 0

Here it can be seen that consumer threads are getting the same element out of the queue: the element 0 is received twice.
That's all for this topic, ConcurrentLinkedQueue in Java With Examples. If you have any doubts or suggestions, please drop a comment. Thanks!