This post shows how you can create your own BlockingQueue in Java using ReentrantLock and the Condition interface. The Condition interface provides the methods await() and signal(), which work the same way as wait() and notify() but are tied to an explicit Lock rather than to an object's intrinsic monitor.
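To make that correspondence concrete, here is a minimal sketch of a one-slot hand-off written with intrinsic locking. The class name WaitNotifySlot is made up for illustration and is not part of the program in this post. Here synchronized plays the role of lock()/unlock(), wait() the role of await(), and notifyAll() the role of signal(). The sketch assumes null is never stored, since null marks the slot as empty.

```java
// Hypothetical one-slot buffer using intrinsic locking (illustration only).
class WaitNotifySlot {
    private Object value; // the single slot; null means empty

    public synchronized void put(Object x) throws InterruptedException {
        while (value != null) { // slot is full: wait until a consumer empties it
            wait();
        }
        value = x;
        notifyAll(); // wake up any waiting consumer
    }

    public synchronized Object take() throws InterruptedException {
        while (value == null) { // slot is empty: wait until a producer fills it
            wait();
        }
        Object x = value;
        value = null;
        notifyAll(); // wake up any waiting producer
        return x;
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotifySlot slot = new WaitNotifySlot();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    slot.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 1; i <= 3; i++) {
            System.out.println("Retrieving - " + slot.take());
        }
        producer.join();
    }
}
```

An intrinsic lock has a single wait set, so producers and consumers wait in the same place and notifyAll() is the safe choice. With Condition, each side can wait on its own condition object, so a single signal() reaches a thread of the right kind.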
BlockingQueue Java Program
Here we have a class called BufferClass that holds an array of type Object with length 5. So 5 is the bound of the buffer: once 5 values have been added to the array, a thread calling put() blocks until at least one value is retrieved from the array. The put() and take() methods are used to add values to and retrieve values from the array, respectively.
BufferClass
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BufferClass {
    final Lock lock = new ReentrantLock();
    // Conditions: producers wait on "produce", consumers wait on "consume"
    final Condition produce = lock.newCondition();
    final Condition consume = lock.newCondition();

    final Object[] valueArr = new Object[5];
    int putIndex, takeIndex;
    int count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            // buffer is full, wait until a consumer takes a value
            while (count == valueArr.length) {
                produce.await();
            }
            valueArr[putIndex] = x;
            System.out.println("Adding - " + x);
            // wrap around to the start of the array
            if (++putIndex == valueArr.length) {
                putIndex = 0;
            }
            // increment count
            ++count;
            // signal consumer
            consume.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // buffer is empty, wait until a producer adds a value
            while (count == 0) {
                consume.await();
            }
            Object x = valueArr[takeIndex];
            // clear the slot so the buffer does not keep a stale reference
            valueArr[takeIndex] = null;
            System.out.println("Retrieving - " + x);
            // wrap around to the start of the array
            if (++takeIndex == valueArr.length) {
                takeIndex = 0;
            }
            // reduce the count
            --count;
            // signal producer
            produce.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}
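As an aside, the same await-in-a-loop pattern extends to a put that gives up after a timeout. The following is only a sketch under assumptions: the class TimedBuffer, its offer() method, and the condition names notFull and notEmpty are hypothetical and not part of the program in this post. It relies on Condition.awaitNanos(), which returns an estimate of the remaining wait time so the loop can continue with a shrinking budget.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of the buffer with a timed put (illustration only).
class TimedBuffer {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Object[] items;
    private int putIndex, takeIndex, count;

    TimedBuffer(int capacity) {
        items = new Object[capacity];
    }

    // Returns true if x was added, false if the buffer stayed full for the whole timeout.
    public boolean offer(Object x, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (count == items.length) {
                if (nanos <= 0L) {
                    return false; // timed out while the buffer was still full
                }
                nanos = notFull.awaitNanos(nanos); // remaining time to wait
            }
            items[putIndex] = x;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takeIndex];
            items[takeIndex] = null; // drop the reference held by the array
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedBuffer buffer = new TimedBuffer(1);
        System.out.println(buffer.offer("a", 100, TimeUnit.MILLISECONDS)); // true
        System.out.println(buffer.offer("b", 100, TimeUnit.MILLISECONDS)); // false, buffer is full
        System.out.println(buffer.take()); // a
    }
}
```

The while loop matters here for the same reason as in put(): a waiting thread can wake up without the condition actually holding, so the state is rechecked after every wait.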
To test BufferClass we have another class, BufferClassDemo, where two threads are created: one adds values to the buffer and the other retrieves values from it. Here 10 values are added. BufferClass should ensure that once 5 values are in the buffer, any attempt to add another value blocks. In the same way, if the buffer is empty, any attempt to retrieve a value blocks.
public class BufferClassDemo {
    public static void main(String[] args) {
        BufferClass bufferClass = new BufferClass();
        // Creating two threads
        Thread producer = new Thread(new Producer(bufferClass));
        Thread consumer = new Thread(new Consumer(bufferClass));
        // starting threads
        producer.start();
        consumer.start();
    }
}

class Producer implements Runnable {
    private final BufferClass bufferClass;

    public Producer(BufferClass bufferClass) {
        this.bufferClass = bufferClass;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            try {
                //Thread.sleep(10);
                bufferClass.put(i);
            } catch (InterruptedException e) {
                // restore the interrupt status and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

class Consumer implements Runnable {
    private final BufferClass bufferClass;

    public Consumer(BufferClass bufferClass) {
        this.bufferClass = bufferClass;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            try {
                // Thread.sleep(500);
                bufferClass.take();
            } catch (InterruptedException e) {
                // restore the interrupt status and stop consuming
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
Output
Output from one of the runs. Note that the output may vary between runs, but the buffer should never hold more than 5 elements, and any attempt to add to a full buffer should block.
Adding - 1
Adding - 2
Adding - 3
Adding - 4
Adding - 5
Retrieving - 1
Retrieving - 2
Retrieving - 3
Retrieving - 4
Retrieving - 5
Adding - 6
Adding - 7
Adding - 8
Retrieving - 6
Retrieving - 7
Retrieving - 8
Adding - 9
Retrieving - 9
Adding - 10
Retrieving - 10
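For comparison, the JDK already ships a bounded blocking queue, java.util.concurrent.ArrayBlockingQueue, whose put() and take() block in the same way. The sketch below runs the same producer/consumer scenario against it. The class name JdkQueueDemo and its run() helper are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class (illustration only).
class JdkQueueDemo {
    // Runs one producer and one consumer over a queue bounded at 5 and
    // returns the values in the order the consumer took them.
    static List<Integer> run() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        List<Integer> taken = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i); // blocks while the queue holds 5 elements
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    taken.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        // join() makes the consumer's writes to "taken" visible to this thread
        producer.join();
        consumer.join();
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

With a single producer and a single consumer the values come out in insertion order, because the queue is FIFO. How the puts and takes interleave in between can still differ from run to run.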
That's all for this topic, Java Program to Create Your Own BlockingQueue. If you have any doubts or suggestions, please drop a comment. Thanks!