十、分支合并框架

10.1、ForkJoinPool

Fork:把一个复杂任务进行分拆,大事化小

Join:把分拆任务的结果进行合并

10.2、原理

类似归并算法,采取分而治之的思想

未命名文件 (3)

10.3、相关类

1、ForkJoinPool

img

分支合并池 类比=> 线程池

2、ForkJoinTask

img

ForkJoinTask 类比=> FutureTask

3、RecursiveTask

img

递归任务:继承后可以实现递归(自己调自己)调用的任务

  • 以斐波那契数列为例
1
2
3
4
5
6
7
8
9
10
11
12
class Fibonacci extends RecursiveTask {
final int n;
Fibonacci(int n) { this.n = n; }
Integer compute() {
if (n
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}

10.4、实例

使用RecursiveTask计算从begin乘到end,其中begin、end均由用户输入且begin < end

1、创建一个MyTask类

这个类定义继承至 RecursiveTask

在compute方法中创建两个MyTask对象,然后调用MyTask对象的fork方法开启重新开启两个线程,再次调用compute方法计算。

如果传入那两个MyTask对象 的end - begin > 10,那么会在该对象中重新开启线程调用compute方法进行计算。

调用MyTask对象的join方法可以获得该对象compute 方法得到的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MyTask extends RecursiveTask<Integer> {
private static final Integer ADJUST_VALUE = 10;
private int begin;
private int end;
private int result = 1;

public MyTask(int begin,int end) {
this.begin = begin;
this.end = end;
}


@Override
protected Integer compute() {
if((end - begin) <= ADJUST_VALUE) {
//如果end和begin的差值小于10,那么证明数据计算量小,此时不使用分支合并框架,直接计算
for (int i = begin; i <= end; i++) {
result = result * i;
}
return result;
}
int middle = (end + begin) / 2;
MyTask task01 = new MyTask(begin,middle);
MyTask task02 = new MyTask(middle + 1,end);
task01.fork();
task02.fork();
result = task01.join() + task02.join();
return result;
}
}

2、使用ForkJoinPool调用MyTask中的compute方法

  • 创建一个MyTask对象
  • 创建一个ForkJoinPool对象
  • 调用ForkJoinPool对象的submit方法,传入上面的MyTask对象,submit方法返回一个ForkJoinTask 对象
  • 根据ForkJoinTask的get方法获取MyTask对象的compute方法的计算结果
  • 关闭ForkJoinPool对象
1
2
3
4
5
6
7
8
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask task = new MyTask(1,11);
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> joinTask = pool.submit(task);
Integer result = joinTask.get();
System.out.println("计算结果为:" + result);
pool.shutdown();
}

3、运行程序,查看结果

image-20210302161845167

十一、悲观锁、乐观锁和CAS

11.1、悲观锁

​ 总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。再比如Java里面的同步原语synchronized关键字的实现就是悲观锁,volatile关键字虽然是synchronized关键字的轻量级实现,但是其无法保证原子性,所以一般也要搭配锁使用。

11.2、乐观锁

​ 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。

11.3、CAS(Compare And Swap)

1、介绍

CAS就是根据乐观锁的设计思想来实现,在取数据的时候,判断一下在此期间是否有人修改,如果没有修改,则直接使用。

CAS 是由硬件实现的

CAS可以将read-modify-write这类的操作(如i++)转换为原子操作

例如:AtomicInteger中的 incrementAndGet()就使用了CAS自旋锁

  • 比如说有一个值0,我们需要对它进行递增,原本为保证数据一致性,我们需要对它加一把锁,但现在为了提高效率,我们使用自旋锁来完成。
  • 读取当前值,当前需要递增的值X为0;计算结果值,X 的结果值应为1,在将1赋给X 之前,我们再次读取X 的值,看看X的值是否和第一次读取的内存值相同,如果相同,就将1赋给X
  • 如果当前读取到的值和第一次读取到的值不同,假设内存中X的值已经由0变为2,那么此时再次读取2,计算结果,得到3,在将3写给X之前,读取内存值,看看这次读取到的内存值是否和上一次读取到的值(2)相等,如果相等,将3赋给X,否则继续自旋,直到修改成功或者放弃操作。

2、CAS原理

CAS原理:CAS有三个操作数,即内存值v,旧的操作数a,新的操作数b。当我们需要更新v值为b时,首先我们判断v值是否和我们之前的所见值a相同,若相同则将v赋值为b,若不同,则什么都不做。是一种非阻塞算法**(non-blocking algorithm).**在java中可以通过锁和循环CAS的方式来实现原子操作。

Java中的实现:Java中 java.util.concurrent.atomic包相关类就是CAS的实现,通过自旋转CAS来尝试获得锁。

CAS自旋锁适用于锁使用者保持锁时间比较短的情况中,因为自旋锁使用者一般保持锁的时间很短,所以才选择自旋而不是睡眠。

Java通过和循环CAS的方式实现原子操作。

image-20210302183544760

CAS最终通过 lock cmpxchg 指令来实现

3、CAS存在的问题

