The producer/consumer design pattern is an important building block for
concurrent, multi-threaded processing. Today we are going to see how to create
a simple producer-consumer framework using the Spring Framework and blocking
queues.
Basic flow:
Producer threads read the data from the text files present in an input
location. At the same time, consumer threads start consuming that data. This
article describes the development of this simple pattern.
Let's use a Maven build. We need to add the spring-context dependency to the
pom.xml file for Spring.
<!-- Spring application-context module used to load context.xml -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
Step 1: Config and context
As a first step, we need a properties file to hold the required settings, such
as the producer/consumer thread counts. We can have a config.properties file as
shown below.
# Number of producer tasks; also the size of the producer thread pool
producer.thread.count=2
# Number of consumer tasks; also the size of the consumer thread pool
consumer.thread.count=2
Let's have a context file to load the required beans into the Spring
container. Create the context.xml and config.properties files and add them to
the classpath.
Step 2: Reading properties from the context
In the context file we need to declare all the beans required by the
application. To read the config.properties file from the classpath, we add a
PropertyPlaceholderConfigurer bean to context.xml as follows.
<!-- Resolves ${...} placeholders in this file from config.properties on the classpath -->
<bean id="propertyFile"
      class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="location" value="classpath:config.properties"/>
</bean>
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="location" value="classpath:config.properties" /> </bean>
Now we can load properties from config.properties as shown below.
<!-- Integer bean holding the value of producer.thread.count -->
<bean id="producerCount" class="java.lang.Integer" autowire="byName">
    <constructor-arg value="${producer.thread.count}"/>
</bean>
<constructor-arg value="${producer.thread.count}"></constructor-arg>
</bean>
<!-- Integer bean holding the value of consumer.thread.count -->
<bean id="consumerCount" class="java.lang.Integer" autowire="byName">
    <constructor-arg value="${consumer.thread.count}"/>
</bean>
<constructor-arg value="${consumer.thread.count}"></constructor-arg>
</bean>
Step 3: Main and Runner
Here we are going to create a main class (App) and a runner class (JobRunner)
with an execute method that starts our producers and consumers.
App.java
package spring.app;
import spring.app.common.JobRunner;
public class App
{
public static void main( String[] args )
{
JobRunner jobRunner=new JobRunner();
jobRunner.execute();
}
}
{
public static void main( String[] args )
{
JobRunner jobRunner=new JobRunner();
jobRunner.execute();
}
}
JobRunner.java
package spring.app.common;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import spring.app.Consumer;
import spring.app.Producer;
/**
 * Loads {@code context.xml} and runs the producer and consumer tasks on two
 * fixed-size thread pools, one pool per role.
 *
 * <p>Pool sizes are read from the {@code producerCount} and
 * {@code consumerCount} beans. Every submitted task is a separate
 * {@code producer} / {@code consumer} bean obtained from the context.
 */
public class JobRunner {
    private int producerCount;
    private int consumerCount;

    /**
     * Becomes {@code true} once all producer tasks have finished. It is
     * {@code volatile} because the thread running {@link #execute()} writes it
     * while the consumer threads read it.
     */
    public static volatile boolean isReadComplete;

    private ApplicationContext cp;

    /**
     * Starts all producers and consumers and blocks until both pools have
     * terminated (or the calling thread is interrupted).
     */
    public void execute() {
        // Reset so that a second run in the same JVM does not start as "complete".
        isReadComplete = false;
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("context.xml");
        cp = context;
        ThreadPoolExecutor producerPoolExecutor = null;
        ThreadPoolExecutor consumerPoolExecutor = null;
        try {
            producerCount = (Integer) cp.getBean("producerCount");
            consumerCount = (Integer) cp.getBean("consumerCount");

            // producer
            producerPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(producerCount);
            for (int i = 1; i <= producerCount; i++) {
                Producer p = (Producer) cp.getBean("producer");
                p.setThreadNumber(i);
                producerPoolExecutor.execute(p);
            }

            // consumer
            consumerPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(consumerCount);
            for (int i = 1; i <= consumerCount; i++) {
                Consumer c = (Consumer) cp.getBean("consumer");
                c.setThreadNumber(i);
                consumerPoolExecutor.execute(c);
            }

            // The flag may only be raised after every producer has really finished.
            shutdownAndAwait(producerPoolExecutor);
            isReadComplete = true;
            shutdownAndAwait(consumerPoolExecutor);
            System.out.println("Completed");
        } catch (InterruptedException e) {
            // Restore the interrupt status for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for producers/consumers: " + e);
        } finally {
            // Let consumers leave their loop even when we exit abnormally.
            isReadComplete = true;
            stopNow(producerPoolExecutor);
            stopNow(consumerPoolExecutor);
            context.close();
        }
    }

