学习java很久很久了,得有个5年了,但是从来都没有真正的走进java世界,希望从这篇文章开始,把自己对java的点点滴滴都记录下来。
从java5开始,java就提供了名叫Executor framework的机制,主要是围绕着Executor接口, 它的接口 ExecutorService, 以及实现了这两个接口的ThreadPoolExecutor类来展开,这种机制把线程的执行和创建分离开了,你只需要创建一个线程,然后把线程丢给Executor,让它执行去吧。使用这个机制的另外一个好处是可以使用Callable接口,它类似于Runnable接口,但是有两个不一样的特性。
- 它的主要方法是call(), 它可以携带一个返回值。
- 当你发送了一个Callable对象给executor之后,你可以拿到一个实现了Future接口的对象,通过这个对象,你可以控制对象的状态以及Callable对象的结果。
public Server(){executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);System.out.printf("Server: Task Count: %d\n",executor.getTaskCount());} 提交带返回值的任务。public class FactorialCalculator implements Callable{ private Integer number;public FactorialCalculator(Integer number){this.number=number;}@Overridepublic Integer call() throws Exception { int result = 1;if ((num==0)||(num==1)) {result=1;} else {for (int i=2; i<=number; i++) {result*=i;TimeUnit.MILLISECONDS.sleep(20);}}System.out.printf("%s: %d\n",Thread.currentThread().getName(),result);return result;}public class Main {public static void main(String[] args) { ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(2);List > resultList=new ArrayList<>();Random random=new Random();for (int i=0; i<10; i++){Integer number= random.nextInt(10);FactorialCalculator calculator=new FactorialCalculator(number);Future result=executor.submit(calculator);resultList.add(result);}do { for (int i=0; i result=resultList.get(i);System.out.printf("Main: Task %d: %s\n",i,result.isDone());}try {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}} while (executor.getCompletedTaskCount() result=resultList.get(i);Integer number=null;try {number=result.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.printf("Main: Task %d: %d\n",i,number);}executor.shutdown();}}}
在Future对象里面,我们可以通过result.isDone()方法来判断线程是否计算完毕。
执行一堆任务,只返回第一个完成的任务。result = executor.invokeAny(taskList);
执行全部,resultList=executor.invokeAll(taskList);
推迟执行,executor.awaitTermination(1, TimeUnit.DAYS);
把任务的执行和结果的处理分开,需要用到CompletionService, CompletionServicei有两个方法,take和poll,poll是如果没有,它就会立刻返回一个null,take是没有的话,会一直等待。
当线程池调用了关闭之后,它需要等待当前所有进行中的线程结束才会完全关闭,在这个过程当中提交的线程,需要拒绝处理,我们需要实现一个RejectedExecutionHandler,重写它的rejectedExecution方法,然后听过executor的setRejectedExecutionHandler()方法来设置。
public class RejectedTaskController implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.printf("RejectedTaskController: The task %s has been rejected\n",r.toString());System.out.printf("RejectedTaskController: %s\n",executor.toString());System.out.printf("RejectedTaskController: Terminating: %s\n",executor.isTerminating());System.out.printf("RejectedTaksController: Terminated: %s\n",executor.isTerminated());}
Fork/Join Framework
Fork/Join Framwork的诞生是为了解决如下图这样的问题的
我们可以利用ForkJoinPool,它是一个特殊的Executors。
ForkJoin 框架主要是有两个操作:
Fork:把一个任务分成几个小任务,然后执行
Join:等待小任务的完成,并生成一个新任务。
这里我把ForkJoin称为刀叉框架,刀叉框架和Executor框架不一样的地方在于work-stealing算法,不同于Executor框架,刀叉框架在等待子任务的完成之前就已经创建并开始运行Join方法,Join方法一直在检测任务是否完成并且开始运行。通过这样的方式,可以很好的利用runtime的优势,提高性能。
这样就有一些限制:
任务只能采用Fork和Join来作为同步机制,而不能采用别的同步机制,如果采用其他的机制,他们在同步操作的时候,不能执行别的任务。比如:当你在刀叉框架里面一个任务sleep的时候,别的正在执行的任务也会停止,直到该线程sleep结束。
刀叉框架的核心是两个类:
(1)ForkJoinPool,它实现了ExecutorService接口和work-stealing算法,通过它可以很好的管理正在运行的任务以及了解任务的信息。
(2)ForkJoinTask,任务的基类,它提供了fork()方法和join()方法来控制任务的状态。不同通常,你会实现他们的两个子类,不带返回结果的RecursiveAction和带返回结果的RecursiveTask。