`
ispring
  • 浏览: 355613 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

线程池的实现

    博客分类:
  • Java
阅读更多
设计目标
     提供一个线程池的组件,具有良好的伸缩性,当线程够用时,销毁不用线程,当线程不够用时,自动增加线程数量;
     提供一个工作任务接口和工作队列,实际所需要的任务都必须实现这个工作任务接口,然后放入工作队列中;
     线程池中的线程从工作队列中,自动取得工作任务,执行任务。
主要控制类和功能接口设计

线程池管理器 ThreadPoolManager 的功能:
     管理线程池中的各个属性变量
     最大工作线程数
     最小工作线程数
     激活的工作线程总数
     睡眠的工作线程总数
     工作线程总数 (即:激活的工作线程总数+睡眠的工作线程总数)
    创建工作线程
     销毁工作线程
     启动处于睡眠的工作线程
     睡眠处于激活的工作线程
     缩任务:当工作线程总数小于或等于最小工作线程数时,销毁多余的睡眠的工作线程,使得现有工作线程总数等于最小工作任务总数
     伸任务:当任务队列任务总数大于工作线程数时,增加工作线程总数至最大工作线程数
     提供线程池启动接口
     提供线程池销毁接口

工作线程 WorkThread  的功能:
     从工作队列取得工作任务
     执行工作任务接口中的指定任务
工作任务接口 ITask   的功能:
     提供指定任务动作

工作队列 IWorkQueue  的功能:
     提供获取任务接口,并删除工作队列中的任务;
     提供加入任务接口;
     提供删除任务接口;
     提供取得任务总数接口;
     提供自动填任务接口;(当任务总数少于或等于默认总数的25%时,自动装填)
     提供删除所有任务接口;

[ ThreadPoolManager ]
package test.thread.pool1;
import java.util.ArrayList;
import java.util.List;
import test.thread.pool1.impl.MyWorkQueue;

/**
 * <p>Title: 线程池管理器</p>
 * <p>Description: </p>
 * @version 1.0
 */

public class ThreadPoolManager {
    /*最大线程数*/
    private int threads_max_num;

    /*最小线程数*/
    private int threads_min_num;
  
    /* 线程池线程增长步长 */
    private int threads_increase_step = 5;

    /* 任务工作队列 */
    private IWorkQueue queue;
  
    /* 线程池监视狗 */
    private PoolWatchDog poolWatchDog ;
  
    /* 队列线程 */
    private Thread queueThread ;
  
    /* 线程池 封装所有工作线程的数据结构 */
    private List pool = new ArrayList();
  
    /* 线程池中 封装所有钝化后的数据结构*/
    private List passivePool = new ArrayList();
  
    /* 空闲60秒 */
    private static final long IDLE_TIMEOUT = 60000L;
  
    /* 关闭连接池标志位 */
    private boolean close = false;
  
    /**
     * 线程池管理器
     * @param queue 任务队列
     * @param threads_min_num 工作线程最小数
     * @param threads_max_num 工作线程最大数
     */
    public ThreadPoolManager(int threads_max_num
                          ,int threads_min_num
                          ,IWorkQueue queue){
        this.threads_max_num = threads_max_num;
        this.threads_min_num = threads_min_num;
        this.queue = queue;    
    }

    /**
     * 线程池启动
     */
    public void startPool(){
        System.out.println("=== startPool..........");
        poolWatchDog = new PoolWatchDog("PoolWatchDog");
        poolWatchDog.setDaemon(true);
        poolWatchDog.start();
        System.out.println("=== startPool..........over");
  }

    /**
     * 线程池销毁接口
     */
    public void destoryPool(){
        System.out.println("==========================DestoryPool starting ...");
        this.close = true;
        int pool_size = this.pool.size();
    
        //中断队列线程
         System.out.println("===Interrupt queue thread ... ");
        queueThread.interrupt();
        queueThread = null;
    
        System.out.println("===Interrupt thread pool ... ");
        Thread pool_thread = null;
        for(int i=0; i<pool_size; i++){
            pool_thread = (Thread)pool.get(i);
            if(pool_thread !=null 
              && pool_thread.isAlive() 
              && !pool_thread.isInterrupted()){
                  pool_thread.interrupt();
                  System.out.println("Stop pool_thread:"
                          +pool_thread.getName()+"[interrupt] "
                          +pool_thread.isInterrupted());
            }
        }//end for
    
        if(pool != null){
            pool.clear();
        }
        if(passivePool != null){
            pool.clear();
        }
    
        try{
            System.out.println("=== poolWatchDog.join() starting ...");
            poolWatchDog.join();
            System.out.println("=== poolWatchDog.join() is over ...");
        }
        catch(Throwable ex){
            System.out.println("###poolWatchDog ... join method throw a exception ... " + ex.toString());
        }
    
        poolWatchDog =null;
            System.out.println("==============================DestoryPool is over ...");    
    }
  
  
    public static void main(String[] args) throws Exception{
        ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000));
    
        threadPoolManager1.startPool();
        Thread.sleep(60000);
        threadPoolManager1.destoryPool();
    }



[ PoolWatchDog ]
  /**
   * 线程池监视狗
   */
  private class PoolWatchDog extends Thread{
    public PoolWatchDog(String name){
      super(name);
    }
  
    public void run(){
      Thread workThread = null;
      Runnable run = null;
      
      //开启任务队列线程,获取数据--------
      System.out.println("===QueueThread starting ... ... ");
      queueThread = new Thread(new QueueThread(),"QueueThread");
      queueThread.start();
      
      System.out.println("===Initial thread Pool ... ...");
      //初始化线程池的最小线程数,并放入池中
      for(int i=0; i<threads_min_num; i++){
        run = new WorkThread();
        workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
        workThread.start();
        if(i == threads_min_num -1){
          workThread = null;
          run = null;
        }
      }
      System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size());

      //线程池线程动态增加线程算法--------------
      while(!close){
      
        //等待5秒钟,等上述线程都启动----------
        synchronized(this){          
          try{
            System.out.println("===Wait the [last time] threads starting ....");
            this.wait(15000);
          }
          catch(Throwable ex){
            System.out.println("###PoolWatchDog invoking is failure ... "+ex);
          }
        }//end synchronized
          
        //开始增加线程-----------------------spread动作
        int queue_size = queue.getTaskSize();
        int temp_size = (queue_size - threads_min_num);
        
        if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){
          System.out.println("================Spread thread pool starting ....");
          for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){
            System.out.println("=== Spread thread num : "+i);
            run = new WorkThread();
            workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
            workThread.start();
          }//end for
          
          workThread = null;
          run = null;    
          System.out.println("===Spread thread pool is over .... and pool size:"+pool.size());
        }//end if
          
        //删除已经多余的睡眠线程-------------shrink动作
        int more_sleep_size = pool.size() - threads_min_num;//最多能删除的线程数
        int sleep_threads_size = passivePool.size();
        if(more_sleep_size >0 && sleep_threads_size >0){
          System.out.println("================Shrink thread pool starting ....");        
          for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){
            System.out.println("=== Shrink thread num : "+i);
            Thread removeThread = (Thread)passivePool.get(0);
            if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){
              removeThread.interrupt();
            }
          }
          System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size());          
        }

        System.out.println("===End one return [shrink - spread operator] ....");    
      }//end while
    }//end run 
  }//end private class



[ WorkThread ]
  /**
   * 工作线程
   */
  class WorkThread implements Runnable{
  
    public WorkThread(){
    }
  
    public void run(){
      String name = Thread.currentThread().getName();
      System.out.println("===Thread.currentThread():"+name);
      pool.add(Thread.currentThread());    
    
      while(true){
      
        //获取任务---------
        ITask task = null;
        try{
          System.out.println("===Get task from queue is starting ... ");
          //看线程是否被中断,如果被中断停止执行任务----
          if(Thread.currentThread().isInterrupted()){
            System.out.println("===Breaking current thread and jump whlie [1] ... ");
            break;
          }
          task = queue.getTask();
        }
        catch(Throwable ex){
          System.out.println("###No task in queue:"+ex);
        }//end tryc
        
        if(task != null){
          //执行任务---------
          try{
            System.out.println("===Execute the task is starting ... ");
            //看线程是否被中断,如果被中断停止执行任务----
            if(Thread.currentThread().isInterrupted()){
              System.out.println("===Breaking current thread and jump whlie [1] ... ");
              break;
            }     
            task.executeTask();
            //任务执行完毕-------
            System.out.println("===Execute the task is over ... ");
          }
          catch(Throwable ex){
            System.out.println("###Execute the task is failure ... "+ex);
          }//end tryc
          
        }else{
          //没有任务,则钝化线程至规定时间--------
          synchronized(this){
            try{
              System.out.println("===Passivate into passivePool ... ");
              
              //看线程是否被中断,如果被中断停止执行任务----
              boolean isInterrupted = Thread.currentThread().isInterrupted();
              if(isInterrupted){
                System.out.println("===Breaking current thread and jump whlie [1] ... ");
                break;
              }
//              passivePool.add(this);
            passivePool.add(Thread.currentThread());

              
              //准备睡眠线程-------
              isInterrupted = Thread.currentThread().isInterrupted();
              if(isInterrupted){
                System.out.println("===Breaking current thread and jump whlie [2] ... ");
                break;
              }              
              this.wait(IDLE_TIMEOUT);
            }
            catch(Throwable ex1){
              System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1);
              break;
            }
          }          
        }        
      }//end while--------
      
      if(pool.contains(passivePool)){
        pool.remove(this);
      }
      if(passivePool.contains(passivePool)){
        passivePool.remove(this);
      }
      System.out.println("===The thread execute over ... "); 
    }//end run----------
  }



[ QueueThread ]
class QueueThread implements Runnable{
  
    public QueueThread(){
    }
  
    public void run(){
      while(true){
        //自动装在任务--------
        queue.autoAddTask();
        System.out.println("===The size of queue's task is "+queue.getTaskSize());
      
        synchronized(this){
          if(Thread.currentThread().isInterrupted()){
            break;
          }else{
              try{
                this.wait(queue.getLoadDataPollingTime());
              }
              catch(Throwable ex){
                System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex);
                break;
              }
          }//end if
        }//end synchr
        
      }//end while
    }//end run
  } 
}


[ WorkQueue ]
package test.thread.pool1;

import java.util.LinkedList;
import test.thread.pool1.impl.MyTask;

/**
 * <p>Title: 工作队列对象 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public abstract class WorkQueue implements IWorkQueue{
  /* 预计装载量 */
  private int load_size;
  
  /* 数据装载轮循时间 */
  private long load_polling_time;
  
  /* 队列 */
  private LinkedList queue = new LinkedList();
  
  /**
   * 
   * @param load_size 预计装载量
   * @param load_polling_time 数据装载轮循时间
   */
  public WorkQueue(int load_size,long load_polling_time){
    this.load_size = (load_size <= 10) ? 10 : load_size;
    this.load_polling_time = load_polling_time;
  }

  /* 数据装载轮循时间 */
  public long getLoadDataPollingTime(){
    return this.load_polling_time;
  }


  /*获取任务,并删除队列中的任务*/
  public synchronized ITask getTask(){
    ITask task = (ITask)queue.getFirst();
    queue.removeFirst();
    return task;
  }

  /*加入任务*/
  public void  addTask(ITask task){
    queue.addLast(task);
  }

  /*删除任务*/
  public synchronized void removeTask(ITask task){
    queue.remove(task);
  }

  /*任务总数*/
  public synchronized int getTaskSize(){
    return queue.size();
  }

  /*自动装填任务*/
  public synchronized void autoAddTask(){
  
    synchronized(this){
      float load_size_auto = load_size - getTaskSize() / load_size;
      System.out.println("===load_size_auto:"+load_size_auto);
      
      if(load_size_auto > 0.25){        
        autoAddTask0();
      }
      else {
        System.out.println("=== Not must load new work queue ... Now! ");
      }    
    }
  }

  /*删除所有任务*/
  public synchronized void clearAllTask(){
    queue.clear();
  }
  
  /**
   * 程序员自己实现该方法
   */
  protected abstract void autoAddTask0();
}



[ MyWorkQueue ]
package test.thread.pool1.impl;

import java.util.LinkedList;
import test.thread.pool1.WorkQueue;

/**
 * <p>Title: 例子工作队列对象 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class MyWorkQueue extends WorkQueue{

  /**
   * @param load_size 预计装载量
   * @param load_polling_time 数据装载轮循时间
   */
  public MyWorkQueue(int load_size,long load_polling_time){
    super(load_size,load_polling_time);
  }

  /**
   * 自动加载任务
   */
  protected synchronized void autoAddTask0(){
    //-------------------
    System.out.println("===MyWorkQueue ...  invoked autoAddTask0() method ...");
    for(int i=0; i<10; i++){
      System.out.println("===add task :"+i);
      this.addTask(new MyTask());
    }    
    //-------------------
  }
}




[ MyTask ]
package test.thread.pool1.impl;
import test.thread.pool1.ITask;

/**
 * <p>Title: 工作任务接口 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class MyTask implements ITask {

  /**
   * 执行的任务
   * @throws java.lang.Throwable
   */
  public void executeTask() throws Throwable{
    System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... ");
  }
}

分享到:
评论
1 楼 hpjianhua 2010-10-08  
学习下...

相关推荐

Global site tag (gtag.js) - Google Analytics