Tuesday, 17 May 2016

Simple producer-consumer design pattern using spring


The producer/consumer design pattern is a widely used way to structure multi-threaded processing. Today we are going to see how to create a simple producer-consumer framework using the Spring framework and blocking queues.

Basic flow:
Producer threads read the data from different text files present in an input location. At the same time, consumer threads start consuming the data. Here we describe the development of this simple pattern.
Let's use a Maven build. We need to add the spring-context dependency to the pom.xml file.

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.2.5.RELEASE</version>
        </dependency>
     

Step 1: Config and context
As a first step, we need a properties file holding the required settings, such as the producer and consumer thread counts. We can have a config.properties file as shown below.

producer.thread.count=2
consumer.thread.count=2
   
Let's also have a context file to load the required beans into the Spring container. Create the context.xml and config.properties files and add both to the classpath.

Step 2: Reading properties from the context
In the context file we need to specify all the beans required by the application. To read the config.properties file from the classpath, we add a PropertyPlaceholderConfigurer bean to context.xml as follows.
 <bean id="propertyFile"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location" value="classpath:config.properties" />
    </bean>
  
Now we can load properties from config.properties as shown below.
 <bean id="producerCount" class="java.lang.Integer" autowire="byName">
        <constructor-arg value="${producer.thread.count}"></constructor-arg>
    </bean>
    <bean id="consumerCount" class="java.lang.Integer" autowire="byName">
        <constructor-arg value="${consumer.thread.count}"></constructor-arg>
    </bean>
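
To make the placeholder mechanism more concrete, here is a plain-Java sketch of roughly what the two bean definitions above amount to: a key is looked up in the properties and its text value is converted to an Integer. It uses java.util.Properties directly rather than Spring, and the class name ThreadCountSketch and its helper methods are hypothetical names chosen for this illustration only.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration; Spring itself is not used here.
class ThreadCountSketch {

    // Parses properties text of the same form as config.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Looks up a key and converts its text value to an Integer, similar in
    // spirit to passing "${key}" as the constructor argument of java.lang.Integer.
    static Integer readCount(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return Integer.valueOf(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = parse("producer.thread.count=2\nconsumer.thread.count=2\n");
        System.out.println("producers=" + readCount(props, "producer.thread.count"));
        System.out.println("consumers=" + readCount(props, "consumer.thread.count"));
    }
}
```

A missing key fails fast here with an exception, which is usually preferable to silently starting zero threads.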

Step 3: Main and Runner

Here we create a main class (App) and a runner class (JobRunner) with an execute method that starts our producers and consumers.
App.java 
 package spring.app;
import spring.app.common.JobRunner;
public class App
{   
    public static void main( String[] args )
    {       
       JobRunner jobRunner=new JobRunner();   
       jobRunner.execute();
     
    }
}

 JobRunner.java
 package spring.app.common;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import spring.app.Consumer;
import spring.app.Producer;


public class JobRunner {   

    private int producerCount;
    private int consumerCount;
   
    // volatile so that consumer threads reliably see the update made by the runner thread
    public static volatile boolean isReadComplete;
    private ApplicationContext cp;

    public void execute(){
       
       
        cp = new ClassPathXmlApplicationContext("context.xml");        
        producerCount=(Integer)cp.getBean("producerCount");
        consumerCount=(Integer)cp.getBean("consumerCount");

        ThreadPoolExecutor producerPoolExecutor=(ThreadPoolExecutor) Executors.newFixedThreadPool(producerCount);
          // producer
           for(int i=1; i<=producerCount;i++){
               Producer p=(Producer)cp.getBean("producer");
               p.setThreadNumber(i);
               producerPoolExecutor.execute(p);  
              
           }
           //consumer
           ThreadPoolExecutor consumerPoolExecutor=(ThreadPoolExecutor) Executors.newFixedThreadPool(consumerCount);

           for(int i=1; i<=consumerCount;i++){
               Consumer c=(Consumer)cp.getBean("consumer");
               c.setThreadNumber(i);
               consumerPoolExecutor.execute(c);  
              
           }
          try {
            // Stop accepting new tasks first, then wait for the producers to finish reading.
            // The one-hour timeout is only a generous upper bound; tune it as needed.
            producerPoolExecutor.shutdown();
            producerPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
            isReadComplete=true;
            // Consumers exit once the flag is set and dataQ has been drained.
            consumerPoolExecutor.shutdown();
            consumerPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
            System.out.println("Completed");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can react to it.
            Thread.currentThread().interrupt();
        }

    }
}

Here we use the ThreadPoolExecutor class to start and run the producer and consumer tasks efficiently. The thread pool takes care of creating and terminating threads, and it reduces per-task overhead when executing large numbers of asynchronous tasks. The executor's shutdown() method initiates an orderly shutdown in which previously submitted tasks are executed but no new tasks are accepted; calling it again has no additional effect. shutdown() does not wait for previously submitted tasks to complete, so we call awaitTermination() after it, which blocks until all tasks have finished following the shutdown request, the timeout elapses, or the current thread is interrupted. Only once the producers have terminated do we set isReadComplete, so that the consumers keep draining dataQ until all data has been read.
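
The interplay of shutdown() and awaitTermination() can be tried out in isolation. The following is a minimal sketch using only java.util.concurrent; the class name ShutdownOrderSketch, the pool size of 2 and the 10-second timeout are assumptions made for this illustration and are not part of the project above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the shutdown-then-await idiom.
class ShutdownOrderSketch {

    // Submits taskCount trivial tasks, shuts the pool down and waits for it.
    // Returns how many tasks had finished when the wait ended.
    static int runAndWait(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(finished::incrementAndGet);
        }
        pool.shutdown(); // no new tasks; already submitted ones still run
        try {
            // Blocks until all tasks finish or the timeout elapses.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up and interrupt the workers
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished=" + runAndWait(5));
    }
}
```

Calling awaitTermination() before shutdown() would simply wait for the full timeout, because a pool that has not been shut down never reaches the terminated state.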

As mentioned earlier, producer threads read all the files from the input location. So let's have a file queue that holds all the files found there. We create a ResourceHelper class to populate this queue. ResourceHelper implements InitializingBean, so Spring executes its afterPropertiesSet() method right after the bean's properties have been set, during context initialization. The ResourceHelper class is shown below.

ResourceHelper.java
 
package spring.app.common;

import java.io.File;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.Resource;

public class ResourceHelper implements InitializingBean {

    @Autowired
    private LinkedBlockingQueue<File> fileQ;  
   
    private Resource inputFiles; 
      
    public void setInputFiles(Resource inputFiles) {
        this.inputFiles = inputFiles;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
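        // listFiles() returns null when the path is not a readable directory
        File[] files=inputFiles.getFile().listFiles();
        if(files==null){
            throw new IllegalStateException("Not a readable directory: "+inputFiles);
        }
        for(File file:files){
            fileQ.offer(file);
        }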

    }

}
In the context file we can declare the inputFiles resource property as shown below.

   <bean id="resourceHelper" class="spring.app.common.ResourceHelper">
        <property name="inputFiles" value="file:D:\\CODE\\data\\incoming\\"></property>
    </bean>
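
The directory-to-queue step can also be exercised without Spring. The sketch below is a rough, stand-alone equivalent of what afterPropertiesSet() does; the class name DirectoryQueueSketch and its helpers are hypothetical, and unlike the class above it queues regular files only, so that subdirectories are skipped.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; not part of the project's classes.
class DirectoryQueueSketch {

    // Offers every regular file directly under dir to the queue.
    // Returns the number of files actually queued.
    static int enqueueFiles(File dir, BlockingQueue<File> queue) {
        File[] files = dir.listFiles(File::isFile); // null if dir is not a directory
        if (files == null) {
            return 0;
        }
        int queued = 0;
        for (File f : files) {
            if (queue.offer(f)) { // offer() returns false when a bounded queue is full
                queued++;
            }
        }
        return queued;
    }

    // Creates a temporary directory holding the given number of empty files (demo only).
    static File tempDirWithFiles(int count) {
        try {
            Path dir = Files.createTempDirectory("incoming");
            for (int i = 0; i < count; i++) {
                Files.createFile(dir.resolve("file" + i + ".txt"));
            }
            return dir.toFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<File> fileQ = new LinkedBlockingQueue<>(1000);
        System.out.println("queued=" + enqueueFiles(tempDirWithFiles(3), fileQ));
    }
}
```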

Step 4: Initialize the required blocking queues

A blocking queue additionally supports operations that wait for the queue to become non-empty when retrieving an element, and that wait for space to become available when storing an element. We need two queues here: one for files (fileQ) and another for sharing data between producers and consumers (dataQ). The queues are initialized in the context file as follows; the constructor argument sets the queue capacity.

   <bean id="dataQ" class="java.util.concurrent.LinkedBlockingQueue"  autowire="byName">
        <constructor-arg type="int" value="1000"></constructor-arg>
    </bean>
    <bean id="fileQ" class="java.util.concurrent.LinkedBlockingQueue" autowire="byName">
        <constructor-arg type="int" value="1000"></constructor-arg>
    </bean>
These queues can be auto-wired into the Producer and Consumer classes as required. Now let's go through the creation of the Producer and Consumer thread classes.
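
Because the queues are bounded, it matters which queue method is called. The following stand-alone sketch shows two behaviours worth knowing: offer() returns false instead of waiting when the queue is full, and the timed poll() returns null when nothing arrives in time. The class name BoundedQueueSketch and the capacities used are assumptions for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of bounded-queue behaviour.
class BoundedQueueSketch {

    // Tries to add 'attempts' items to a queue of the given capacity using offer().
    // Returns how many items were rejected because the queue was full.
    static int countRejectedOffers(int capacity, int attempts) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < attempts; i++) {
            if (!q.offer("line-" + i)) { // does not block; false means "dropped"
                rejected++;
            }
        }
        return rejected;
    }

    // Polls an empty queue with a timeout; returns null if nothing arrives in time.
    static String timedPollOnEmpty(long millis) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        try {
            return q.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected=" + countRejectedOffers(2, 5)); // prints rejected=3
        System.out.println("polled=" + timedPollOnEmpty(50));        // prints polled=null
    }
}
```

If dropping data is not acceptable, put() is the blocking counterpart of offer(): it waits until space becomes available.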

Step 5: Creation of Producer and Consumer threads
Each producer thread takes files from fileQ and loads their lines into dataQ, which is shared with the consumers.

Producer.java

package spring.app;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.BlockingQueue;

import javax.annotation.Resource;

public class Producer implements Runnable,Cloneable {

    @Resource   
    private BlockingQueue<String> dataQ;
   
    @Resource
    private BlockingQueue<File> fileQ;
   
    private int threadNumber;   

    public void setThreadNumber(int threadNumber) {
        this.threadNumber = threadNumber;
    }

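    public void run() {

        File f;
        // poll() returns null once the queue is empty, which avoids a
        // check-then-act race between producers on the last file.
        while((f=fileQ.poll())!=null){
            System.out.println("Processing "+f.getName());
            // try-with-resources closes the reader for every file, not just the last one.
            try(BufferedReader br=new BufferedReader(new FileReader(f))){
                String dataLine;
                while((dataLine=br.readLine())!=null){
                    System.out.println("Producer:"+threadNumber+" Data read:"+dataLine);
                    // put() waits for space instead of silently dropping the line when dataQ is full.
                    dataQ.put(dataLine);
                }
            }catch(IOException e){
                System.out.println(e);
            }catch(InterruptedException e){
                // Restore the interrupt status and stop producing.
                Thread.currentThread().interrupt();
                return;
            }
        }

    }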

}
Consumer.java


package spring.app;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import spring.app.common.JobRunner;

public class Consumer implements Runnable {

    @Resource   
    private BlockingQueue<String> dataQ;
   
    private int threadNumber;
   
   
    public void setThreadNumber(int threadNumber) {
        this.threadNumber = threadNumber;
    }


    @Override
    public void run() {
        while(!dataQ.isEmpty() || !JobRunner.isReadComplete){
           
            try {
                // Wait briefly for data rather than spinning on an empty queue.
                String data=dataQ.poll(100, TimeUnit.MILLISECONDS);
                if(data!=null){
                    System.out.println("Consumer:"+threadNumber+" Data consumed:"+data);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }

    }

}
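
The static isReadComplete flag keeps this example short, but it couples Consumer to JobRunner. One commonly used alternative, which is not what this article implements, is the "poison pill": after the real data, the producing side puts one sentinel per consumer on the queue, and each consumer stops when it takes one. The sketch below is a stand-alone illustration under that assumption; the class name PoisonPillSketch, the POISON constant and the queue capacity of 10 are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the poison-pill shutdown technique.
class PoisonPillSketch {

    // Sentinel compared by identity, so real data with the same text is never mistaken for it.
    static final String POISON = new String("__POISON__");

    // Feeds all lines through a bounded queue to consumerCount consumers
    // and returns everything the consumers received (order not guaranteed).
    static List<String> consumeAll(List<String> lines, int consumerCount) {
        BlockingQueue<String> dataQ = new LinkedBlockingQueue<>(10);
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            consumers.execute(() -> {
                try {
                    while (true) {
                        String s = dataQ.take(); // blocks until data is available
                        if (s == POISON) {
                            break; // this consumer is done
                        }
                        consumed.add(s);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            for (String line : lines) {
                dataQ.put(line);
            }
            for (int i = 0; i < consumerCount; i++) {
                dataQ.put(POISON); // one pill per consumer
            }
            consumers.shutdown();
            consumers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            consumers.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        Collections.addAll(lines, "a", "b", "c", "d", "e");
        System.out.println("consumed count=" + consumeAll(lines, 2).size());
    }
}
```

With this approach the consumers block in take() instead of polling, and no shared flag is needed.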
We need a separate instance of the producer or consumer each time we create one and submit it to the thread pool. So the producer and consumer beans in the context file should have prototype scope, which makes Spring create a new instance on each getBean() request.
 
    <bean id="producer" class="spring.app.Producer" scope="prototype" />
    <bean id="consumer" class="spring.app.Consumer" scope="prototype" />
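
Why separate instances matter can be shown without Spring. In the sketch below, a supplier that always returns the same object stands in for a singleton-scoped bean, and a supplier that creates a new object on each call stands in for a prototype-scoped bean. The class name PrototypeScopeSketch and its nested Worker class are hypothetical stand-ins for illustration, not the Producer and Consumer classes above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration of shared versus per-request instances.
class PrototypeScopeSketch {

    // Minimal stand-in for a task object that carries a thread number.
    static class Worker {
        private int threadNumber;
        void setThreadNumber(int n) { threadNumber = n; }
        int getThreadNumber() { return threadNumber; }
    }

    // Requests a worker 'count' times, numbering each one as JobRunner does,
    // and returns the numbers the workers report afterwards.
    static List<Integer> numbersSeen(Supplier<Worker> source, int count) {
        List<Worker> workers = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            Worker w = source.get();
            w.setThreadNumber(i);
            workers.add(w);
        }
        List<Integer> seen = new ArrayList<>();
        for (Worker w : workers) {
            seen.add(w.getThreadNumber());
        }
        return seen;
    }

    public static void main(String[] args) {
        Worker shared = new Worker();
        System.out.println(numbersSeen(() -> shared, 2)); // singleton-like: prints [2, 2]
        System.out.println(numbersSeen(Worker::new, 2));  // prototype-like: prints [1, 2]
    }
}
```

With a shared instance, the second setThreadNumber() call overwrites the first, so both submitted tasks would report the same number.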
  
So our final context file looks like this.

context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
      http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd
    ">

    <context:annotation-config />

    <bean id="propertyFile"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location" value="classpath:config.properties" />
    </bean>

    <bean id="producerCount" class="java.lang.Integer" autowire="byName">
        <constructor-arg value="${producer.thread.count}"></constructor-arg>
    </bean>

    <bean id="consumerCount" class="java.lang.Integer" autowire="byName">
        <constructor-arg value="${consumer.thread.count}"></constructor-arg>
    </bean>
    <bean id="producer" class="spring.app.Producer" scope="prototype" />
    <bean id="consumer" class="spring.app.Consumer" scope="prototype" />

    <bean id="dataQ" class="java.util.concurrent.LinkedBlockingQueue"
        autowire="byName">
        <constructor-arg type="int" value="1000"></constructor-arg>
    </bean>
    <bean id="fileQ" class="java.util.concurrent.LinkedBlockingQueue"
        autowire="byName">
        <constructor-arg type="int" value="1000"></constructor-arg>
    </bean>

    <bean id="resourceHelper" class="spring.app.common.ResourceHelper">
        <property name="inputFiles" value="file:D:\\CODE\\data\\incoming\\"></property>
    </bean>
</beans>
  
 Final Project Structure