CompletionService详解
CompletionService的作用和具体用法?
Java中的CompletionService
是一个非常有用的接口,它将Executor和BlockingQueue的功能组合在一起。你可以将Callable任务提交给它去执行,然后使用类似于队列的操作,如take或poll,来获取已完成的结果。这是一个生产者-消费者的设计,其中Callable任务是生产者,产生的结果被放入队列,而用户通过take或poll操作作为消费者取出并消费这些结果。
下面是一个简单的使用CompletionService
的示例:
javaCopy code// 创建一个线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 使用ExecutorCompletionService,它是CompletionService接口的实现类
CompletionService<String> service = new ExecutorCompletionService<>(executor);
// 提交任务
for (int i = 0; i < 5; i++) {
int finalI = i;
service.submit(() -> {
Thread.sleep(1000 * finalI);
return "Result of Task " + finalI;
});
}
// 检索并输出结果
for (int i = 0; i < 5; i++) {
try {
System.out.println(service.take().get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 关闭线程池
executor.shutdown();
在这个示例中,我们提交了5个任务,每个任务会休眠一段时间然后返回一个字符串。由于CompletionService
内部使用了阻塞队列,我们可以调用take
方法按照任务完成的顺序(而非提交的顺序)获取结果。注意,take
方法会阻塞,如果你想要非阻塞的操作,可以使用poll
方法。
最后,别忘了在完成任务后关闭线程池。
CompletionService
的主要用途是处理大量独立的任务,这些任务可能会在不同的时间完成,并且你希望立即处理这些已完成的任务。
使用poll的例子,如果长时间没有返回怎么处理?
使用poll
方法可以非阻塞地获取CompletionService
的结果。它会立即返回一个Future
,如果没有任务完成,则返回null
。
如果你想要长时间没有返回结果时进行特定处理,可以考虑设置一个超时。例如,你可以循环地调用poll
,每次调用后检查是否返回了结果,如果没有结果,则等待一段时间再继续。如果超过了你设定的总超时时间,你可以抛出一个异常或执行其他操作。
下面是一个例子:
javaCopy codeExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<String> service = new ExecutorCompletionService<>(executor);
for (int i = 0; i < 5; i++) {
int finalI = i;
service.submit(() -> {
Thread.sleep(1000 * finalI);
return "Result of Task " + finalI;
});
}
long timeout = 10000; // 10 seconds
long start = System.currentTimeMillis();
for (int i = 0; i < 5; i++) {
while (true) {
Future<String> future = service.poll();
if (future != null) {
try {
System.out.println(future.get());
break;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
} else {
// 检查是否超时
if (System.currentTimeMillis() - start > timeout) {
throw new RuntimeException("Timeout after " + timeout + " milliseconds");
}
// 等待一段时间再继续
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
executor.shutdown();
这个例子中,我们在每次调用poll
并未获得结果后,等待1秒再继续。如果总时间超过了10秒,我们将抛出一个异常。
注意,这个例子使用了一个固定的等待时间(1秒),并且会在每次poll
返回null
时等待。在实践中,你可能想要使用更复杂的逻辑,例如逐步增加等待时间,或者只在连续多次poll
返回null
后等待。
一下是另一篇文章,介绍基本用法的:
原文:(48条消息) CompletionService详解_张孟浩_jay的博客-CSDN博客
当我们有批量执行多个异步任务的业务,并且需要调用get()获取执行结果的时候,我们写的代码一般如下:
Future f1 = excutor.submit(c1);
f1.get();
Future f2 = excutor.submit(c2);
f2.get();
f1.get()在获取成功之前会被阻塞,会阻塞c2的执行,严重降低了效率。
这时我们可以通过CompletionService来解决这一问题!
CompletionService结构

CompletionSerive接口有一个实现类ExecutorCompletionService,结构如下

BlockingQueue q = new LinkedBlockingQueue();
ThreadPoolExecutor executor = new ThreadPoolExecutor(4,6,3, TimeUnit.SECONDS,q);
CompletionService cs = new ExecutorCompletionService<>(executor);
cs.submit(new Callable() {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 1;
}
});
cs.submit(new Callable() {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 1;
}
});
for (int i = 0;i < 3;i++){
Integer f = cs.take().get();
System.out.println(f);
}
源码解析:
public ExecutorCompletionService(Executor executor,
BlockingQueue> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
//线程池
this.executor = executor;
//AbstractExecutorService
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
//保存Future的队列
this.completionQueue = completionQueue;
}
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
//包装成FutureTask
RunnableFuture f = newTaskFor(task);
//会调用FutureTask的run()
executor.execute(new QueueingFuture(f));
return f;
}
private class QueueingFuture extends FutureTask {
QueueingFuture(RunnableFuture task) {
super(task, null);
this.task = task;
}
//将task加入到completionQueue
protected void done() { completionQueue.add(task); }
private final Future task;
}
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用call()方法,result为执行的结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//这里这里这里
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//将FutureTask的outcome设置result
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//这里这来这里
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//将task加入到completionQueue
done();
callable = null; // to reduce footprint
}
总结
从源码解析看,ExecutorCompletionService中内置了一个Future的completionQueue,在任务调用完成后,会将submit返回的future放入到completionQueue。用户可以通过completionQueue.take()得到future然后调用future.get()来获取任务执行结果。