My Custom Thread Pool Executor in Java

ThreadPoolExecutor is a feature added by java concurrent api to maintain and reuse threads efficiently , so that our programs don’t have to worry about creating and destroying threads and focus on the core functionality. I have created a custom thread pool executor to get better understanding of how thread pool executor would work .

Functionality :

  • It maintains a fixed thread pool ,and creates threads and start the threads even if no task is submitted whereas ThreadPoolExecutor creates threads on demand , i.e. whenever a runnable is submitted to pool and the number of threads are less than core pool size .
  • In ThreadPoolExecutor, we provide a waiting queue ,where new runnable task waits when all threads are busy running existing task. Once the queue is filled , new threads will be created up to maximum pool size. In MyThreadPool , i am storing the runnable in a linked list , so every task will wait in the list and it is unbounded , so no usage of maxPoolSize in this .
  • In ThreadPoolExecutor , we use Future Objects to get the result from task , future.get() method will block if the result is not available , or we use CompletionService . In MyThreadPoolExecutor , i have created a simple interface called ResultListener , user has to provide a implementation of this as to how he wants the output to be processed . After every task is completed , the ResultListener will get callback with the output of task or error method will be called in case of any exception.
  • When shutdown method is called , MyThreadPoolExecutor will stop accepting new tasks and complete the remaining tasks .
  • I have provided very basic functionality as compared to ThreadPoolExecutor , i have used simple thread mechanism like wait() , notify() , notifyAll(), and join().
  • Performance wise it is similar to ThreadPoolExecutor , some times better in some cases. Do let me know if you find any interesting results or ways to improve it .