    /**
     * Requests an orderly shutdown and waits until all submitted tasks are done.
     * {@code shutdown()} has to come first: {@code awaitTermination} only
     * returns {@code true} after a shutdown request.
     */
    private static void shutdownAndAwait(ThreadPoolExecutor pool) throws InterruptedException {
        pool.shutdown();
        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            // tasks still running; keep waiting
        }
    }

    /** Stops a pool that may still be running; does nothing for {@code null}. */
    private static void stopNow(ThreadPoolExecutor pool) {
        if (pool != null) {
            pool.shutdownNow();
        }
    }
}
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import spring.app.Consumer;
import spring.app.Producer;
/**
 * Loads {@code context.xml} and runs the producer and consumer tasks on two
 * fixed-size thread pools, one pool per role.
 *
 * <p>Pool sizes are read from the {@code producerCount} and
 * {@code consumerCount} beans. Every submitted task is a separate
 * {@code producer} / {@code consumer} bean obtained from the context.
 */
public class JobRunner {
    private int producerCount;
    private int consumerCount;

    /**
     * Becomes {@code true} once all producer tasks have finished. It is
     * {@code volatile} because the thread running {@link #execute()} writes it
     * while the consumer threads read it.
     */
    public static volatile boolean isReadComplete;

    private ApplicationContext cp;

    /**
     * Starts all producers and consumers and blocks until both pools have
     * terminated (or the calling thread is interrupted).
     */
    public void execute() {
        // Reset so that a second run in the same JVM does not start as "complete".
        isReadComplete = false;
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("context.xml");
        cp = context;
        ThreadPoolExecutor producerPoolExecutor = null;
        ThreadPoolExecutor consumerPoolExecutor = null;
        try {
            producerCount = (Integer) cp.getBean("producerCount");
            consumerCount = (Integer) cp.getBean("consumerCount");

            // producer
            producerPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(producerCount);
            for (int i = 1; i <= producerCount; i++) {
                Producer p = (Producer) cp.getBean("producer");
                p.setThreadNumber(i);
                producerPoolExecutor.execute(p);
            }

            // consumer
            consumerPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(consumerCount);
            for (int i = 1; i <= consumerCount; i++) {
                Consumer c = (Consumer) cp.getBean("consumer");
                c.setThreadNumber(i);
                consumerPoolExecutor.execute(c);
            }

            // The flag may only be raised after every producer has really finished.
            shutdownAndAwait(producerPoolExecutor);
            isReadComplete = true;
            shutdownAndAwait(consumerPoolExecutor);
            System.out.println("Completed");
        } catch (InterruptedException e) {
            // Restore the interrupt status for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for producers/consumers: " + e);
        } finally {
            // Let consumers leave their loop even when we exit abnormally.
            isReadComplete = true;
            stopNow(producerPoolExecutor);
            stopNow(consumerPoolExecutor);
            context.close();
        }
    }

    /**
     * Requests an orderly shutdown and waits until all submitted tasks are done.
     * {@code shutdown()} has to come first: {@code awaitTermination} only
     * returns {@code true} after a shutdown request.
     */
    private static void shutdownAndAwait(ThreadPoolExecutor pool) throws InterruptedException {
        pool.shutdown();
        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            // tasks still running; keep waiting
        }
    }

    /** Stops a pool that may still be running; does nothing for {@code null}. */
    private static void stopNow(ThreadPoolExecutor pool) {
        if (pool != null) {
            pool.shutdownNow();
        }
    }
}
Here we are using the ThreadPoolExecutor class to start and run the
producer and consumer threads efficiently. The thread pool takes care of
creating and terminating threads and provides improved performance when
executing large numbers of asynchronous tasks. The executor's shutdown() method
initiates an orderly shutdown in which previously submitted tasks are executed
but no new tasks are accepted; invoking it again has no additional effect if
the executor is already shut down. shutdown() does not wait for previously
submitted tasks to complete execution, so we use the awaitTermination() method
for that: it blocks until all tasks have completed after a shutdown request,
the timeout elapses, or the current thread is interrupted, whichever happens
first.
As mentioned earlier, the producer threads read all the files from the input
location. So let's have a file queue that holds every file found in the input
location. We are going to create a ResourceHelper class to populate this file
queue. ResourceHelper implements InitializingBean, so Spring calls its
afterPropertiesSet() method during context initialization, right after the
bean's properties have been set.
The ResourceHelper class is shown below.
ResourceHelper.java
package spring.app.common;
import java.io.File;
import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.Resource;
/**
 * Fills the shared file queue with the regular files found directly inside the
 * configured input directory. The work is done in
 * {@link #afterPropertiesSet()}.
 */
