java.util.concurrent并发编程包,这个包下都是Java处理线程相关的类

虚假唤醒

多个线程中使用wait方法的时候应始终定义在while中,wait在哪里睡就在哪里醒,会继续往下判断,如果使用的是if只会执行一次

现在有四个线程,AB做加法,CD做减法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class Test {
public static void main(String[] args) {
TestDemo testDemo = new TestDemo();

new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
testDemo.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
testDemo.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
testDemo.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
testDemo.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}

class TestDemo {
private int number = 0;

public synchronized void incr() throws InterruptedException {
if (number != 0) {
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + " : " + number);
this.notifyAll();
}

public synchronized void decr() throws InterruptedException {
if (number == 0) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + " : " + number);
this.notifyAll();
}
}

image-20220404170147070

上面的代码会出现虚假唤醒的情况,我们来试着分析一下为什么?

1
2
3
4
5
6
7
假设:
A获取锁执行++;
A再次获取锁判断number!=0,这时候阻塞;
C获取锁执行--;
B获取锁执行++;
A获取锁,从当前位置醒来继续往下执行,又对number进行了++操作,所以得到2
...

为了解决这种情况的发生,我们应该在每次醒来时都进行判断,将if改为while即可:

1
2
3
while (number != 0) {
this.wait();
}

Lock实现案例

Locksynchronized的区别 →Lock是接口而synchronized是关键字,Lock有着比synchronized更广泛的锁的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建Lock
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void incr() throws InterruptedException {
lock.lock();
try {
while (number != 0) {
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + " : " + number);
condition.signalAll();
} finally {
lock.unlock();
}
}

Condition它用来替代传统的Object的wait ()notify ()实现线程间的协作,依赖于Lock接口,需注意:传统的wait方法会自动释放锁,而使用lock需手动释放

线程集合不安全

集合本身的方法上并没有synchronized 关键字,所以是不安全的,看源码:

1
2
3
4
5
public boolean add(E var1) {
this.ensureCapacityInternal(this.size + 1);
this.elementData[this.size++] = var1;
return true;
}

示例代码:

1
2
3
4
5
6
7
8
List<String> list = new ArrayList<>();

for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}

执行上面的代码会得到一个ConcurrentModificationException 异常,由于集合中的方法并不是同步的,所以在多个线程同时写的时候就会抛出异常,如何解决呢?

方案一:使用Vector解决并发修改异常

1
List<String> list = new Vector<>();

方案二:使用Collections解决并发修改异常

1
List<String> list = Collections.synchronizedList(new ArrayList<>());

方案三:使用CopyOnWriteArrayList解决并发修改异常

前面两种方法其实并不常用,一般都是通过写时复制技术来解决,那何为写时复制呢?

集合在每次写的时候都会将元素复制一份出来,在新的集合中写,然后再合并,这样就实现了单写多读的操作

1
List<String> list = new CopyOnWriteArrayList();

HashSet和HashMap线程不安全

跟集合一样,方法也没有synchronized关键字,也会得到并发修改异常,所以要通过写时复制技术来单写多读

HashSet:

1
2
// 通过CopyOnWriteArraySet解决
Set<String> set = new CopyOnWriteArraySet<>();

HashMap:

1
2
// 通过ConcurrentHashMap解决
Map<String, String> map = new ConcurrentHashMap<>();

多线程锁

公平锁和非公平锁

公平锁:多个线程都能得到执行

非公平锁:谁先抢到谁就执行,其他线程不能执行

ReentrantLock 来配置公平锁或非公平锁:

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

可以看到源码中通过truefalse来配置锁

可重入锁

synchronizedLock都是可重入锁,可重入锁即可多次获得该锁

就比如我们回家,用钥匙开门之后就能随意进出房间了

1
2
3
4
5
6
7
8
9
10
11
12
Object o = new Object();
new Thread(() -> {
synchronized (o) {
System.out.println(Thread.currentThread().getName() + " 外层");
synchronized (o) {
System.out.println(Thread.currentThread().getName() + " 中层");
synchronized (o) {
System.out.println(Thread.currentThread().getName() + " 内层");
}
}
}
}, "t1").start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 外层");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 内层");
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
}, "t1").start();

死锁

两个或两个以上线程,因争夺资源造成互相等待的现象,需外力干涉来避免死锁