  • ABA问题:

​ 因为CAS在进行操作的时候,总是需要比较新的操作数和旧的操作数,如果相同则更新。但是如果新的操作数经过两次修改之后返回原来的值,那么久出现了ABA问题。解决问题的方法就是增加一个版本号,不仅仅通过检查值得变化来确定是否更新。

比如两个线程

  • 线程1 查询A的值为a,与旧值a比较,
  • 线程2 查询A的值为a,与旧值a比较,相等,更新为b值
  • 线程2 查询A的值为b,与旧值b比较,相等,更新为a值
  • 线程1 相等,更新B的值为c

可以看到这样的情况下,线程1 可以正常 进行CAS操作,将值从a变为c 但是在这之间,实际A值已经发了a->b b->a的转换

image-20210302182801422

4、使用CAS实现线程安全的计数器

编写一个CASCounter类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class CASCounter {
private volatile long value;

public long getValue() {
return value;
}


/***
*
* @param expectedValue 期望值
* @param newValue
* @return
*/
private boolean compareAndSwap(long expectedValue,long newValue) {
//如果当前value的值与期望的expectValue值一样,就把当前value字段的值换为newValue值
synchronized (this) {
if(value == expectedValue) {
value = newValue;
return true;
}
return false;
}
}

public long incrementAndGet() {
long oldValue;
long newValue;
do {
oldValue = value;
newValue = oldValue + 1;
} while(!compareAndSwap(oldValue,newValue));
return newValue;
}
}

创建100个线程,运行程序,查看结果

1
2
3
4
5
6
CASCounter casCounter = new CASCounter();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
System.out.println(casCounter.incrementAndGet());
}).start();
}

结果

image-20210302225833762

11.4、CAS的ABA问题

​ CAS实现原子操作背后有一个假设:共享变量的当前值和当前线程提供的期望值相同,就认为这个变量没有被其他线程修改过。

​ 实际上这种假设不一定总是成立,设有共享变量 count = 0

  • A线程对 count 值修改为10
  • B线程对 count 值修改为20
  • C线程对 count 值修改为0

​ 当前线程看到 count 变量的值现在为0,现在能否认为 count 变量的值没有被其他线程更新呢?这样的结果能不能被接受?

​ 如果想要规避ABA问题,可以为共享变量引入一个修订号(版本号、时间戳),每次修改共享变量时,相应的修订号就会增加一。

11.5、原子变量类

​ 原子变量类基于CAS实现,当对共享变量进行 read-modify-write 更新操作时,使用原子变量类可以保障操作的原子性和可见性。对变量的 read-modify-write 更新操作是指当前的操作不是一个简单的复制,而是变量的新值依赖于变量的旧值,如自增操作i++.

​ 原子变量类有十二个

分组原子变量类
基础数据型AtomicInteger、AtomicLong、AtomicBoolean
数组型AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
字段更新器AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
引用型AtomicReference、AtomicStampedReference、AtomicMarlableReference

11.6、使用AtomicLong定义计数器

十二、轻量级同步机制-volatile

12.1、volatile 的作用

​ volatile关键字的作用是使变量在多个线程之间可见,解决了变量的可见性

​ volatile让变量每次在使用的时候,都从主存中取。而不是从各个线程的“工作内存”。volatile变量对于保证每次访问的线程都能得到当前volatile变量的最新值,但是volatile变量并不保证并发的正确性。

注:volatile 关键字仅仅是保证所有线程均从主内存中获取数据,无法保证原子性

12.2、volatile和synchronized的区别

  • volatile 关键字是线程同步的轻量级实现,所以 volatile 性能肯定比 synchronized要好;
  • volatile 只能修饰变量,而 synchronized 可以修饰代码块和方法
  • 随着JDK新版本的发布,synchronized 的执行效率也有了很大的提升,所以开发中synchronized的使用比率还是很大的。
  • 多线程访问 volatile 变量不会发生阻塞,而 synchronized 可能会阻塞;
  • volatile 可以保证数据的可见性,但是不能保证原子性,而synchronized既可以保证原子性,也可以保证可见性。
  • 关键字 volatile 解决的是变量在多个线程之间的可见性;synchronized 可以解决多个线程之间访问公共资源的同步性