2023年7月19日 作者 zeroheart

CompletionService详解

CompletionService的作用和具体用法?

Java中的CompletionService是一个非常有用的接口,它将Executor和BlockingQueue的功能组合在一起。你可以将Callable任务提交给它去执行,然后使用类似于队列的操作,如take或poll,来获取已完成的结果。这是一个生产者-消费者的设计,其中Callable任务是生产者,产生的结果被放入队列,而用户通过take或poll操作作为消费者取出并消费这些结果。

下面是一个简单的使用CompletionService的示例:

javaCopy code// 创建一个线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 使用ExecutorCompletionService,它是CompletionService接口的实现类
CompletionService<String> service = new ExecutorCompletionService<>(executor);

// 提交任务
for (int i = 0; i < 5; i++) {
    int finalI = i;
    service.submit(() -> {
        Thread.sleep(1000 * finalI);
        return "Result of Task " + finalI;
    });
}

// 检索并输出结果
for (int i = 0; i < 5; i++) {
    try {
        System.out.println(service.take().get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

// 关闭线程池
executor.shutdown();

在这个示例中,我们提交了5个任务,每个任务会休眠一段时间然后返回一个字符串。由于CompletionService内部使用了阻塞队列,我们可以调用take方法按照任务完成的顺序(而非提交的顺序)获取结果。注意,take方法会阻塞,如果你想要非阻塞的操作,可以使用poll方法。

最后,别忘了在完成任务后关闭线程池。

CompletionService的主要用途是处理大量独立的任务,这些任务可能会在不同的时间完成,并且你希望立即处理这些已完成的任务。

使用poll的例子,如果长时间没有返回怎么处理?

使用poll方法可以非阻塞地获取CompletionService的结果。它会立即返回一个Future,如果没有任务完成,则返回null

如果你想要长时间没有返回结果时进行特定处理,可以考虑设置一个超时。例如,你可以循环地调用poll,每次调用后检查是否返回了结果,如果没有结果,则等待一段时间再继续。如果超过了你设定的总超时时间,你可以抛出一个异常或执行其他操作。

下面是一个例子:

javaCopy codeExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<String> service = new ExecutorCompletionService<>(executor);

for (int i = 0; i < 5; i++) {
    int finalI = i;
    service.submit(() -> {
        Thread.sleep(1000 * finalI);
        return "Result of Task " + finalI;
    });
}

long timeout = 10000; // 10 seconds
long start = System.currentTimeMillis();

for (int i = 0; i < 5; i++) {
    while (true) {
        Future<String> future = service.poll();
        if (future != null) {
            try {
                System.out.println(future.get());
                break;
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        } else {
            // 检查是否超时
            if (System.currentTimeMillis() - start > timeout) {
                throw new RuntimeException("Timeout after " + timeout + " milliseconds");
            }

            // 等待一段时间再继续
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

executor.shutdown();

这个例子中,我们在每次调用poll并未获得结果后,等待1秒再继续。如果总时间超过了10秒,我们将抛出一个异常。

注意,这个例子使用了一个固定的等待时间(1秒),并且会在每次poll返回null时等待。在实践中,你可能想要使用更复杂的逻辑,例如逐步增加等待时间,或者只在连续多次poll返回null后等待。

一下是另一篇文章,介绍基本用法的:

原文:(48条消息) CompletionService详解_张孟浩_jay的博客-CSDN博客

当我们有批量执行多个异步任务的业务,并且需要调用get()获取执行结果的时候,我们写的代码一般如下:

Future f1 = excutor.submit(c1);
f1.get();
Future f2 = excutor.submit(c2);
f2.get();

f1.get()在获取成功之前会被阻塞,会阻塞c2的执行,严重降低了效率。

这时我们可以通过CompletionService来解决这一问题!

CompletionService结构

CompletionSerive接口有一个实现类ExecutorCompletionService,结构如下

BlockingQueue q = new LinkedBlockingQueue();
ThreadPoolExecutor executor = new ThreadPoolExecutor(4,6,3, TimeUnit.SECONDS,q);
CompletionService cs = new ExecutorCompletionService<>(executor);

cs.submit(new Callable() {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 1;
}
});
cs.submit(new Callable() {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 1;
}
});

for (int i = 0;i < 3;i++){
Integer f = cs.take().get();
System.out.println(f);
}

源码解析:

public ExecutorCompletionService(Executor executor,
BlockingQueue> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
//线程池
this.executor = executor;
//AbstractExecutorService
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
//保存Future的队列
this.completionQueue = completionQueue;
}

public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
//包装成FutureTask
RunnableFuture f = newTaskFor(task);
//会调用FutureTask的run()
executor.execute(new QueueingFuture(f));
return f;
}

private class QueueingFuture extends FutureTask {
QueueingFuture(RunnableFuture task) {
super(task, null);
this.task = task;
}
//将task加入到completionQueue
protected void done() { completionQueue.add(task); }
private final Future task;
}

public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用call()方法,result为执行的结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//这里这里这里
set(result);
}
} finally {

        runner = null;

        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }

}

protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//将FutureTask的outcome设置result
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//这里这来这里
finishCompletion();
}
}

private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//将task加入到completionQueue
done();

callable = null;        // to reduce footprint

}

总结
从源码解析看,ExecutorCompletionService中内置了一个Future的completionQueue,在任务调用完成后,会将submit返回的future放入到completionQueue。用户可以通过completionQueue.take()得到future然后调用future.get()来获取任务执行结果。