2 线程池

Wu Jun 2020-03-06 23:36:29
05 Java > 00 Java 基础 > 07 并发

线程池,指管理一组同构工作线程的资源池。线程池与工作队列密切相关,其中工作队列保存了所有等待执行的线程。工作者线程从工作队列中获取一个线程,执行任务,然后返回线程池并等待下一个任务。

1 创建线程池

一般一个简单线程池至少包含下列组成部分。

  1. 线程池管理器(ThreadPoolManager):创建线程池,销毁线程池,添加新任务
  2. 工作线程(WorkThread):线程池中线程
  3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
  4. 任务队列:用于存放没有处理的任务。提供一种缓冲机制。

ExecutorService 包含运行/关闭/已终止三种状态

1.1 Executor 类静态工厂方法

执行器(Executor)类有许多静态工厂方法用来构建线程池。

方法 描述
newCachedThreadPool 必要时创建新线程;空闲线程会被保留 60 秒
newFixedThreadPool 线程包含固定数量的线程;空闲线程会一直被保留
newSingleThreadExecutor 只有一个线程的“池”,该线程顺序执行每一行提交的任务
newScheduledThreadPool 用于预定执行而构建的固定线程池,替代 java.util.Time
newSingleThreadScheduledExecutor 用于预定执行而构建的单线程“池”

1.2 ThreadPoolExecutor 构造函数

ThreadPoolExecutor 为一些 Executor 提供了基本的实现,还可以通过 ThreadPoolExecutor 的构造函数来实例化一个对象,并根据自己的需求来定制。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
}

2 配置 ThreadPoolExecutor

2.1 线程池因素

Executor 的执行逻辑:

线程复用

线程执行时循环去队列取任务,没有任务了就进入阻塞状态,队列有新任务就被唤醒去执行任务,空闲时间超时就销毁

2.2 任务队列

ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务。

基本的任务排队方法有3种:

2.3 饱和策略

ThreadPoolExecutor 的饱和策略通过调用 setRejectedExecutionHandler 来修改。

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()) ;

2.4 线程工厂

如在应用程序中需要利用安全策略来控制访问权限,可用 Executor 中的 privilegedThreadFactory 工厂来定制自己的线程工厂。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

2.5 扩展 ThreadPoolExecutor

ThreadPoolExecutor 是可扩展的,提供了几个可以在子类化中改写的方法:beforeExecute、 afterExecute 和 terminated。可以添加日志、计时 监视、通知等功能

private class SaveOlderCategoryTask implements Callable<String> {
    DcdCategory category;

    SaveOlderCategoryTask(DcdCategory category) {
        this.category = category;
    }

    @Override
    public String call() {
        Long oldMinBehotTime = getMinBehotTime(category.getId());
        saveCategory(category, oldMinBehotTime, 0L);
        return "complete";
    }
}
public static void doExecutor(Collection<FutureTask> list) {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("dcd-pool-%d").build();

    //Common Thread Pool
    ExecutorService pool = new ThreadPoolExecutor(5, list.size(),
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    list.forEach(pool::submit);
    list.forEach(e -> {
        try {
            log.info((String) e.get());
        } catch (InterruptedException | ExecutionException e1) {
            e1.printStackTrace();
        }
    });
    pool.shutdown();
}

3 提交任务

3.1 控制任务组

使用执行器有更有意义的原因,控制一组相关任务。

List<Callable<T>> tasks = ...;
List<Future<T>> results = executor.invokeAll(tasks);
for(Future<T> result : results)
    processFurther(result.get());

可以用 ExecutorCompletionService 来进行排列。

ExecutorCompletionService<T> service = new ExecutorCompletionService<>(executor);
for (Callalbe<T> task : tasks) {
    service.submit(task);
}
for (int i = 0; i < tasks.size(); i++) {
    processFurther(service.take().get());
}

3.2 CompletionService

1)CompletableFuture
CompletableFuture<String> contents = readPage(url);
CompletableFuture<List<URL>> links = contents.thenApply(Parse::getLinks);

thenApply 方法不会阻塞。它会返回一个 future。第一个 future 完成时,其结果会提供给 getLinks 方法,这个方法的返回值是最终的结果。

从概念上讲,CompletableFuture 是一个简单 API,不过有很多不同方法来组合可完成 future。

2)CompletionService

Future 调用 get() 获取值会阻塞住调用线程,为了解决这个问题,1.8加入了 CompletableFuture 类。与事件处理器不同,CompletableFuture 可以“组合”(composed)。

3.3 递归算法的并行化

当串行循环中的各个迭代操作之间彼此独立,并且每个迭代操作执行的工作量比管理一个新任务时带来的开销更多,那么这个串行循环就适合并行化;

void processSequntially(List<Element> elements){
    for(Element e:elements){
        process(e);
    }
}

void processInParallel(Executor exec,List<Element> elements){
    for(final Element e:elements){
        exec.execute(new Runnable(){
            public void run(){
                process(e);
            }
        });
    }
}

在一些递归设计中同样可以采用循环并行化的方法进行优化;

