当前位置:首页 > 开发 > 编程语言 > 多线程 > 正文

ThreadPoolExecutor类学习

发表于: 2015-04-26   作者:cfyme   来源:转载   浏览次数:
摘要: ThreadPoolExecutor类学习     Java中的线程池技术主要用的是ThreadPoolExecutor 这个类。先来看这个类的构造函数,   ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,  

ThreadPoolExecutor类学习

 

 

Java中的线程池技术主要用的是ThreadPoolExecutor 这个类。先来看这个类的构造函数,

 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

 

BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

 

    corePoolSize       线程池维护线程的最少数量

 

    maximumPoolSize    线程池维护线程的最大数量 

 

    keepAliveTime      线程池维护线程所允许的空闲时间  

 

    workQueue          任务队列,用来存放我们所定义的任务处理线程

 

    threadFactory      线程创建工厂

 

    handler            线程池对拒绝任务的处理策略

 

     ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize 设置的边界自动调整池大小。当新任务在方法execute(Runnable) 中提交时, 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。 如果设置的corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。

 

     ThreadPoolExecutor是Executors类的实现,Executors类里面提供了一些静态工厂,生成一些常用的线程池,主要有以下几个:

 

     newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。  

 

     newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

 

     newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

 

      在实际的项目中,我们会使用得到比较多的是newFixedThreadPool,创建固定大小的线程池,但是这个方法在真实的线上环境中还是会有很多问题,这个将会在下面一节中详细讲到。

 

      当任务源源不断的过来,而我们的系统又处理不过来的时候,我们要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。

 

      1)CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

 

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

 

             if (!e.isShutdown()) {

 

                 r.run();

 

            }

 

        }

 

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。

 

     2)AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException

 

         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

 

              throw new RejectedExecutionException();

 

        }

 

 这种策略直接抛出异常,丢弃任务。

 

      3)DiscardPolicy:不能执行的任务将被删除

 

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}

 

   这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

 

     4)DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,

 

则重复此过程)

 

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

 

            if (!e.isShutdown()) {

 

                e.getQueue().poll();

 

                e.execute(r);

 

            }

 

        }

 

      该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。

 

 

 

3.  ThreadPoolExecutor无界队列使用

 

public class ThreadPool {

	private final static String poolName = "mypool";
	static private ThreadPool threadFixedPool = new ThreadPool(2);
    private ExecutorService executor;
    static public ThreadPool getFixedInstance() {
	   return threadFixedPool;
   }

private ThreadPool(int num) {
	   executor = Executors.newFixedThreadPool(num, new DaemonThreadFactory(poolName));
}

public void execute(Runnable r) {
	   executor.execute(r);
}

public static void main(String[] params) {
	   class MyRunnable implements Runnable {
				public void run() {
						 System.out.println("OK!");

						 try {
								   Thread.sleep(10);

						 } catch (InterruptedException e) {
								   e.printStackTrace();
						 }
				}
	   }

	   for (int i = 0; i < 10; i++) {
		 ThreadPool.getFixedInstance().execute(new MyRunnable());

	   }

	   try {

				Thread.sleep(2000);
				System.out.println("Process end.");

	   } catch (InterruptedException e) {
				e.printStackTrace();
	   }

	}

}

 

 

       在这段代码中,我们发现我们用到了Executors.newFixedThreadPool()函数,这个函数的实现是这样子的:

 

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); 

 

       它实际上是创建了一个无界队列的固定大小的线程池。执行这段代码,我们发现所有的任务都正常处理了。但是在真实的线上环境中会存在这样的一个问题,前端的用户请求源源不断的过来,后端的处理线程如果处理时间变长,无法快速的将用户请求处理完返回结果给前端,那么任务队列中将堵塞大量的请求。这些请求在前端都是有超时时间设置的,假设请求是通过套接字过来,当我们的后端处理进程处理完一个请求后,从队列中拿下一个任务,发现这个任务的套接字已经无效了,这是因为在用户端已经超时,将套接字建立的连接关闭了。这样一来我们这边的处理程序再去读取套接字时,就会发生I/0 Exception. 恶性循环,导致我们所有的处理服务线程读的都是超时的套接字,所有的请求过来都抛I/O异常,这样等于我们整个系统都挂掉了,已经无法对外提供正常的服务了。

 

     对于海量数据的处理,现在业界都是采用集群系统来进行处理,当请求的数量不断加大的时候,我们可以通过增加处理节点,反正现在硬件设备相对便宜。但是要保证系统的可靠性和稳定性,在程序方面我们还是可以进一步的优化的,我们下一节要讲述的就是针对线上出现的这类问题的一种处理策略。

 

 

 

