1 线程

Wu Jun 2018-12-18 21:52:59
05 Java > 00 Java 基础 > 07 并发

1 基本概念

1.1 并发和并行的区别

1.2 进程和线程的区别

2 实现多线程

在 Java 中要想实现多线程,有三种手段

警告:不要调用 run 方法,直接调用 run 方法只会执行同一个线程中的任务,而不会启动新线程。应调用 start 方法,会常见一个执行 run 方法的新线程。

2.1 继承 Thread 类

【不再被推荐,可以使用线程池】

Thread 类与大部分的 Java API 有显著的差别,它的所有关键方法都是声明为 Native 的;

对于 Sun JDK 来说,它的 Windows 版与 Linux 版的线程实现方式都是使用一对一的线程模型实现的,一条 Java 线程就映射到一条轻量级进程之中,因为 Windows 和 Linux 系统提供的线程模式就是一对一的;

创建一个 Thread 类的子类定义一个线程,并调用实例的 start 方法。

class MyThread extends Thread{
	public void run(){
		task code
	}
}

2.2 实现 Runable 接口

1)实现 Runnable 接口类的 run 方法
class MyRunnable implements Runnable{
	public void run() {
		task code
	}
}
2)由 Runnable 创建一个 Thread 对象
Runnable runnable = new MyRunnable();
Thread thread = new Thread(r);
3)启动线程 .start()
t.start()

2.3 实现 Callable 接口

1)Callable

Callable 与 Runnable 类似,它们的主要区别是 Callable 的 call() 方法可以返回值和抛出异常。

Callable 接口是一个参数化的类型,只有一个方法 call。

public interface Ca11able<V>{
    V call() throws Exception;
}

Callable 可以返回装载有计算结果的 Future 对象。
Thread 类只支持 Runnable。Callable 可以使用 ExecutorService

2)Future

Future 保存异步计算的结果。

可以启动一个计算,将 Future 对象交给某个线程,然后忘掉它。 Future 对象的所有者在结果计算好之后就可以获得它。

public interface Future<V>{
    V  get() throws ...;//阻塞获取结果。
    V  get(long timeout, TimeUnit unit) throws ...;//计时阻塞获取结果。
    void cancel(boolean mayInterrupt);//试图取消任务。如任务已开始,由参数决定是否取消
    boolean isCancelled();//任务是否已取消。
    boolean isDone();//任务是否已完成。
}

可以通过多个方法获得 Future:

Future 可以设定时间限制,当任务在指定时间没有完成时,get()将抛出TimeoutException,此时应利用Future取消任务

3)FutureTask

FutureTask 包装器是一种非常便利的机制,可将 Callable 转换成 Future 和 Runnable,它同时实现二者的接口。

Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task); // it's a Runnable
t.start();
...
Integer result = task.get(); // It's a Future

FutureTask 是为了弥补 Thread 的不足而设计的,可以准确地知道线程什么时候执行完成,并获得到线程执行完成后返回的结果。

FutureTask 是一种可取消的异步计算任务,通过 Callable 实现,等价于可以携带结果的 Runnable,并且有三个状态:等待、运行和完成。

3 中断线程

当线程的 run 方法执行方法体中最后一条语句并经由 return 返回时。或出现了在方法中没有捕获的异常时,线程终止。

在 Java 中没有一种安全的抢占式方法来停止线程,只有一些协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。

3.1 线程阻塞

3.2 线程中断

没有可以强制线程终止的方法,interrupt 方法可以用来请求终止线程。isInterrupted() 可以检测线程是否被中断。

1)中断策略
2)响应中断

有两种实用策略可用于处理 InterruptedException:

Runnable r = () -> {
    try{
        //...
        while (!Thread.currentThread().isInterrupted() && /* more work to do */) {
            // do more work
        }catch(InterruptedException e) {
            // thread was interrupted during sleep or wait
        }finally {
            // cleanup, if required
        }
        // exiting the run method terminates the thread
    }
}
void mySubTask() throws InterruptedException
{
    // ...
    sleep(delay);
    // ...
}

3.3 通过 Future 来实现中断

public class TimedRun {
    private static final ExecutorService taskExec = Executors.newCachedThreadPool();