产生死锁的原因:

  • 资源系统不足

  • 进程运行推进顺序不合适

  • 资源分配不当

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) {
new Thread(() -> {
synchronized (a) {
try {
System.out.println(Thread.currentThread().getName() + " waiting...");
TimeUnit.SECONDS.sleep(2);
synchronized (b) {
System.out.println(Thread.currentThread().getName() + " get b");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "线程A").start();
new Thread(() -> {
synchronized (b) {
try {
System.out.println(Thread.currentThread().getName() + " waiting...");
TimeUnit.SECONDS.sleep(2);
synchronized (a) {
System.out.println(Thread.currentThread().getName() + " get a");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "线程B").start();
}

两个线程都在尝试获取对方线程资源,就造成了死锁,这是通过代码输出来判断是否为死锁,JDK中有一个堆栈跟踪工具,可以通过命令查看是否为死锁

Callable

Runnable接口缺失了一项功能,当线程终止时,无法获得线程返回的结果,为了支持此功能,Java中提供了Callable接口

这两个接口之间的区别主要是:

  1. 是否有返回值
  2. 是否抛出异常
  3. 实现方法名称不同,一个是run,一个是call
1
2
3
4
5
6
7
class Demo implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("test callable...");
return "hello";
}
}

使用Callable 就不能直接用Thread来创建线程了,需要使用FutureTask

1
2
3
FutureTask<String> task = new FutureTask<>(new Demo());
new Thread(task, "callable").start();
System.out.println(task.get()); // 获取call()中的返回值

强大的辅助类

CountDownLatch减少计数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);

for (int i = 1; i <= 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "号同学离开");

// 计数器-1
countDownLatch.countDown();
}, String.valueOf(i)).start();
}

// 当计数器没有变为0时就会一直等待
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "班长锁门离开了");
}
}

班长总是在最后一个才离开,这就是CountDownLatch 的作用

CyclicBarrier循环栅栏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CyclicBarrierDemo {

private static final intNUMBER= 7;

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {
System.out.println("恭喜你集齐七颗龙珠");
});

for (int i = 1; i <= 7; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "颗龙珠");
// 等待
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}

只有在集齐七颗龙珠后才会执行CyclicBarrier 中的方法

Semaphore信号灯

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SemaphoreDemo {
public static void main(String[] args) {
// 设置许可数量,只有三个车位
Semaphore semaphore = new Semaphore(3);

// 模拟六辆汽车
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// 抢占车位
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "号车抢到了车位");
// 设置随机停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName() + "号车离开了车位");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放车位
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}

用信号灯模拟停车的场景,只有三个车位,只有当某个车位的车离开了之后,其他的车才能抢占车位

读写锁

在多线程环境下对资源进行读写操作的时候,是可能会发生死锁的,需要用Java提供的读写锁来上锁和解锁,读写锁在读的时候是不能进行写操作的。

写锁:独占锁(一次只能一个线程进行写操作),读锁:共享锁(可多个线程进行读操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Resource {
private Map<String, Object> map = new HashMap<>();

private ReadWriteLock lock = new ReentrantReadWriteLock();

public void put(String key, Object value) {
// 添加写锁
lock.writeLock().lock();

System.out.println(Thread.currentThread().getName() + "正在写操作" + key);
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 释放锁
lock.writeLock().unlock();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写完了" + key);
}

public Object get(String key) {
// 添加读锁
lock.readLock().lock();

Object result = null;
System.out.println(Thread.currentThread().getName() + "正在读操作" + key);
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.readLock().unlock();
}
result = map.get(key);
System.out.println(Thread.currentThread().getName() + "读完了" + key);
return result;
}
}

锁降级:

读写锁在读的时候是不能进行写操作的。我们可以将写锁降为读锁,读锁不能升级为写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class DowngradeDemo {
public static void main(String[] args) {
ReadWriteLock lock = new ReentrantReadWriteLock();
Lock writeLock = lock.writeLock();
Lock readLock = lock.readLock();

// 锁降级
// 1.获取写锁
writeLock.lock();
System.out.println("write");
// 2.获取读锁
readLock.lock();
System.out.println("read");
// 3.释放写锁和读锁
writeLock.unlock();
readLock.unlock();
}
}

阻塞队列

