当前位置:首页 > 开发 > 编程语言 > 多线程 > 正文

ThreadPool定时重试

发表于: 2015-04-22   作者:dai_lm   来源:转载   浏览:
摘要: 项目需要当某事件触发时,执行http请求任务,失败时需要有重试机制,并根据失败次数的增加,重试间隔也相应增加,任务可能并发。 由于是耗时任务,首先考虑的就是用线程来实现,并且为了节约资源,因而选择线程池。 为了解决不定间隔的重试,选择Timer和TimerTask来完成 package threadpool; public class ThreadPoolTest {
项目需要当某事件触发时,执行http请求任务,失败时需要有重试机制,并根据失败次数的增加,重试间隔也相应增加,任务可能并发。
由于是耗时任务,首先考虑的就是用线程来实现,并且为了节约资源,因而选择线程池。
为了解决不定间隔的重试,选择Timer和TimerTask来完成

package threadpool;

public class ThreadPoolTest {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		System.out.println("start");
		
		ThreadPoolManager poolManager = new ThreadPoolManager(3);
		poolManager.start();

		MyTaskList list = new MyTaskList(poolManager);
		
		new MyTask(list, "A").start();
		new MyTask(list, "B").start();
		new MyTask(list, "C").start();
		new MyTask(list, "D").start();
		new MyTask(list, "E").start();
		new MyTask(list, "F").start();
		new MyTask(list, "G").start();

		try {
			Thread.sleep(30000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		poolManager.stop();
		
		System.out.println("stop");
		
	}

}

package threadpool;

import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolManager {

	/** 线程池的大小 */
	private int poolSize;
	private static final int MIN_POOL_SIZE = 1;
	private static final int MAX_POOL_SIZE = 10;
	/** 线程池 */
	private ExecutorService threadPool;
	/** 请求队列 */
	private LinkedList<ThreadPoolTask> asyncTasks;
	/** 轮询线程 */
	private Thread poolThread;
	/** 轮询时间 */
	private static final int SLEEP_TIME = 200;

	public ThreadPoolManager(int poolSize) {
		if (poolSize < MIN_POOL_SIZE)
			poolSize = MIN_POOL_SIZE;
		if (poolSize > MAX_POOL_SIZE)
			poolSize = MAX_POOL_SIZE;
		this.poolSize = poolSize;
		threadPool = Executors.newFixedThreadPool(this.poolSize);
		asyncTasks = new LinkedList<ThreadPoolTask>();
	}

	/**
	 * 向任务队列中添加任务
	 * 
	 * @param task
	 */
	public void addAsyncTask(ThreadPoolTask task) {
		synchronized (asyncTasks) {
			// Log.i(TAG, "add task: " + task.getURL());
			asyncTasks.addLast(task);
		}
	}

	/**
	 * 从任务队列中提取任务
	 * 
	 * @return
	 */
	private ThreadPoolTask getAsyncTask() {
		synchronized (asyncTasks) {
			if (asyncTasks.size() > 0) {
				ThreadPoolTask task = asyncTasks.removeFirst();
				// Log.i(TAG, "remove task: " + task.getURL());
				return task;
			}
		}
		return null;
	}

	/**
	 * 开启线程池轮询
	 * 
	 * @return
	 */
	public void start() {
		if (poolThread == null) {
			poolThread = new Thread(new PoolRunnable());
			poolThread.start();
		}
	}

	/**
	 * 结束轮询,关闭线程池
	 */
	public void stop() {
		poolThread.interrupt();
		poolThread = null;
	}

	/**
	 * 实现轮询的Runnable
	 * 
	 * @author carrey
	 * 
	 */
	private class PoolRunnable implements Runnable {
		@Override
		public void run() {
			// Log.i(TAG, "开始轮询");
			try {
				while (!Thread.currentThread().isInterrupted()) {
					ThreadPoolTask task = getAsyncTask();
					if (task == null) {
						try {
							Thread.sleep(SLEEP_TIME);
						} catch (InterruptedException e) {
							Thread.currentThread().interrupt();
						}
						continue;
					}
					threadPool.execute(task);
				}
			} finally {
				threadPool.shutdown();
			}
			// Log.i(TAG, "结束轮询");
		}
	}
}

package threadpool;


public class ThreadPoolTask implements Runnable {

	private String tag;
	
	private Callback callback;
	
	public ThreadPoolTask(String tag, Callback callback) {
		this.tag = tag;
		this.callback = callback;
	}

	@Override
	public void run() {
		System.out.println(tag + " is running on " + Thread.currentThread());
		
		try {
			// 模拟耗时任务
			Thread.sleep(700);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		if (callback != null)
			callback.onRetry();
	}

	public interface Callback {
		public void onRetry();
	}

}

package threadpool;

import java.lang.reflect.Field;
import java.util.Timer;
import java.util.TimerTask;

public class MyTaskList {

	private ThreadPoolManager poolManager;

	private Timer timer;

	public MyTaskList(ThreadPoolManager poolManager) {
		this.poolManager = poolManager;

		timer = new Timer();
	}

	public void addTask(ThreadPoolTask task) {
		if (task != null)
			poolManager.addAsyncTask(task);
	}

	public void addTask(TimerTask task, long delay) {
		// 重置TimerTask,不然会发生Exception
		try {
			Class<?> clazz = TimerTask.class;
			Field field = clazz.getDeclaredField("state");
			field.setAccessible(true);
			field.set(task, 0);
		} catch (Exception e) {
		}

		timer.schedule(task, delay);
	}

}

package threadpool;

import java.util.TimerTask;

import threadpool.ThreadPoolTask.Callback;

public class MyTask implements Callback {

	private MyTaskList list;
	private ThreadPoolTask task;

	private String tag;

	private int retry = 0;

	public MyTask(MyTaskList list, String tag) {
		this.list = list;
		this.tag = tag;
	}

	public void start() {
		task = new ThreadPoolTask(tag, this);
		start(0);
	}

	private void start(int retry) {
		// 最多重试3次
		if (retry >= 3) {
			System.out.println(tag + " finished " + Thread.currentThread());
			return;
		}

		doSomething();

		this.retry = retry;

		list.addTask(task);
	}

	@Override
	public void onRetry() {
		// 重试间隔
		list.addTask(timertask, 1000);
	}

	private TimerTask timertask = new TimerTask() {

		@Override
		public void run() {
			start(retry + 1);
		}

	};

	private void doSomething() {
		System.out.println("Retry[" + retry + "] " + tag + " on "
				+ Thread.currentThread());
	}
}

ThreadPool定时重试

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
首先,看看线程池的样子: 从上图看出,线程池维护1个至n个线程,操作系统从请求队列中提取请求分配
Poco::ThreadPool提供线程池功能,减少线程的创建和销毁所带来的开销,适合在服务器上应用。创建线
首先,看看线程池的样子: 从上图看出,线程池维护1个至n个线程,操作系统从请求队列中提取请求分配
简介 在前面的一篇文章里我对java threadpool的几种基本应用方法做了个总结。Java的线程池针对不同
1 什么是线程池? 2 class CJobImpl { public: virtual void DoJob()=0; }; class CJob : public CJ
keepAliveTime只针对大于corePoolSize且小于maximumPoolSize的空闲线程,比如corePoolSize=10,maxPo
相关概念: 线程池可以看做容纳线程的容器; 一个应用程序最多只能有一个线程池; ThreadPool静态类
相关概念: 线程池可以看做容纳线程的容器; 一个应用程序最多只能有一个线程池; ThreadPool静态类
摘要: 系列文章,从一个基本的代码说起,逐步探索 ThreadPool 的奥妙。 首先,看看线程池的样子:
  有这样的场景,淘宝开放平台上有销售订单API,销售订单金额API,商品上下架API,退款API等各种
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号