    public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        Future<?> task = taskExec.submit(r);
        try {
            task.get(timeout, unit);//在限时内取得结果,如果要等待则等待
        } catch (TimeoutException e) {
            // 接下来任务会被取消
        } catch (ExecutionException e) {
            // 如果在任务中抛出了异常,那么重新抛出该异常
            throw launderThrowable(e.getCause());
        } finally {
            // 如果任务已经结束,那么执行取消操作也不会有任何影响
            task.cancel(true); // 如果任务正在进行,那么将被中断
        }
    }
}

3.4 处理不可中断的阻塞

不是所有 Java 中的阻塞机制都会直接响应 InterruptException,如一个线程由于执行同步的 Socket I/O 或者等待获得内置锁而阻塞。但他们也有类似 InterruptException 的机制。

可以通过重写 Thread.interrupt 方法,将非标准的取消操作封装在 Thread 中(如加入close Socket的逻辑)。

3.5 采用 newTaskFor 来封装非标准的取消

public interface CancellableTask<T> extends Callable<T> {
       void cancel();
       RunnableFuture<T> newTask();
}
@ThreadSafe
class CancellingExecutor extends ThreadPoolExecutor {
	...
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask)
            return ((CancellableTask<T>) callable).newTask();
        else
            return super.newTaskFor(callable);
    }
}
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
     private Socket socket;

    protected synchronized void setSocket(Socket s) {
        socket = s;
    }
   //定义了Future.cancel来关闭套接字
    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) {
        }
    }
   //如果SocketUsingTask通过其自己的Future来取消,那么底层的套接字将被关闭并且线程将被中断
    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this) {  //Creates a FutureTask that will, upon running(正在运行), execute the given Callable.
            @SuppressWarnings("finally")
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {  //调用super.cancel
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

4 线程状态

线程可以有如下 6 种状态:

4.1 状态方法

要确定一个线程的当前状态,可调用 getState 方法。

Thread.getState()//得到线程的状态
Thread.join()//等待终止指定的线程

4.2 状态转换

5 线程优先级

线程调度是指系统为线程分配处理器使用权的过程,主要调度方式有两种,分别是

Java 语言一共设置了 10 个级别的线程优先级,不过线程优先级并不是太靠谱,原因就是操作系统的线程优先级不见得总是与 Java 线程的优先级一一对应,另外优先级还可能被系统自行改变;

不要将程序构建为功能的正确性依赖于优先级。

Linux 上线程不具有优先级。

6 未捕获异常处理器

线程组中的某个线程由于抛出了未捕获的异常(RuntimeException)而退出时,会调用 ThreadGroup.uncaughtException() 方法。

使用 Unchecked Exception 处理器目的在于健壮线程异常退出的记录。可以用 setUncaughtExceptionHandler 方法为任何线程安装一个处理器。也可以用Thread类的静态方法 setDefaultUncaughtExceptionHandler 为所有线程安装一个默认的处理器。

如果不安装默认的处理器,此时的处理器就是该线程的 ThreadGroup 对象。

注释:线程组是一个可以统一管理的线程集合。不要在自己的程序中使用线程组。

ThreadGroup 类实现 Thread.UncaughtExceptionHandler 接口。它的 uncaughtException 方法做如下操作:

  1. 如果该线程组有父线程组,那么父线程组的 uncaughtException 方法被调用。
  2. 否则,如果 Thread.getDefaultExceptionHandler 方法返回一个非空的处理器,则调用该处理器。
  3. 否则,如果 Throwable 是 ThreadDeath 的一个实例,什么都不做。
  4. 否则,线程的名字以及 Throwable 的栈踪迹被输出到 System.err 上。

7 JVM 关闭

JVM 既可以正常关闭,也可以强行关闭。

7.1 关闭钩子

在正常关闭中,JVM 首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是指通过 Runtime.addShutdownHook 注册的但尚未开始的线程。

关闭钩子应该是线程安全的,并且不应该对应用程序的状态或者JVM的关闭原因做出任何假设。关闭钩子必须尽快退出,因为它们会延迟JVM的结束时间,而用户可能希望 JVM 能尽快终止。

关闭钩子可以用于实现服务或应用程序的清理工作,例如删除临时文件,或者清除无法由操作系统自动清除的资源。

public static void main(String[] args) {
    SpringApplication.run(SkuSyncApplication.class, args);

    //将 hook 线程添加到运行时环境中去
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            System.out.println("ShutdownHook");
        }
    });
}