public class ResourceHelper implements InitializingBean {
    @Autowired
    private LinkedBlockingQueue<File> fileQ;
    private Resource inputFiles;

    /**
     * @param inputFiles resource that resolves to the input directory
     */
    public void setInputFiles(Resource inputFiles) {
        this.inputFiles = inputFiles;
    }

    /**
     * Queues every regular file of the input directory.
     *
     * @throws IllegalStateException if the input location is not a readable
     *         directory, or if the queue is full and a file cannot be queued
     * @throws Exception if the resource cannot be resolved to a file
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        File directory = inputFiles.getFile();
        // listFiles() returns null when the path is not a directory or cannot be read.
        File[] entries = directory.listFiles();
        if (entries == null) {
            throw new IllegalStateException(
                    "Input location is not a readable directory: " + directory);
        }
        for (File entry : entries) {
            // Sub-directories cannot be opened as text files, so they are skipped.
            if (!entry.isFile()) {
                continue;
            }
            // offer() returns false when the bounded queue is full; fail loudly
            // instead of silently dropping the file.
            if (!fileQ.offer(entry)) {
                throw new IllegalStateException(
                        "File queue is full; cannot queue " + entry);
            }
        }
    }
}
In
context file we can declare resource property as shown below.
<!-- Helper that queues the files found in the input directory -->
<bean id="resourceHelper" class="spring.app.common.ResourceHelper">
    <property name="inputFiles" value="file:D:\\CODE\\data\\incoming\\"/>
</bean>
Step 4: Initialize the required blocking queues
<!-- Queue of data lines, created with capacity 1000 -->
<bean id="dataQ" class="java.util.concurrent.LinkedBlockingQueue" autowire="byName">
    <constructor-arg type="int" value="1000"/>
</bean>

<!-- Queue of input files, created with capacity 1000 -->
<bean id="fileQ" class="java.util.concurrent.LinkedBlockingQueue" autowire="byName">
    <constructor-arg type="int" value="1000"/>
</bean>
These queues can be auto-wired into the Producer and Consumer classes as
required. Now let's go through the creation of the Producer and Consumer thread
classes.
Step 5: Creation of Producer and Consumer threads
Each producer thread takes files from fileQ and offers (loads) their lines to
dataQ, which is shared with the consumers.
Producer.java
package
spring.app;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
/**
 * Task that takes files from {@code fileQ}, reads them line by line and
 * publishes every line to {@code dataQ}.
 */
public class Producer implements Runnable,Cloneable {
    /** Longest time to wait for free space in {@code dataQ} before giving up. */
    private static final long OFFER_TIMEOUT_SECONDS = 60L;

    @Resource
    private BlockingQueue<String> dataQ;
    @Resource
    private BlockingQueue<File> fileQ;
    private int threadNumber;

    /**
     * @param threadNumber number used to label this producer's log output
     */
    public void setThreadNumber(int threadNumber) {
        this.threadNumber = threadNumber;
    }

    /**
     * Processes files until {@code fileQ} is empty, the data queue stays full
     * for {@value #OFFER_TIMEOUT_SECONDS} seconds, or the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            File f;
            // poll() is the only test: an isEmpty() check followed by poll() can
            // race with another producer and hand back null.
            while ((f = fileQ.poll()) != null) {
                System.out.println("Processing " + f.getName());
                if (!publishLines(f)) {
                    System.out.println("Producer:" + threadNumber
                            + " stopped: dataQ stayed full for " + OFFER_TIMEOUT_SECONDS + "s");
                    return;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can see it.
            Thread.currentThread().interrupt();
            System.out.println(e);
        }
    }

    /**
     * Reads one file and puts each of its lines on {@code dataQ}. A read error
     * is reported and ends this file only, so the remaining files are still
     * processed.
     *
     * @return {@code false} if a line could not be queued within the timeout
     */
    private boolean publishLines(File f) throws InterruptedException {
        // try-with-resources closes the reader of every file, not just the last one.
        try (BufferedReader br = new BufferedReader(new FileReader(f))) {
            String dataLine;
            while ((dataLine = br.readLine()) != null) {
                System.out.println("Producer:"+threadNumber+" Data read:"+dataLine);
                // Timed offer applies back-pressure instead of dropping the line
                // when the bounded queue is full.
                if (!dataQ.offer(dataLine, OFFER_TIMEOUT_SECONDS,
                        java.util.concurrent.TimeUnit.SECONDS)) {
                    return false;
                }
            }
        } catch (IOException e) {
            System.out.println(e);
        }
        return true;
    }
}
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
/**
 * Task that takes files from {@code fileQ}, reads them line by line and
 * publishes every line to {@code dataQ}.
 */
public class Producer implements Runnable,Cloneable {
    /** Longest time to wait for free space in {@code dataQ} before giving up. */
    private static final long OFFER_TIMEOUT_SECONDS = 60L;

