Callable and Future in Java Threads

Callable and Future are two very useful concurrency types in Java: a Callable produces a result, and a Future retrieves it.

Callable

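The Callable interface is similar to Runnable, but a Runnable returns no result and cannot throw a checked exception. Callable is more capable: when a thread executes it, it can return a value, and that value can be retrieved through a Future. In other words, a Future gives access to the return value of an asynchronously executed task.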

/**
 * A task that returns a result and may throw an exception.
 * Implementors define a single method with no arguments called
 * {@code call}.
 *
 * <p>The {@code Callable} interface is similar to {@link
 * java.lang.Runnable}, in that both are designed for classes whose
 * instances are potentially executed by another thread. A
 * {@code Runnable}, however, does not return a result and cannot
 * throw a checked exception.
 */
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     */
    V call() throws Exception;
}
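
To make the difference from Runnable concrete, here is a minimal sketch of a Callable that either returns a value or throws a checked exception. The class `CallableDemo` and the helpers `lengthOf` and `run` are hypothetical names chosen for illustration; only `Callable`, `Future`, and the executor classes come from the JDK.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableDemo {
    // Hypothetical task: returns the length of the text, or fails with a
    // checked exception. A Runnable could do neither.
    static Callable<Integer> lengthOf(String text) {
        return () -> {
            if (text == null) {
                throw new IOException("no input"); // allowed: call() declares "throws Exception"
            }
            return text.length();
        };
    }

    // Runs the task on a pool thread and reports either the result or the failure.
    static String run(Callable<Integer> task) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = es.submit(task);
            return "result=" + future.get();
        } catch (ExecutionException e) {
            // The exception thrown inside call() arrives wrapped as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(lengthOf("hello"))); // result=5
        System.out.println(run(lengthOf(null)));    // failed: no input
    }
}
```

Note that the exception thrown by `call()` does not surface on the worker thread; it is rethrown from `Future.get()` wrapped in an `ExecutionException`.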

Future

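Future is used to obtain the result of an asynchronous operation. It provides the following methods:

  1. Check whether the operation has completed: isDone();
  2. Retrieve the result of the operation: get();
  3. Cancel the operation: cancel();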
/**
 * A {@code Future} represents the result of an asynchronous
 * computation. Methods are provided to check if the computation is
 * complete, to wait for its completion, and to retrieve the result of
 * the computation. Additional methods are provided to
 * determine if the task completed normally or was cancelled. Once a
 * computation has completed, the computation cannot be cancelled.
 * If you would like to use a {@code Future} for the sake
 * of cancellability but not provide a usable result, you can
 * declare types of the form {@code Future<?>} and
 * return {@code null} as a result of the underlying task.
 */
public interface Future<V> {
    /**
     * Attempts to cancel execution of this task. This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when {@code cancel} is called,
     * this task should never run. If the task has already started,
     * then the {@code mayInterruptIfRunning} parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone}
     * will always return {@code true}. Subsequent calls to
     * {@link #isCancelled} will always return {@code true} if this
     * method returned {@code true}.
     *
     * @param mayInterruptIfRunning {@code true} if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     */
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     */
    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
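
The methods above can be exercised together in a small sketch: wait for a result with a timeout and cancel the task if it takes too long. `FutureDemo` and `awaitWithTimeout` are hypothetical names, and the sleep durations are arbitrary values picked for the demonstration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureDemo {
    // Submits a task that sleeps for sleepMs, then waits at most timeoutMs for it.
    static String awaitWithTimeout(long sleepMs, long timeoutMs) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<String> future = es.submit(() -> {
            Thread.sleep(sleepMs);
            return "done";
        });
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The task is still running: interrupt it and report the Future's state.
            boolean cancelled = future.cancel(true);
            return "timeout, cancelled=" + cancelled
                    + ", isCancelled=" + future.isCancelled()
                    + ", isDone=" + future.isDone();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            es.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitWithTimeout(10, 2000));  // done
        System.out.println(awaitWithTimeout(5000, 50));  // timeout, cancelled=true, isCancelled=true, isDone=true
    }
}
```

After a successful `cancel`, `isDone()` also reports `true`: a cancelled task counts as completed, just not normally.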

FutureTask

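FutureTask implements RunnableFuture, which extends both Runnable and Future. It can therefore be executed by a thread as a Runnable and also be used as a Future to obtain the Callable's return value.
Suppose a value is expensive to compute and is not needed right away. This combination lets another thread compute it while the current thread does other work, and the current thread fetches the value through the Future only when it actually needs it.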
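
As a rough sketch of that pattern, the following wraps a slow computation in a FutureTask, starts it on another thread, and only blocks when the value is needed. `FutureTaskDemo`, `slowSum`, and `compute` are hypothetical names, and the 200 ms sleep merely stands in for real work.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {
    // Hypothetical expensive computation: the sum 1 + 2 + ... + n.
    static long slowSum(int n) throws InterruptedException {
        Thread.sleep(200); // stands in for real work
        long sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += i;
        }
        return sum;
    }

    static long compute(int n) {
        FutureTask<Long> task = new FutureTask<Long>(() -> slowSum(n));
        new Thread(task).start(); // used as a Runnable

        // The current thread is free to do other work here.
        System.out.println("main thread keeps working...");

        try {
            return task.get();    // used as a Future; blocks until the sum is ready
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(compute(100)); // 5050
    }
}
```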

Example

  • Using a Thread directly
ObjCallable c0 = new ObjCallable("abc-Thread");
FutureTask<Obj> ftask = new FutureTask<Obj>(c0);
new Thread(ftask).start();
print("ftask#get() > " + ftask.get().name);
  • Submitting a FutureTask to an executor
// Assumes `executor` is an existing ExecutorService, e.g. Executors.newCachedThreadPool().
ObjCallable c0 = new ObjCallable("abc-FutureTask");
FutureTask<Obj> ftask2 = new FutureTask<Obj>(c0);
executor.submit(ftask2);
print("ftask2#get() > " + ftask2.get().name);
  • Using ExecutorService
ExecutorService es = Executors.newCachedThreadPool();

// Submit a Callable and block on its result.
ObjCallable c1 = new ObjCallable("abc");
print(System.currentTimeMillis() + " - f1-Begin");
Future<Obj> f1 = es.submit(c1);
// TODO: do something special
Obj f1o = f1.get();
print("f1#get() > " + f1o.name);

// Resubmit the same Callable under a new name.
print(System.currentTimeMillis() + " - fTask1-Begin");
c1.name = "abcd";
// submit() is declared to return Future; casting it to FutureTask would
// depend on an implementation detail of the executor.
Future<Obj> fTask1 = es.submit(c1);
// TODO: do something special
Obj fTaskO = fTask1.get();
print("fTask1#get() > " + fTaskO.name);

// Submit a Runnable together with the value the Future should return.
Future<Obj> fRunnable = es.submit(new TR(), fTaskO);
while (!fRunnable.isDone()) {
    Thread.sleep(200);
    print("wait....");
}
try {
    Obj o = fRunnable.get();
    print("fRunnable#get() > " + o.name);
} catch (ExecutionException e) {
    e.printStackTrace();
}

// Loop-get: keep the Futures in a list and read them in submission order.
List<Future<Obj>> futureTasks = new ArrayList<Future<Obj>>(5);
for (int i = 0; i < 5; i++) {
    Future<Obj> future = es.submit(new ObjCallable("peo-" + (i + 1)));
    futureTasks.add(future);
}
for (int i = 0; i < futureTasks.size(); i++) {
    Future<Obj> future = futureTasks.get(i);
    Obj o = future.get();
    print(o.name);
    assert o.name.equals("peo-" + (i + 1));
}

// Two ways to collect the results of several tasks:
// 1. CompletionService.take() retrieves and removes the Future of the next
//    completed task, and blocks if no task has completed yet.
// 2. Add the Futures returned by the executor to a collection and iterate
//    over it afterwards; results then come back in submission order (as above).
CompletionService<Obj> completionService = new ExecutorCompletionService<Obj>(es);
for (int i = 0; i < 5; i++) {
    completionService.submit(new ObjCallable("peo-" + (i + 1)));
}
// Futures taken from the CompletionService arrive in completion order.
for (int i = 0; i < 5; i++) {
    // take() blocks until a task finishes; it never returns null.
    Obj o = completionService.take().get();
    print("completionService#take = " + o.name + " - " + i);
}

es.shutdown();
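
The ordering difference is easier to see when the task durations are fixed rather than random. The following sketch, with the hypothetical names `CompletionOrderDemo` and `completionOrder` and arbitrary delays, submits tasks whose delays decrease, so the last task submitted is expected to finish first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {
    // Submits one task per delay and returns the task names in the order
    // in which CompletionService.take() hands them back.
    static List<String> completionOrder(long... delaysMs) {
        // One thread per task, so every task starts right away.
        ExecutorService es = Executors.newFixedThreadPool(delaysMs.length);
        CompletionService<String> cs = new ExecutorCompletionService<String>(es);
        try {
            for (int i = 0; i < delaysMs.length; i++) {
                final String name = "task-" + (i + 1);
                final long delay = delaysMs[i];
                cs.submit(() -> {
                    Thread.sleep(delay);
                    return name;
                });
            }
            List<String> order = new ArrayList<String>();
            for (int i = 0; i < delaysMs.length; i++) {
                order.add(cs.take().get()); // blocks until the next task completes
            }
            return order;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        // Typically prints [task-3, task-2, task-1]: shortest delay first.
        System.out.println(completionOrder(400, 200, 0));
    }
}
```

Iterating over a list of Futures instead would return the same names in submission order, at the cost of waiting on the slowest early task before seeing later ones.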

ObjCallable

// A Runnable that sleeps for one second and then reports that it finished.
static class TR implements Runnable {
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        print("TR-end");
    }

    // Not part of Runnable; an executor never invokes this method.
    public Object call() {
        return new Obj();
    }
}

// A Callable that sleeps for a random time (under 800 ms) and returns an Obj carrying its name.
static class ObjCallable implements Callable<Obj> {
    private static final Random random = new Random();
    private String name;

    public ObjCallable(String name) {
        this.name = name;
    }

    public Obj call() throws Exception {
        Obj o = new Obj();
        o.name = this.name;
        Thread.sleep(random.nextInt(800));
        return o;
    }
}