当前位置:首页 > 开发 > 编程语言 > 多线程 > 正文

ThreadPoolExecutor 中饱和策略分析

发表于: 2014-07-03   作者:bo_hai   来源:转载   浏览次数:
摘要: import java.util.concurrent.TimeUnit; public class ThreadPoolTask implements Runnable { private final Object threadPoolTaskData; private static long consumerTaskSleepTime = 2L; public Th
import java.util.concurrent.TimeUnit;

public class ThreadPoolTask implements Runnable {

	private final Object threadPoolTaskData;
	private static long consumerTaskSleepTime = 2L;
	
	public ThreadPoolTask(Object tasks) {
		this.threadPoolTaskData = tasks;
	}
	
	@Override
	public void run() {
		System.out.println("start :" + threadPoolTaskData);
		try {
			TimeUnit.SECONDS.sleep(consumerTaskSleepTime);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("finish " + threadPoolTaskData);   
	}
}

 Abort(中止)策略:该策略会抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。代码如下:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.AbortPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 程序的执行结果如下:

put task@ 0
put task@ 1
start :task@ 0
put task@ 2
put task@ 3
put task@ 4
put task@ 5
start :task@ 1
put task@ 6
start :task@ 5
put task@ 7
start :task@ 6
put task@ 8
java.util.concurrent.RejectedExecutionException
put task@ 9
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
	at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22)
java.util.concurrent.RejectedExecutionException
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
	at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22)
java.util.concurrent.RejectedExecutionException
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
	at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22)
finish task@ 0
start :task@ 2
finish task@ 1
start :task@ 3
finish task@ 5
start :task@ 4
finish task@ 6
finish task@ 2
finish task@ 3
finish task@ 4

 分析运行结果:

corepoolsize = 2 ,maxpoolsize = 4,queue size = 3。运行中的线程数+队列中等待的线程数 = 7,及提交的线程数大于7时,会抛出异常。

 

DiscardPolicy 策略会悄悄抛弃新提交的任务:代码如下:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 执行结果如下:

put task@ 0
put task@ 1
start :task@ 0
put task@ 2
put task@ 3
put task@ 4
put task@ 5
start :task@ 1
put task@ 6
put task@ 7
put task@ 8
put task@ 9
start :task@ 6
start :task@ 5
finish task@ 0
finish task@ 6
start :task@ 2
finish task@ 1
start :task@ 3
start :task@ 4
finish task@ 5
finish task@ 2
finish task@ 3
finish task@ 4

 分析结果:

 提交的任务数有10个,开始和结束的任务数7个。抛弃了新提交的任务7、8、9。

 

DiscardOldestPolicy 抛弃旧的线程。代码如下:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardOldestPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 执行结果如下:

put task@ 0
put task@ 1
put task@ 2
put task@ 3
put task@ 4
put task@ 5
put task@ 6
put task@ 7
put task@ 8
put task@ 9
start :task@ 0
start :task@ 5
start :task@ 1
start :task@ 6
finish task@ 0
start :task@ 7
finish task@ 5
start :task@ 8
finish task@ 1
start :task@ 9
finish task@ 6
finish task@ 7
finish task@ 8
finish task@ 9

 对结果进行分析:

线程池抛弃了2、3、4,线程0、1没有进入队列,进行开始执行。所以不会被抛出。

 

CallerRunsPolicy 策略实现一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。代码如下:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.CallerRunsPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 运行结果如下:

put task@ 0
put task@ 1
put task@ 2
put task@ 3
put task@ 4
put task@ 5
start :task@ 1
put task@ 6
put task@ 7
start :task@ 7
start :task@ 6
start :task@ 0
start :task@ 5
finish task@ 7
put task@ 8
finish task@ 1
start :task@ 2
finish task@ 6
finish task@ 0
finish task@ 5
put task@ 9
start :task@ 3
start :task@ 8
start :task@ 4
finish task@ 2
start :task@ 9
finish task@ 3
finish task@ 8
finish task@ 4
finish task@ 9

 对结果进行分析:

所有的线程都执行了。

ThreadPoolExecutor 中饱和策略分析

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
分析完AbstractExecutorService异步任务提交之后,一直留着一个问题:就是任务提交之后的最终执行方
java并发之ThreadPoolExecutor分析 ThreadPoolExecutor线程池是我们平时使用最多的线程池处理工具,
既然最终任务都是由execute(Runnable)方法执行,就直接来看该方法实现的所在类。 首当其冲肯定是类T
参考文章: http://stackoverflow.com/questions/6290470/eclipse-debugger-always-blocks-on-thread
一、序言 关于“池”的概念,我的理解是它是为了让我们更快的获得资源,节省时间,在我所知的所有池
背景 前段时间一个项目中因为涉及大量的线程开发,把jdk cocurrent的代码重新再过了一遍。这篇文章
来源 :【java并发】juc Executor框架详解 Java线程池架构原理和源码解析(ThreadPoolExecutor) 1. E
http://www.iteye.com/topic/1118660 整个ThreadPoolExecutor的任务处理有4步操作: 第一步,初始的
先看一下新建一个ThreadPoolExecutor的构建参数: public ThreadPoolExecutor(int corePoolSize, in
<iframe align="top" marginwidth="0" marginheight="0" src="http://www.zealware.com/csdnblog0
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号