package com.util;import java.util.concurrent.Callable;/** * Run submitted task of {@link MyThreadPool} After running the task , It calls * on {@link ResultListener}object with {@link Output}which contains returned * result of {@link Callable}task. Waits if the pool is empty. *  * @author abhishek *  * @param  */import java.util.concurrent.Callable;/*** Run submitted task of {@link MyThreadPool} After running the task , It calls* on {@link ResultListener}object with {@link Output}which contains returned* result of {@link Callable}task. Waits if the pool is empty.** @author abhishek** @param <V>*/public class MyThread<V> extends Thread {    /**    * MyThreadPool object, from which the task to be run    */    private MyThreadPool<V> pool;    private boolean active = true;    public boolean isActive() {        return active;    }    public void setPool(MyThreadPool<V> p) {        pool = p;    }    /**    * Checks if there are any unfinished tasks left. if there are , then runs    * the task and call back with output on resultListner Waits if there are no    * tasks available to run If shutDown is called on MyThreadPool, all waiting    * threads will exit and all running threads will exit after finishing the    * task    */    public void run() {        ResultListener<V> result = pool.getResultListener();        Callable<V> task;        while (true)        {            task = pool.removeFromQueue();            if (task != null)            {                try                {                    V output = task.call();                    result.finish(output);                } catch (Exception e)                {                    result.error(e);                }            } else            {                if (!isActive())                break;                else                {                    synchronized (pool.getWaitLock())                    {                        try                        {                            pool.getWaitLock().wait();                        } catch (InterruptedException e)                        {                            // TODO Auto-generated catch block                            e.printStackTrace();                        }                    }                }            }        }    }    void shutdown() {        active = false;    }}
package com.util;import java.util.LinkedList;import java.util.concurrent.Callable;/*** This class is used to execute submitted {@link Callable} tasks. this class* creates and manages fixed number of threads User will provide a* {@link ResultListener}object in order to get the Result of submitted task** @author abhishek***/public class MyThreadPool<V> {    private Object waitLock = new Object();    public Object getWaitLock() {        return waitLock;    }    /**    * list of threads for completing submitted tasks    */    private final LinkedList<MyThread<V>> threads;    /**    * submitted task will be kept in this list untill they run by one of    * threads in pool    */    private final LinkedList<Callable<V>> tasks;    /**    * shutDown flag to shut Down service    */    private volatile boolean shutDown;    /**    * ResultListener to get back the result of submitted tasks    */    private ResultListener<V> resultListener;    /**    * initializes the threadPool by starting the threads threads will wait till    * tasks are not submitted    *    * @param size    * Number of threads to be created and maintained in pool    * @param myResultListener    * ResultListener to get back result    */    public MyThreadPool(int size, ResultListener<V> myResultListener) {        tasks = new LinkedList<Callable<V>>();        threads = new LinkedList<MyThread<V>>();        shutDown = false;        resultListener = myResultListener;        for (int i = 0; i < size; i++) {            MyThread<V> myThread = new MyThread<V>();            myThread.setPool(this);            threads.add(myThread);            myThread.start();        }    }    public ResultListener<V> getResultListener() {        return resultListener;    }    public void setResultListener(ResultListener<V> resultListener) {        this.resultListener = resultListener;    }    public boolean isShutDown() {        return shutDown;    }    public int getThreadPoolSize() {        return threads.size();    }    public synchronized Callable<V> removeFromQueue() {        return tasks.poll();    }    public synchronized void addToTasks(Callable<V> callable) {        tasks.add(callable);    }    /**    * submits the task to threadPool. will not accept any new task if shutDown    * is called Adds the task to the list and notify any waiting threads    *    * @param callable    */    public void submit(Callable<V> callable) {        if (!shutDown) {            addToTasks(callable);            synchronized (this.waitLock) {                waitLock.notify();            }            } else {            System.out.println('task is rejected.. Pool shutDown executed');        }    }    /**    * Initiates a shutdown in which previously submitted tasks are executed,    * but no new tasks will be accepted. Waits if there are unfinished tasks    * remaining    *    */    public void stop() {        for (MyThread<V> mythread : threads) {            mythread.shutdown();        }        synchronized (this.waitLock) {            waitLock.notifyAll();        }        for (MyThread<V> mythread : threads) {            try {                mythread.join();                } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }    }}
package com.util;/** * This interface imposes finish method  * which is used to get the {@link Output} object  * of finished task * @author abhishek * * @param  */public interface ResultListener                         { public void finish(T obj); public void error(Exception ex);}

you can implement this class as you want to get back and process the result returned by tasks.

package com.util;public class DefaultResultListener implements ResultListener{ @Override public void finish(Object obj) { } @Override public void error(Exception ex) {  ex.printStackTrace(); }}

For example this class will add the number returned by tasks .

package com.util;import java.util.concurrent.atomic.AtomicInteger;/** * ResultListener class to keep track of total matched count * @author abhishek *  * @param  */public class MatchedCountResultListener                        implements ResultListener                         {	/**	 * matchedCount to keep track of the number of matches returned by submitted	 * task	 */	AtomicInteger matchedCount = new AtomicInteger();	/**	 * this method is called by ThreadPool to give back the result of callable	 * task. if the task completed successfully then increment the matchedCount by	 * result count	 */	@Override	public void finish(V obj) {		//System.out.println('count is '+obj);		matchedCount.addAndGet((Integer)obj);	}	/**	 * print exception thrown in running the task	 */	@Override	public void error(Exception ex) {		ex.printStackTrace();	}	/**	 * returns the final matched count of all the finished tasks	 * 	 * @return	 */	public int getFinalCount() {		return matchedCount.get();	}}

This is a test class which runs simple for loop using CompletionService and MyThreadPoolExecutor

package test;import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import com.util.DefaultResultListener;import com.util.MyThreadPool;public class TestClass {	public static void main(String[] args) throws InterruptedException {		CompletionService                       threadService;		ExecutorService service = Executors.newFixedThreadPool(2);		threadService = new ExecutorCompletionService                       (service);		long b = System.currentTimeMillis();		for(int i =0;i<50000;i++){			threadService.submit(new MyRunable (i));		}		service.shutdown();		System.out.println('time taken by Completion Service ' + (System.currentTimeMillis()-b));		DefaultResultListener result = new DefaultResultListener();		MyThreadPool                         newPool = new MyThreadPool                         (2,result);		long a = System.currentTimeMillis();		int cc =0;		for(int i =0;i<50000;i++)		{			cc = cc+i;		}		System.out.println('time taken without any pool ' + (System.currentTimeMillis()-a));		a= System.currentTimeMillis();		for(int i =0;i<5000;i++){			newPool.submit(new MyRunable (i));		}		newPool.stop();		System.out.println('time taken by myThreadPool ' + (System.currentTimeMillis()-a));	}}class MyRunable implements Callable{	int index = -1;	public MyRunable(int index)	{		this.index = index;	}	@Override	public Integer call() throws Exception {		return index;	}}

Reference: My Custom Thread Pool Executor in Java from our JCG partner Abhishek Somani at the Java , J2EE , Server blog.




Source : http://www.javacodegeeks.com/2013/03/my-custom-thread-pool-executor-in-java.html

Back to Top