Here I am going to create a simple producer-consumer program in Java.
Let's take the following scenario.
Suppose I have a file containing a large amount of data. My producer will read this data from the input file and put it into a queue.
Two consumers will then read this data from the queue.
Now we can go through the implementation steps.
1. Producer
To create the producer thread, we can either extend the Thread class or implement the Runnable interface. Here we are going to implement Runnable.
Java allows a class to extend only one superclass, so if we extend Thread, we can’t extend any other class which may be required later. So I prefer to use the Runnable interface.
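To make the point concrete, here is a minimal sketch of a task that already has a superclass and still runs on a thread by implementing Runnable. The names BaseTask, FileTask and RunnableDemo are hypothetical and are not part of the code in this post.

```java
// Hypothetical names, for illustration only.
class BaseTask {
    String describe() {
        return "base task";
    }
}

// FileTask already extends BaseTask, so it could not extend Thread as well.
// Implementing Runnable keeps the single superclass slot free.
class FileTask extends BaseTask implements Runnable {
    volatile boolean ran = false; // records that run() was executed

    @Override
    public void run() {
        ran = true;
        System.out.println("Running " + describe());
    }
}

class RunnableDemo {
    public static void main(String[] args) throws InterruptedException {
        FileTask task = new FileTask();
        Thread t = new Thread(task); // the Thread only wraps the Runnable
        t.start();
        t.join(); // wait for the task to finish
    }
}
```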
Let's declare and use a shared queue in our producer as shown in the snippet below.
In the producer, we read the data one whitespace-separated entry at a time from the test.txt file and offer (put) each entry into a String queue.
package test;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.Queue;
import java.util.Scanner;

public class Producer implements Runnable {
    private final Queue<String> dataQ;

    public Producer(Queue<String> sharedDataQ) {
        dataQ = sharedDataQ;
    }

    @Override
    public void run() {
        File f = new File("test.txt");
        // try-with-resources closes the Scanner once reading is done
        try (Scanner sc = new Scanner(f)) {
            // next() returns one whitespace-separated entry at a time
            while (sc.hasNext()) {
                dataQ.offer(sc.next());
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always signal completion; if this were skipped after a failure,
            // the consumers would keep waiting forever
            DataQueue.QofferCompleted = true;
        }
    }
}
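The DataQueue class that the producer refers to is not listed in this post. A minimal sketch, assuming it only needs to hold the completion flag, might look like the following. The volatile modifier is an assumption on my part; it is there so that the consumer threads see the producer's update. The class would go in the same package (test) as the other classes.

```java
// Assumed shape of DataQueue; the original class is not shown in this post.
class DataQueue {
    // Set to true by the producer once the whole file has been offered.
    // volatile makes the write visible to the consumer threads.
    static volatile boolean QofferCompleted = false;
}
```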
2. Consumer
In the consumer thread, we poll the data from the queue and print it to the console. The code is shown below. A thread number variable identifies which thread is processing the data.
package test;

import java.util.Queue;

public class Consumer implements Runnable {
    private final int threadNumber;
    private final Queue<String> dataQ;

    Consumer(int threadNumber, Queue<String> sharedDataQ) {
        this.threadNumber = threadNumber;
        dataQ = sharedDataQ;
    }

    @Override
    public void run() {
        // Keep going until the producer has finished and the queue is drained
        while (!dataQ.isEmpty() || !DataQueue.QofferCompleted) {
            // poll() returns null when the queue is empty. The other consumer may
            // take the last entry between an isEmpty() check and poll(), so test
            // the result of poll() instead
            String consumedData = dataQ.poll();
            if (consumedData != null) {
                System.out.println("From " + this.threadNumber + ":" + consumedData);
            }
        }
    }
}
Now we can go to the corresponding runner class.
3. Runner
In the Runner class, we create one producer to read the data from the file and two consumers to process the data from the queue. The shared data queue is created and passed to both the producer and the consumers.
package test;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Runner {
    public static void main(String[] args) {
        // LinkedList is not thread-safe, so the queue shared between the
        // threads is a ConcurrentLinkedQueue
        Queue<String> sharedDataQ = new ConcurrentLinkedQueue<>();

        Thread p = new Thread(new Producer(sharedDataQ));
        p.start();

        Thread c1 = new Thread(new Consumer(1, sharedDataQ));
        c1.start();

        Thread c2 = new Thread(new Consumer(2, sharedDataQ));
        c2.start();
    }
}
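The consumers above keep looping while the queue is empty, which keeps a CPU core busy. One possible alternative, not the approach used in this post, is a java.util.concurrent.BlockingQueue together with a sentinel value (often called a poison pill) that tells each consumer to stop. The sketch below is self-contained; BlockingQueueDemo, runPipeline and DONE are hypothetical names, and the input comes from a list instead of a file to keep it short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical variant for illustration; not the code from this post.
class BlockingQueueDemo {
    // Sentinel object compared by identity, so real data can never match it.
    static final String DONE = new String("__DONE__");

    // Runs one producer and consumerCount consumers; returns every consumed entry.
    static List<String> runPipeline(List<String> input, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());

        Thread producer = new Thread(() -> {
            try {
                for (String item : input) {
                    queue.put(item);
                }
                // One sentinel per consumer so that each of them stops
                for (int i = 0; i < consumerCount; i++) {
                    queue.put(DONE);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        List<Thread> consumers = new ArrayList<>();
        for (int i = 1; i <= consumerCount; i++) {
            final int id = i;
            consumers.add(new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take(); // blocks while the queue is empty
                        if (item == DONE) {
                            break;
                        }
                        System.out.println("From " + id + ":" + item);
                        consumed.add(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        producer.start();
        for (Thread c : consumers) {
            c.start();
        }
        try {
            producer.join();
            for (Thread c : consumers) {
                c.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        runPipeline(Arrays.asList("example", "of", "producer", "consumer"), 2);
    }
}
```

With this design no separate completion flag is needed, because the sentinel travels through the same queue as the data. Which consumer prints which entry still depends on thread scheduling.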
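That's it. I hope this example helps beginners create a simple producer-consumer framework.
You will get output like the following for a sample test input file. How the entries are split between the two consumers depends on thread scheduling and can vary from run to run. Please add your suggestions to improve this.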
From 1:example
From 1:of
From 1:producer
From 1:consumer
From 2:show
From 2:java
From 2:code
From 2:example
What is the DataQueue here?
DataQueue is for tracking whether the queue has been fully consumed or not. I will add that class also.