使用CompletionService批处理任务(线程池阻塞线程)

CompletionService ExecutorService BlockingQueueFuture 

如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务  (Completion service)。

CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。

ExecutorCompletionService的实现相当直观。它在构造函数中创建一个BlockingQueue,用它去保持完成的结果。计算完成时会调用FutureTask中的done方法。当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了BlockingQueue,它会在结果不可用时阻塞。

 1 import java.util.Random;  
 2 import java.util.concurrent.BlockingQueue;  
 3 import java.util.concurrent.Callable;  
 4 import java.util.concurrent.CompletionService;  
 5 import java.util.concurrent.ExecutionException;  
 6 import java.util.concurrent.ExecutorCompletionService;  
 7 import java.util.concurrent.ExecutorService;  
 8 import java.util.concurrent.Executors;  
 9 import java.util.concurrent.Future;  
10 import java.util.concurrent.LinkedBlockingQueue;  
11   
12 public class Test17 {  
13     public static void main(String[] args) throws Exception {  
14         Test17 t = new Test17();  
15         t.count1();  
16         t.count2();  
17     }  
18     //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理  
19     public void count1() throws Exception{  
20         ExecutorService exec = Executors.newCachedThreadPool();  
21         BlockingQueue> queue = new LinkedBlockingQueue>();  
22         for(int i=0; i<10; i++){  
23             Future future =exec.submit(getTask());  
24             queue.add(future);  
25         }  
26         int sum = 0;  
27         int queueSize = queue.size();  
28         for(int i=0; i){  
29             sum += queue.take().get();  
30         }  
31         System.out.println("总数为:"+sum);  
32         exec.shutdown();  
33     }  
34     //使用CompletionService(完成服务)保持Executor处理的结果  
35     public void count2() throws InterruptedException, ExecutionException{  
36         ExecutorService exec = Executors.newCachedThreadPool();  
37         CompletionService execcomp = new ExecutorCompletionService(exec);  
38         for(int i=0; i<10; i++){  
39             execcomp.submit(getTask());  
40         }  
41         int sum = 0;  
42         for(int i=0; i<10; i++){  
43             //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。  
44             Future future = execcomp.take();  
45             sum += future.get();  
46         }  
47         System.out.println("总数为:"+sum);  
48         exec.shutdown();  
49     }  
50     //得到一个任务  
51     public Callable getTask(){  
52         final Random rand = new Random();  
53         Callable task = new Callable(){  
54             @Override  
55             public Integer call() throws Exception {  
56                 int i = rand.nextInt(10);  
57                 int j = rand.nextInt(10);  
58                 int sum = i*j;  
59                 System.out.print(sum+"\t");  
60                 return sum;  
61             }  
62         };  
63         return task;  
64     }  
65     /** 
66      * 执行结果: 
67      *   6   6   14  40  40  0   4   7   0   0   总数为:106 
68      *  12  6   12  54  81  18  14  35  45  35  总数为:312 
69      */  
70 }  

ExecutorCompletionService统一了ExecutorService和BlockingQueue,既有线程池功能,能提交任务,又有阻塞队列功能,能判断所有线程的执行结果。

你可能感兴趣的