    @Resource
    private BlockingQueue<String> dataQ;
    @Resource
    private BlockingQueue<File> fileQ;
    private int threadNumber;

    /**
     * @param threadNumber number used to label this producer's log output
     */
    public void setThreadNumber(int threadNumber) {
        this.threadNumber = threadNumber;
    }

    /**
     * Processes files until {@code fileQ} is empty, the data queue stays full
     * for {@value #OFFER_TIMEOUT_SECONDS} seconds, or the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            File f;
            // poll() is the only test: an isEmpty() check followed by poll() can
            // race with another producer and hand back null.
            while ((f = fileQ.poll()) != null) {
                System.out.println("Processing " + f.getName());
                if (!publishLines(f)) {
                    System.out.println("Producer:" + threadNumber
                            + " stopped: dataQ stayed full for " + OFFER_TIMEOUT_SECONDS + "s");
                    return;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor can see it.
            Thread.currentThread().interrupt();
            System.out.println(e);
        }
    }

    /**
     * Reads one file and puts each of its lines on {@code dataQ}. A read error
     * is reported and ends this file only, so the remaining files are still
     * processed.
     *
     * @return {@code false} if a line could not be queued within the timeout
     */
    private boolean publishLines(File f) throws InterruptedException {
        // try-with-resources closes the reader of every file, not just the last one.
        try (BufferedReader br = new BufferedReader(new FileReader(f))) {
            String dataLine;
            while ((dataLine = br.readLine()) != null) {
                System.out.println("Producer:"+threadNumber+" Data read:"+dataLine);
                // Timed offer applies back-pressure instead of dropping the line
                // when the bounded queue is full.
                if (!dataQ.offer(dataLine, OFFER_TIMEOUT_SECONDS,
                        java.util.concurrent.TimeUnit.SECONDS)) {
                    return false;
                }
            }
        } catch (IOException e) {
            System.out.println(e);
        }
        return true;
    }
}
Consumer.java
package
spring.app;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import spring.app.common.JobRunner;
/**
 * Task that takes lines from {@code dataQ} and prints them until the producers
 * have finished and the queue has been drained.
 */
public class Consumer implements Runnable {
    /** How long one poll waits for data before the exit condition is re-checked. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    @Resource
    private BlockingQueue<String> dataQ;
    private int threadNumber;

    /**
     * @param threadNumber number used to label this consumer's log output
     */
    public void setThreadNumber(int threadNumber) {
        this.threadNumber = threadNumber;
    }

    /**
     * Consumes data until {@code JobRunner.isReadComplete} is set and a
     * subsequent poll finds the queue empty, or until the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // Read the flag BEFORE polling: if reading was already complete
                // and the poll still finds nothing, no further data can arrive.
                // Testing the queue first could miss an item added just before
                // the flag was raised.
                boolean readComplete = JobRunner.isReadComplete;
                // Timed poll waits for data instead of spinning at full CPU.
                String data = dataQ.poll(POLL_TIMEOUT_MILLIS,
                        java.util.concurrent.TimeUnit.MILLISECONDS);
                if (data != null) {
                    System.out.println("Consumer:"+threadNumber+" Data consumed:"+data);
                } else if (readComplete) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            // Treat interruption as a request to stop; keep the flag set.
            Thread.currentThread().interrupt();
        }
    }
}
import java.util.concurrent.BlockingQueue;
import javax.annotation.Resource;
import spring.app.common.JobRunner;
/**
 * Task that takes lines from {@code dataQ} and prints them until the producers
 * have finished and the queue has been drained.
 */
public class Consumer implements Runnable {
    /** How long one poll waits for data before the exit condition is re-checked. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    @Resource
    private BlockingQueue<String> dataQ;
    private int threadNumber;

    /**
     * @param threadNumber number used to label this consumer's log output
     */
    public void setThreadNumber(int threadNumber) {
        this.threadNumber = threadNumber;
    }