public<T> void sequentialRecursive(List<Node> nodes,Collection<T> results){
    for(Node<T> n:nodes){
        results.add(n.compute());
        sequentialRecursive(n.getChildren(),results);
    }
}

public<T> void parrallelRecursive(final Executor exec,List<Node<T>> nodes,final Collection<T> results){
    //遍历过程串行
    for(final Node<T> n:nodes){
        exec.execute(new Runnable(){
            public void run(){
                //compute并行计算
                results.add(n.compute());
            }
        });
        parrallelRecursive(exec,n.getChildren(),results);
    }
}

等待通过并行方式计算的结果

public<T> Collection<T> getParrallelResults(List<Node<T>> nodes) throws InterruptedException{
    ExecutorService exec=Executors.newCachedThreadPool();
    Queue<T> resultQueue=new ConcurrentLinkedQueue<T>();
    parrallelRecursive(exec,nodes,resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.Max_VALUE,TimeUnit.SECONDS);
    return resultQueue;
}

4 批量获取多条线程的执行结果

当向线程池提交 callable 任务后,我们可能需要一次性获取所有返回结果,有三种处理方法。

4.1 方法一:自己维护返回结果

// 创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);

// 存储执行结果的List
List<Future<String>> results = new ArrayList<Future<String>>();

// 提交10个任务
for ( int i=0; i<10; i++ ) {
    Future<String> result = executorService.submit( new Callable<String>(){
        public String call(){
            int sleepTime = new Random().nextInt(1000);
            Thread.sleep(sleepTime);
            return "线程"+i+"睡了"+sleepTime+"秒";
        }
    } );
    // 将执行结果存入results中
    results.add( result );
}

// 获取10个任务的返回结果
for ( int i=0; i<10; i++ ) {
    // 获取包含返回结果的future对象
    Future<String> future = results.get(i);
    // 从future中取出执行结果(若尚未返回结果,则get方法被阻塞,直到结果被返回为止)
    String result = future.get();
    System.out.println(result);
}

此方法的弊端:

  1. 需要自己创建容器维护所有的返回结果,比较麻烦;
  2. 从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。

4.2 方法二:使用 ExecutorService 的 invokeAll 函数

本方法能解决第一个弊端,即并不需要自己去维护一个存储返回结果的容器。当我们需要获取线程池所有的返回结果时,只需调用invokeAll函数即可。 但是,这种方式需要你自己去维护一个用于存储任务的容器。

// 创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);

// 创建存储任务的容器
List<Callable<String>> tasks = new ArrayList<Callable<String>>();

// 提交10个任务
for ( int i=0; i<10; i++ ) {
    Callable<String> task = new Callable<String>(){
        public String call(){
            int sleepTime = new Random().nextInt(1000);
            Thread.sleep(sleepTime);
            return "线程"+i+"睡了"+sleepTime+"秒";
        }
    };
    executorService.submit( task );
    // 将task添加进任务队列
    tasks.add( task );
}

// 获取10个任务的返回结果
List<Future<String>> results = executorService.invokeAll( tasks );

// 输出结果
for ( int i=0; i<10; i++ ) {
    // 获取包含返回结果的future对象
    Future<String> future = results.get(i);
    // 从future中取出执行结果(若尚未返回结果,则get方法被阻塞,直到结果被返回为止)
    String result = future.get();
    System.out.println(result);
}

4.3 方法三:使用 CompletionService

CompletionService 内部维护了一个阻塞队列,只有执行完成的任务结果才会被放入该队列,这样就确保执行时间较短的任务率先被存入阻塞队列中。

ExecutorService exec = Executors.newFixedThreadPool(10);

final BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>(
        10);
//实例化CompletionService
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
        exec, queue);

// 提交10个任务
for ( int i=0; i<10; i++ ) {
    completionService.submit( new Callable<String>(){
        public String call(){
            int sleepTime = new Random().nextInt(1000);
            Thread.sleep(sleepTime);
            return "线程"+i+"睡了"+sleepTime+"秒";
        }
    } );
}

// 输出结果
for ( int i=0; i<10; i++ ) {
    // 获取包含返回结果的future对象(若整个阻塞队列中还没有一条线程返回结果,那么调用take将会被阻塞,当然你可以调用poll,不会被阻塞,若没有结果会返回null,poll和take返回正确的结果后会将该结果从队列中删除)
    Future<String> future = completionService.take();
    // 从future中取出执行结果,这里存储的future已经拥有执行结果,get不会被阻塞
    String result = future.get();
    System.out.println(result);
}

5 中断服务

5.1 关闭 ExecutorService

ExecutorService 提供了两种关闭方法:

5.2 “毒丸”对象

“毒丸”:“当得到这个对象时,立即停止”。 在 FIFO 队列中使用

5.3 示例:只执行一次的服务

如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的 Executor 来简化服务的生命周期管理,其中该 Executor 的生命周期是由这个方法来控制的。(在这种情况下,invokeAll 和 invokeAny 等方法通常会起较大的作用。)