4.   ThreadPoolExecutor有界队列使用

     

public class ThreadPool {

	 private final static String poolName = "mypool";

	 static private ThreadPool threadFixedPool = null;

	 public ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

	 private ExecutorService executor;


	 static public ThreadPool getFixedInstance() {
			   return threadFixedPool;

	 }

	 private ThreadPool(int num) {
			   executor = new ThreadPoolExecutor(2, 4,60,TimeUnit.SECONDS, queue,new DaemonThreadFactory(poolName), new ThreadPoolExecutor.AbortPolicy());

	 }

	 public void execute(Runnable r) {
			   executor.execute(r);

	 }
	

	 public static void main(String[] params) {

			   class MyRunnable implements Runnable {
						public void run() {
								 System.out.println("OK!");
								 try {
										  Thread.sleep(10);

								 } catch (InterruptedException e) {
										   e.printStackTrace();
								 }

						}

			   }

			   int count = 0;
			   for (int i = 0; i < 10; i++) {
						try {

								 ThreadPool.getFixedInstance().execute(new MyRunnable());

						} catch (RejectedExecutionException e) {
								 e.printStackTrace();
								 count++;
						}
			   }

			   try {
						log.info("queue size:" + ThreadPool.getFixedInstance().queue.size());
						Thread.sleep(2000);
			   } catch (InterruptedException e) {
						e.printStackTrace();

			   }

			   System.out.println("Reject task: " + count);
	 }

}

 

 

       首先我们来看下这段代码几个重要的参数,corePoolSize 为2,maximumPoolSize为4,任务队列大小为2,每个任务平均处理时间为10ms,一共有10个并发任务。

 

      执行这段代码,我们会发现,有4个任务失败了。这里就验证了我们在上面提到有界队列时候线程池的执行顺序。当新任务在方法 execute(Runnable) 中提交时, 如果运行的线程少于 corePoolSize,则创建新线程来处理请求。 如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程,如果此时线程数量达到maximumPoolSize,并且队列已经满,就会拒绝继续进来的请求。

 

    现在我们调整一下代码中的几个参数,将并发任务数改为200,执行结果Reject task: 182,说明有18个任务成功了,线程处理完一个请求后会接着去处理下一个过来的请求。在真实的线上环境中,会源源不断的有新的请求过来,当前的被拒绝了,但只要线程池线程把当下的任务处理完之后还是可以处理下一个发送过来的请求。

 

     通过有界队列可以实现系统的过载保护,在高压的情况下,我们的系统处理能力不会变为0,还能正常对外进行服务,虽然有些服务可能会被拒绝,至于如何减少被拒绝的数量以及对拒绝的请求采取何种处理策略我将会在下一篇文章《系统的过载保护》中继续阐述。

ThreadPoolExecutor类学习

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
http://www.iteye.com/topic/1118660 整个ThreadPoolExecutor的任务处理有4步操作: 第一步,初始的
参考文章: http://stackoverflow.com/questions/6290470/eclipse-debugger-always-blocks-on-thread
背景 前段时间一个项目中因为涉及大量的线程开发,把jdk cocurrent的代码重新再过了一遍。这篇文章
来源 :【java并发】juc Executor框架详解 Java线程池架构原理和源码解析(ThreadPoolExecutor) 1. E
先看一下新建一个ThreadPoolExecutor的构建参数: public ThreadPoolExecutor(int corePoolSize, in
大家先从ThreadPoolExecutor的整体流程入手: 针对ThreadPoolExecutor代码。我们来看下execute方法
前言:最近在做分布式海量数据处理项目,使用到了java的线程池,所以搜集了一些资料对它的使用做了
背景 前段时间一个项目中因为涉及大量的线程开发,把jdk cocurrent的代码重新再过了一遍。这篇文章
背景 前段时间一个项目中因为涉及大量的线程开发,把jdk cocurrent的代码重新再过了一遍。这篇文章
这篇文章分为两部分,前面是ThreadPoolExecutor的一些基本知识,后一部分则是Mina中一个特殊的Threa
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号