    /**
     * Consumes data until {@code JobRunner.isReadComplete} is set and a
     * subsequent poll finds the queue empty, or until the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // Read the flag BEFORE polling: if reading was already complete
                // and the poll still finds nothing, no further data can arrive.
                // Testing the queue first could miss an item added just before
                // the flag was raised.
                boolean readComplete = JobRunner.isReadComplete;
                // Timed poll waits for data instead of spinning at full CPU.
                String data = dataQ.poll(POLL_TIMEOUT_MILLIS,
                        java.util.concurrent.TimeUnit.MILLISECONDS);
                if (data != null) {
                    System.out.println("Consumer:"+threadNumber+" Data consumed:"+data);
                } else if (readComplete) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            // Treat interruption as a request to stop; keep the flag set.
            Thread.currentThread().interrupt();
        }
    }
}
We need a separate instance of the producer/consumer for each task that we
create and submit to the thread pool. So, when declaring the producer/consumer
beans in the context file, they should have prototype scope; Spring then
creates a new instance on every request for the bean.
<!-- Prototype scope: each getBean() call returns a new instance -->
<bean id="producer" class="spring.app.Producer" scope="prototype"/>
<bean id="consumer" class="spring.app.Consumer" scope="prototype"/>
<bean id="consumer" class="spring.app.Consumer" scope="prototype" />
So our final context file looks like this.
context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd">

    <!-- Enables annotation-based injection (@Autowired, @Resource) -->
    <context:annotation-config/>

    <!-- Resolves ${...} placeholders from config.properties on the classpath -->
    <bean id="propertyFile"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location" value="classpath:config.properties"/>
    </bean>

    <!-- Thread counts taken from config.properties -->
    <bean id="producerCount" class="java.lang.Integer" autowire="byName">
        <constructor-arg value="${producer.thread.count}"/>
    </bean>
    <bean id="consumerCount" class="java.lang.Integer" autowire="byName">
        <constructor-arg value="${consumer.thread.count}"/>
    </bean>

    <!-- Prototype scope: each getBean() call returns a new instance -->
    <bean id="producer" class="spring.app.Producer" scope="prototype"/>
    <bean id="consumer" class="spring.app.Consumer" scope="prototype"/>

    <!-- Shared queues, each created with capacity 1000 -->
    <bean id="dataQ" class="java.util.concurrent.LinkedBlockingQueue" autowire="byName">
        <constructor-arg type="int" value="1000"/>
    </bean>
    <bean id="fileQ" class="java.util.concurrent.LinkedBlockingQueue" autowire="byName">
        <constructor-arg type="int" value="1000"/>
    </bean>

    <!-- Helper that queues the files found in the input directory -->
    <bean id="resourceHelper" class="spring.app.common.ResourceHelper">
        <property name="inputFiles" value="file:D:\\CODE\\data\\incoming\\"/>
    </bean>
</beans>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd">

    <!-- Enables annotation-based injection (@Autowired, @Resource) -->
    <context:annotation-config/>

    <!-- Resolves ${...} placeholders from config.properties on the classpath -->
    <bean id="propertyFile"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location" value="classpath:config.properties"/>
    </bean>

    <!-- Thread counts taken from config.properties -->
    <bean id="producerCount" class="java.lang.Integer" autowire="byName">
        <constructor-arg value="${producer.thread.count}"/>
    </bean>
    <bean id="consumerCount" class="java.lang.Integer" autowire="byName">
        <constructor-arg value="${consumer.thread.count}"/>
    </bean>

    <!-- Prototype scope: each getBean() call returns a new instance -->
    <bean id="producer" class="spring.app.Producer" scope="prototype"/>
    <bean id="consumer" class="spring.app.Consumer" scope="prototype"/>

    <!-- Shared queues, each created with capacity 1000 -->
    <bean id="dataQ" class="java.util.concurrent.LinkedBlockingQueue" autowire="byName">
        <constructor-arg type="int" value="1000"/>
    </bean>
    <bean id="fileQ" class="java.util.concurrent.LinkedBlockingQueue" autowire="byName">
        <constructor-arg type="int" value="1000"/>
    </bean>

    <!-- Helper that queues the files found in the input directory -->
    <bean id="resourceHelper" class="spring.app.common.ResourceHelper">
        <property name="inputFiles" value="file:D:\\CODE\\data\\incoming\\"/>
    </bean>
</beans>
Final Project Structure