当队列为空时,获取元素将阻塞,直到插入新的元素,当队列满时,添加元素将阻塞

使用阻塞队列的好处就是,我们不需要关心什么时候阻塞线程,什么时候唤醒线程,这些操作都交给BlockingQueue来做

1
2
3
4
5
6
7
8
// 创建阻塞队列
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

queue.add("a")
queue.add("b")
queue.add("c")
// Queue full
queue.add("d")

线程池

一种线程使用模式,维护着多个线程,等待着监督管理,避免了频繁创建与销毁线程的代价,不仅能保证内核的充分利用,还能防止过分调度

线程池使用方式

通过Executors 工具类来创建线程

Executors.newFixedThreadPool(): 一池N线程

Executors.newSingleThreadExecutor(): 一池一线程

Executors.newCachedThreadPool(): 根据需求创建线程,可扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadPoolDemo {
public static void main(String[] args) {
// 一池N线程
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);

// 一池一线程
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();

// 一池可扩容线程
ExecutorService threadPool3 = Executors.newCachedThreadPool();

// 10个客户请求
for (int i = 1; i <=10 ; i++) {
// 执行
threadPool3.execute(()->{
System.out.println(Thread.currentThread().getName()+"正在办理业务");
});
}

threadPool3.shutdown();
}
}

查看源码可以发现Executors调用的方法底层都使用了ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造方法中有7个参数,分别是什么意思呢?

corePoolSize :核心(常驻)的线程数量,比如一个银行有10个窗口,平时只开放5个窗口

maximumPoolSize:最大线程数量,就好比银行一共有10个窗口

keepAliveTime :线程存活时间

unit :搭配keepAliveTime 设置线程存活时间

workQueue :阻塞队列

threadFactory :用于创建线程

handler :拒绝策略(多种)

线程池的工作流程和拒绝策略

上面的流程图即为线程池的工作流程:首先通过execute()来创建一个池子,核心线程数为2,如果要创建第三个线程,就会放到workQueue中等待,当workQueue满时就会创建新的线程直到

maximumPoolSize满,当maximumPoolSize满时就会执行拒绝策略。

JDK内置的拒绝策略:

AbortPolicy :抛出 RejectedExecutionException来拒绝新任务的处理。

CallerRunsPolicy :“调用者运行”一种调节机制,该策略不会抛弃任务和异常,而是将某些任务回退到调用者,降低新任务的流量。

DiscardPolicy :抛弃队列中等待最久的任务,然后把当前任务添加到队列中,尝试再次提交当前任务。

DiscardOldestPolicy :该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许任务丢失,那这是最好的一种策略。

自定义线程

一般都是用自定义线程,在阿里巴巴开发手册中线程池不允许用Executors去创建,而是通过ThreadPoolExecutor 的方式,这样的处理方式让写的人更加明确线程池的运行规则,规避资源耗尽的风险。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CustomThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());

// 10个客户请求
for (int i = 1; i <=10 ; i++) {
// 执行
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+"正在办理业务");
});
}

threadPoolExecutor.shutdown();
}
}

分支合并框架(Fork/Join)

可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask task = new MyTask(1, 100);

// 创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
// 获取最终合并之后的结果
System.out.println(submit.get());
forkJoinPool.shutdown();
}
}

class MyTask extends RecursiveTask<Integer> {

// 拆分时差值不能大于10
private static final Integer VALUE= 10;
private int begin;
private int end;
private int result;

public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}

// 拆分和合并的过程
@Override
protected Integer compute() {
if (end - begin <=VALUE) {
// 相加
for (int i = begin; i <= end; i++) {
result = result + i;
}
} else {
// 进一步做拆分
// 获取中间值
int middle = (begin + end) / 2;
// 拆分左边
MyTask task1 = new MyTask(begin, middle);
// 拆分右边
MyTask task2 = new MyTask(middle + 1, end);

task1.fork();
task2.fork();
// 合并结果
result = task1.join() + task2.join();
}
return result;
}
}

异步回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class AsynchronousCallbackDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步调用,无返回值
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "completableFuture1");
});
completableFuture1.get();

// 异步调用,有返回值
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "completableFuture2");
return 1024;
});
completableFuture2.whenComplete((result, exception) -> {
System.out.println("--t--" + result); // 方法返回值
System.out.println("--u--" + exception); // 异常信息
}).get();
}
}