public class CheckForMail {
      public boolean checkMail(Set<String> hosts,long timeout,TimeUnit unit)throws InterruptedException{
          ExecutorService exec=Executors.newCachedThreadPool();
          //之所以用AtomicBoolean代替volatile,是因为能从内部的Runnable中访问hasNewMail标志,因此它必须是final类型以免修改
          final AtomicBoolean hasNewMail=new AtomicBoolean(false);//初始值为false
          try{
              for(final String host:hosts)
                  exec.execute(new Runnable(){ //Executes the given command at some time in the future
                      public void run(){
                          if(checkMail(host))
                              hasNewMail.set(true);
                      }
                  });
          }finally {
            exec.shutdown();
            exec.awaitTermination(timeout, unit);
        }
          return hasNewMail.get();
      }
      private boolean checkMail(String host) {
          //检查邮件
          return false;
      }
}

5.4 shutdownNow 的局限性

使用 shutdownNow 时,虽然他会返回已提交但尚未开始的任务,但我们无法在关闭中知道正在执行的任务的状态

记录哪些任务是在关闭后取消的:

public class TrackingExecutor extends AbstractExecutorService{
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown=
            Collections.synchronizedSet(new HashSet<Runnable>());   //创建一个线程安全的Set来记录那些任务是关闭后取消的

    public List<Runnable> getCancelledTasks(){	//客户端可以通过这个方法知道,哪些任务是在关闭后取消的
        if(!exec.isShutdown())
            throw new IllegalStateException();
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable){
        exec.execute(new Runnable(){
            public void run(){
                try{
                    runnable.run();
                }finally{           //如果已经ExecutorService关闭了并且任务被中断(取消),添加到Set中
                    if(isShutdown()&&Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
    // 将ExecutorService的其他方法委托给exec
}

6 预定执行

ScheduledExecutorService 接口具有为预定执行(Scheduled Execution)或重复执行任务而设计的方法。它是一种允许使用线程池机制的 java.util.Timer 的泛化。

可以预定 Runnable 或 Callable 在初始的延迟之后只运行依次。也可以预定一个 Runnable 对象周期性地运行。

ScheduledExecutorService newScheduledThreadPool(int threads)
//返回一个线程池,它使用给定的线程数来调度任务。

ScheduledExecutorService newSingleThreadScheduledExecutor()
//返回一个执行器,它在一个单独线程中调度任务。

相比 Timer,ScheduledThreadPoolExecutor 更加完善:

7 Fork-Join 框架

有些应用使用了大量线程, 但其中大多数都是空闲的。另外一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集型任务。

Java SE 7 中引入 fork-join 框架,专门用来支持后一类应用。

Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。

要完成这种递归计算,需要扩展 RecursiveTask 的类或者提供一个扩展 RecursiveAction 的类,再覆盖 compute 方法来生成并调用子任务,然后合并其结果。

class Conter extends RecursiveTask<integer> {
    // ...
    protected Integer compute() {
        if (to - from < THRESHOLD) {
            // solve problem directly
        } else {
            int mid = (from + to) / 2;
            Counter first = new Counter(values, form, mid, filter);
            Counter second = new Counter(values, mid, to, filter);
            invokeAll(first, second);
            return first.join() + second.join();
        }
    }
}

invokeAll 方法接收到很多任务并阻塞,直到所有这些任务都已经完成。join 方法将生成结果。对每个子任务应用了 join,并返回其总和。

7.1 工作窃取

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。使用双端队列减少线程竞争,被窃取任务的线程从头部拿任务,窃取任务的线程从尾部拿。

一个工作线程空闲时,它会从一个双端队列的尾部“窃取”一个任务。

public static void main(String[] args) {
    // ...
    Counter counter = new Counter(numbers, 0, numbers.length, x -> x > 0.5);
    ForkJoinPool pool = new ForkJoinPool();
    pool.invoke(counter);
    long end = System.currentTimeMillis();
    System.out.printf("count: %d, time: %d\n", counter.join(), end - mid);
}

8 使用实例

@Slf4j
public class ExecutorUtils {

    private void updateAuthorsSourceMedia(List<Author> authors) {
        if (authors.size() > MULTI_THREAD_LIMIT) {
            List<List<Author>> result = new ArrayList<>(authors.stream().collect(Collectors.groupingBy(x -> authors.indexOf(x) % 10)).values());
            List<FutureTask<Integer>> results = new ArrayList<>();
            result.forEach(e -> {
                FutureTask<Integer> ft = new FutureTask<>(new UpdateAuthorSourceMediaTask(e));
                results.add(ft);
            });
            ExecutorUtils.doExecutor(results);
        } else {
            new UpdateAuthorSourceMediaTask(authors).call();
        }
    }

    public static void doExecutor(Collection<FutureTask<Integer>> list) {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("pool-%d").build();

        ExecutorService pool = new ThreadPoolExecutor(list.size(), list.size(),
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

        list.forEach(pool::submit);
        Integer total = 0;
        for (FutureTask<Integer> ft : list) {
            try {
                total = total + ft.get();
            } catch (InterruptedException | ExecutionException e1) {
                e1.printStackTrace();
            }
        }
        log.info("============== Total:{} ===============", total);
        pool.shutdown();
    }

}