7.2 守护线程

守护线程(Daemon Thread)的唯一用途是为其他线程提供服务。守护线程最典型的应用就是 GC (垃圾回收器)。

当只剩下守护线程时,虚拟机就退出了,不阻碍 JVM 的关闭。

在 JVM 启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程。新线程继承创建它的线程的守护状态,默认情况下,主线程创建的所有线程都是普通线程。

普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM 会检查其他正在运行的线程,如果这些线程都是守护线程,则 JVM 会正常退出。当 JVM 停止时,所有仍然存在的守护线程都将被抛弃——既不会执行 finally 代码块,也不会执行回卷栈,而 JVM 只是直接退出。

守护线程应该永远不去访问固有资源,如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。应尽可能少地使用守护线程 —— 很少有操作能够在不进行清理的情况下被安全地抛弃。守护线程最好用于执行“内部"任务,例如周期性地从内存的缓存中移除逾期的数据。

此外,守护线程通常不能用来替代应用程序管理程序中各个服务的生命周期

7.3 终结器

垃圾回收器对那些定义了 finalize 方法的对象会进行特殊处理:在回收器释放它们后,调用它们的 finalize 方法,从而保证一些持久化的资源被释放。

除了管理本地方法获取的对象,尽量避免编写或使用包含终结器的类。finalize 方法性能开销大,编写困难。在大多数情况下,通过使用 finally 代码块和显式的 close 方法,能够比使用终结器更好地管理资源。

8 线程优化

引入多线程会增加开销: 线程之间协调、上下文切换、线程的创建和销毁、线程的调度。

如果过度地使用线程,开销甚至会超过性能的提升。

8.1 对性能的思考

1)性能和可伸缩性

可伸缩性:当增加计算资源时, 程序的吞吐量或者处理能力能相应地增加。

性能的两类衡量指标——“运行速度”和“处理能力”,是完全独立的,有时候甚至是相互矛盾的。

2)评估各种性能权衡因素

避免不成熟的优化。首先使程序正确, 然后在提高运行速度——如果它还运行得不够快。

在使某个方案比其他方案更快之前, 首先问自己一些问题:

对性能的提升可能是并发错误的最大来源。

以测试为基准,不要猜测。

8.2 线程引入的开销

对于提升性能而引入的线程来说, 并行带来的性能提升必须超过并发导致的开。

1)上下文切换

线程大于CPU,将导致上下文切换。保存当前线程的上下文, 并将新调度线程的上下文设置为当前上下文。

上下文切换开销:

线程阻塞(竞争锁,线程等待,IO阻塞)会导致上下文切换,无阻塞算法有助于线程上下文切换。

2)内存同步

应将重点放在锁竞争的地方。非竞争同步的开销,经优化后已微乎其微了。

3)阻塞

阻塞行为由JVM分析历史等待时间来选择:

由于锁竞争而导致阻塞时, 线程在持有锁时将存在一定的开销,当它释放锁时,必须告诉操作系统恢复运行阻塞的线程。

8.3 减少上下文切换的开销

当任务在运行和阻塞这两个状态之间转换时,就相当于一次上下文切换。在服务器应用程序中,发生阻塞的原因之一就是在处理请求时产生各种日志消息。

通过将I/O操作从处理请求的线程中分离出来,可以缩短处理请求的平均服务时间。

9 构件高效且可伸缩的结果缓存

分析一个简单的HashMap缓存
public class Memoizer3<A, V> implements Computable<A, V>{
    private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
    private final Computable<A, V> c;
    
    public Memoizer3(Computable<A, V> c){
        this.c = c;
    }
    
    public V compute(final A arg) throws InterruptedException{
        while(true){
        Future<V> f = cache.get(arg);
        if(f == null){// 非原子,多个线程可进入
            Callable<v> eval = new Callable<v>(){
                public V call() throws InterruptedException{
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft = new FutureTask<v>(eval);
            f = cache.putIfAbsent(arg, ft);//原子操作,在此限制其他线程
            if(f == null){
                f = ft;
                ft.run();
            }
        }
        try{
            return f.get();
        }cache(CancellationException e){
            cache.remove(arg, f);//计算取消,移除Future
        }cache(ExcutionException e){
            throw launderThrowable(e.getCause());
        }
        }
    }
}

10 并发技巧清单