《第3章 多线程》节略部分
线程池参数
package cn.javabook.chapter03.pool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* 带注释的ThreadPoolExecutor
*
*/
public class ThreadPoolExecutor1 {
public ThreadPoolExecutor1(
/*
* corePoolSize:初始化时指定的核心线程数,包括空闲线程,必须大于等于0,当有新任务提交时,会执行以下判断(workCount为当前活跃的线程数量):
* 当workCount< corePoolSize:即使线程池中有空闲线程,也会创建新线程
* 当corePoolSize ≤ workCount < maximumPoolSize:只有workQueue满时才创建新线程
* 当corePoolSize < workCount < maximumPoolSize:且超过corePoolSize部分的线程空闲时间达到keepAliveTime时,就回收这些线程,当设置allowCoreThreadTimeOut(true)时,
* 线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将被回收
* 当设置corePoolSize == maximumPoolSize:线程池的大小固定,此时如有新任务提交,且workQueue未满时,会将请求放入workQueue,等待有空闲的线程从workQueue中取任务并处理
* 当workCount ≥ maximumPoolSize:若workQueue满,则采取handler对应的策略
*
*/
int corePoolSize,
// maximumPoolSize:初始化时指定的最大线程数量
int maximumPoolSize,
// keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize时,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是等待,直到等待的时间超过了keepAliveTime
long keepAliveTime,
// 空闲时间单位
TimeUnit unit,
/*
* workQueue:阻塞队列的类型是保存等待执行的任务的阻塞队列,主要有四种提交方式:
* SynchronousQueue:同步队列,这个“队列”内部只包含了一个元素,队列的size始终为0,每执行一个put,就需要一个take来解除阻塞,反之也一样。饱和状态下,线程池能处理的最大线程数量为maximumPoolSize
* 使用SynchronousQueue队列,提交的任务不会保存,而是会马上提交执行
* 需要对程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略
* ArrayBlockingQueue:有界任务队列,饱和状态下,线程池能处理的最大线程数量为maximumPoolSize + ArrayBlockingQueue.SIZE
* LinkedBlockingQueue:无界任务队列,线程池的任务队列可以无限制的添加新的任务,此时线程池能够创建的最大线程数是corePoolSize,
* 而maximumPoolSize就无效了,线程池饱和状态下能处理的最大线程数量只取决于系统的性能
* PriorityBlockingQueue:优先任务队列,同LinkedBlockingQueue一样,它也是一个无界的任务队列,只不过需要自己实现元素的Comparable排序接口
*
*/
BlockingQueue<Runnable> workQueue,
// threadFactory:创建新线程,使新创建的线程有相同的优先级且为非守护线程,同时设置线程的名称,默认使用Executors.DefaultThreadFactory类创建
ThreadFactory threadFactory,
/*
* handler:表示线程池的饱和策略,意思就是如果阻塞队列满了并且没有空闲的线程,此时如果继续提交任务,就需要采取一种策略处理该任务,线程池提供了4种策略
* AbortPolicy:直接抛出异常,这是默认策略
* CallerRunsPolicy:如果线程池的线程数量达到上限,则把任务队列中的任务放在调用者的线程当运行
* DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
* DiscardPolicy:直接丢弃任务
*/
RejectedExecutionHandler handler) {
// balabala… …
}
}
四大线程池
除了可以通过ThreadPoolExecutor
自定义线程池外,Java已经预先定义了一些常用的线程池,不必考虑用什么阻塞队列合适的问题。
在Java 8
之前,存在四大传统线程池。
newFixedThreadPool
:定长线程池。newCachedThreadPool
:缓存线程池。newSingleThreadExecutor
:单线程线程池。newScheduledThreadPool
:任务调度线程池。
在Java 8
中新增了newWorkStealingPool
抢占式线程池,但目前这个用的不多,还是以之前的四大类线程池为主。在这四大
之中,又以newFixedThreadPool
出现的频率较高。
图3-28 newFixedThreadPool线程池的结构
newFixedThreadPool线程池使用的是LinkedBlockingQueue无界阻塞队列,而且corePoolSize = maximumPoolSize,数量在创建时指定。

package com.java.book.chapter03.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* "四大"线程池
*
*/
public class FourBigThreadPool {
public static void main(String[] args) {
// 定长线程池
ExecutorService service1 = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
System.out.println("创建线程 " + i);
service1.execute(() -> System.out.println("当前线程 " + Thread.currentThread().getName()));
}
service1.shutdown();
// 缓存线程池
ExecutorService service2 = Executors.newCachedThreadPool();
for (int i = 0; i < 1_000_000_000; i++) {
System.out.println("创建线程 " + i);
service2.execute(() -> System.out.println("当前线程 " + Thread.currentThread().getName()));
}
service2.shutdown();
// 单线程线程池
ExecutorService service3 = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
System.out.println("创建线程 " + i);
service3.execute(() -> System.out.println("当前线程 " + Thread.currentThread().getName()));
}
service3.shutdown();
// 任务调度线程池
ScheduledExecutorService service4 = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 10; i++) {
// 延迟1秒后执行,仅执行1次
service4.schedule(() -> System.out.println("当前线程 " + Thread.currentThread().getName()),
1,
TimeUnit.SECONDS);
}
service4.shutdown();
// 任务调度线程池
ScheduledExecutorService service5 = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 10; i++) {
// 延迟1秒后执行,每3秒执行1次
service4.scheduleAtFixedRate(() -> System.out.println("当前线程 " + Thread.currentThread().getName()),
1,
3,
TimeUnit.SECONDS);
}
service5.shutdown();
}
}
newCachedThreadPool
不会限制线程池中的线程数量,可以无限往里增加,直到线程数量超出最大整数范围或者把机器拖垮为止。
它在这里创造了10亿个线程,计算机都照单全收,运行了很长时间都没有停下来,所以笔者也只好强迫它休息了。
CAS
因为整个java.util.concurrent
工具包都是建立在CAS
之上的,尤其是Java中大多数锁的实现基类AbstractQueuedSynchronizer
,也是以CAS
为基础,提供了一系列的独占锁、共享锁、可重入锁、自旋锁、读写锁等线程控制手段。所以从某种程度上来说,CAS
可以用来取代synchronized
的强制同步,提升性能。

CAS
的实现都在java.util.concurrent.atomic
包下。以AtomicInteger
为例,从源码可以看出CAS
操作都是通过sun
包下Unsafe
类完成的,而Unsafe类
中的方法都是native
方法,由本地实现,和操作系统、CPU都有关系。

CAS
相关类继承结构如下图所示。

锁与AQS
悲观锁与乐观锁处理方式的不同如下图所示。

自旋锁与非自旋锁流程的不同如下图所示。

JUC
在前几节中已经把线程生命周期
、常见Thread API
、线程关键字
、线程池
、CAS
、锁和AQS
都撸过了一遍,现在就剩下怎么来用它了。其实想用好多线程也是可以取一些巧的,比如好多面试官喜欢问的JUC
,就是一种很有效的取巧方式。因为许多可以应用AQS
的场景相对来说是较为固定的,比如前面讲过的排队摇号器。而如果每次都要自己从头来实现AQS
,无疑是一件效率比较低下的事情。这正是JUC
可以提供方便的地方。JUC
是java.util.concurrent
的首字母缩写,是Java所能提供的各种并发工具类的统称。它主要分为下面这几类。
同步器。
线程安全的容器。
阻塞队列。
一些特殊的类。

在它们之中,应用较为广泛的当属同步器、阻塞队列和线程安全的容器。因为阻塞队列之前已有说明,故此下面就来盘一盘同步器和线程安全的容器这两类。
四大同步器
所谓同步器
,就像有事大家商量好了一起办一样:诸多线程统一在同一个容器之中,彼此速度虽有差别,但执行任务时却能步调一致,进退有序。
主要的JUC
同步器有四个。
CountDownLatch
:字面意思是倒计时,如果开发的系统功能中有倒计时的需求,那么CountDownLatch
是最合适的工具。它还有一个大家耳熟能详的别称:发令枪
。比如当裁判员喊开始的时候,或者火箭点火发射的时候,所有的人、设备、部门都会依次检查确认自己是否准备好行动,只有当全部都确认准备好了才能开始下一个动作,也就是等到条件满足的时候(一般是倒计时结束),就开始执行预设动作。Semaphore
:字面意思是信号量
,好比马路上的红绿灯,或者就餐排队时餐馆发的数字序号,一次只允许若干个人过马路或者就餐(信号量可能不太好理解,笔者更倾向于叫它摇号机或叫号器)。CyclicBarrier
:字面意思是屏障
或者栅栏
,与CountDownLatch
比较像,但它更侧重于工作本身,即指定的若干个任务都满足考核标准(也就是屏障)之后,才能继续进行下面的工作,且可反复使用。这有点像一个研发团队,只有当所有人都确认完设计原型后,才能开始进入开发;只有全部开发完成之后,才能开始进行系统集成测试等。Exchanger
:字面意思就是交换机
,用于线程之间交换数据。即当某个线程想与另一个线程交换数据时,就可以通过它来实现。
下面一个个地来演示它们的用法。
首先是CountDownLatch发令枪
组件。

T1 ~ T5
5个线程依次确认就绪,全部准备好之后主线程才能继续执行。
package com.java.book.chapter03.juc;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* CountDownLatch"发令枪"
*
*/
public class CountDownLatchTester implements Runnable {
static final CountDownLatch latch = new CountDownLatch(10);
@Override
public void run() {
// 检查任务
try {
System.out.println(Thread.currentThread().getName() + " 检查完毕!");
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 10; i > 0; i--) {
Thread.sleep(1000);
executor.submit(new CountDownLatchTester());
System.out.println(i);
}
TimeUnit.MILLISECONDS.sleep(1000);
// 最后检查确认
latch.await();
System.out.println();
System.out.println("点火,发射!");
// 关闭线程池
executor.shutdown();
}
}
从结果可以看到,正如火箭发射一样,在倒计时过程中,各部门依次确认是否正常可发射。全部确认OK后才能执行点火
动作。
Semaphore摇号机
组件的功能如下图所示。

T1 ~ T10
这10个线程分别从Semaphore
中通过,一次只允许通过指定的个数。前面用AQS
自定义的方式实现了摇号机,现在再用Semaphore
直接来实现它。
package com.java.book.chapter03.juc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Semaphore"信号量"或"摇号机"
*
*/
public class SemaphoreTester implements Runnable {
static final Semaphore semaphore = new Semaphore(3);
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 开始进餐");
TimeUnit.MILLISECONDS.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
semaphore.release();
}
public static void main(String[] args) {
ExecutorService excutor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
excutor.submit(new SemaphoreTester());
}
excutor.shutdown();
}
接下来是CyclicBarrier栅栏
组件。

CyclicBarrier
有点类似于接力赛。
当最后一名选手拿过接力棒的时候,才算全部接力完成。
每个选手拿一次接力棒,参赛人数就减1。
接力赛的成败,由最后一名选手确定。
package com.java.book.chapter03.juc;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* CyclicBarrier"栅栏"或"接力赛"
*
*/
public class CyclicBarrierTester implements Runnable {
private final static CyclicBarrier barrier = new CyclicBarrier(3);
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 已接棒,等待结果...");
// 只有最后一个线程执行后,所有的线程才能执行 2 所代表的动作
barrier.await();
TimeUnit.MILLISECONDS.sleep(1000);
// 2 所有线程都会执行的动作
System.out.println(Thread.currentThread().getName() + " 已成功拿下第一");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executor.submit(new CyclicBarrierTester());
}
// 关闭线程池
executor.shutdown();
}
}
最后是Exchanger交换机
,其功能示意如下图所示。

package com.java.book.chapter03.juc;
import java.util.concurrent.Exchanger;
/**
* Exchanger"交换机"
*
*/
public class ExchangerTester implements Runnable {
Exchanger<Object> exchanger = null;
Object object = null;
public ExchangerTester(Exchanger<Object> exchanger, Object object) {
this.exchanger = exchanger;
this.object = object;
}
@Override
public void run() {
try {
Object previous = this.object;
this.object = this.exchanger.exchange(this.object);
System.out.println(Thread.currentThread().getName() + " 用对象 " + previous + " 交换对象 " + this.object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Exchanger<Object> exchanger = new Exchanger<Object>();
ExchangerTester tester1 = new ExchangerTester(exchanger, "A");
ExchangerTester tester2 = new ExchangerTester(exchanger, "B");
new Thread(tester1).start();
new Thread(tester2).start();
}
}
安全的容器
线程安全容器的集合类继承结构如下图所示。

ConcurrentHashMap
继承自抽象类AbstractMap
并实现了ConcurrentMap
接口,它是线程安全的Map
,侧重于放入或者获取的速度,而不在乎顺序。
因为现在的Java应用基本上都升级到了JDK 1.8
以上,有的甚至和官方保持同步,在JDK 1.7
及以下版本会出现的问题,在新的版本中都修复了,而且性能越来越好。所以只需要知道一点:在并发量比较高的环境中尽量使用ConcurrentHashMap
。
为了有个直观的印象ConcurrentHashMap
到底是怎么个线程安全法,这里笔者拿之前的项目代码做演示。电商里为了造一些刷单数据,需要模拟真实用户
,而这些所谓的真实用户其实也是模拟出来的,
package com.java.book.chapter03.juc;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* 随机姓名产生器
*
*/
public class GenerateRandomInfo {
private static final String firstname = "赵钱孙李周吴郑王冯陈褚卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢邹喻水云苏潘" +
"葛奚范彭郎鲁韦昌马苗凤花方俞任袁柳鲍史唐费岑薛雷贺倪汤滕殷罗毕郝邬安常乐于时傅卞齐康伍余元卜顾孟平黄和穆萧尹姚邵湛汪祁毛" +
"禹狄米贝明臧计成戴宋茅庞熊纪舒屈项祝董粱杜阮席季麻强贾路娄危江童颜郭梅盛林刁钟徐邱骆高夏蔡田胡凌霍万柯卢莫房缪干解应宗丁" +
"宣邓郁单杭洪包诸左石崔吉龚程邢滑裴陆荣翁荀羊甄家封芮储靳邴松井富乌焦巴弓牧隗山谷车侯伊宁仇祖武符刘景詹束龙叶幸司韶黎乔苍" +
"双闻莘劳逄姬冉宰桂牛寿通边燕冀尚农温庄晏瞿茹习鱼容向古戈终居衡步都耿满弘国文东殴沃曾关红游盖益桓公晋楚闫";
private static final String lastname = "伟刚勇毅俊秀娟英华慧巧美娜静淑惠珠翠雅芝玉萍红娥玲芬芳燕彩春菊兰凤洁梅琳素云莲真环" +
"雪荣爱妹霞香月莺媛艳瑞凡佳嘉琼勤珍贞莉桂娣叶璧璐娅琦晶妍茜秋珊莎锦黛青倩婷姣婉娴瑾颖露瑶怡婵雁蓓纨仪荷丹蓉眉君琴蕊薇菁梦" +
"岚苑婕馨瑗琰韵融园艺咏卿聪澜纯毓悦昭冰爽琬茗羽希宁欣飘育滢馥筠柔竹霭凝晓欢霄枫芸菲寒伊亚宜可姬舒影荔枝思丽峰强军平保东文" +
"辉力明永健世广志义兴良海山仁波宁贵福生龙元全国胜学祥才发武新利清飞彬富顺信子杰涛昌成康星光天达安岩中茂进林有坚和彪博诚先" +
"敬震振壮会思群豪心邦承乐绍功松善厚庆磊民友裕河哲江超浩亮政谦亨奇固之轮翰朗伯宏言若鸣朋斌梁栋维启克伦翔旭鹏泽晨辰士以建家" +
"致树炎德行时泰盛雄琛钧冠策腾楠榕风航弘";
/**
* 随机产生姓氏
*
*/
public static String getFirstName() {
int strLen = firstname.length();
int index = new Random().nextInt(strLen - 1);
return firstname.substring(index, index + 1);
}
/**
* 随机产生名字
*
*/
public static String getLastName() {
int strLen = lastname.length();
int index = new Random().nextInt(strLen - 1);
return lastname.substring(index, index + 2);
}
public static void count(final String mapName, final List<String> list, final Map<String, Integer> map) {
long start = System.currentTimeMillis();
// 这里为了比较效率,特意调大了人数
for (int i = 0; i < 10_000_000; i++) {
String name = getFirstName() + getLastName();
list.add(name);
}
System.out.println(mapName + " 产生了 " + list.size() + " 个名字");
for (int i = 0; i < 5; i++) {
new Thread(() -> {
list.stream().map(str -> str.substring(0, 1)).forEach(str -> {
Integer count = 1;
if (map.containsKey(str)) {
// 多线程情况下,普通的HashMap的get()方法得到的结果可能为null
// 但ConcurrentHashMap一定不会是null
count = map.get(str);
if (null == count) {
System.out.println("count == null");
}
++count;
}
map.put(str, count);
});
System.out.println("总共 " + map.size() + " 个姓氏");
}).start();
}
int count = 0;
for(Map.Entry<String, Integer> entry : map.entrySet()) {
count = count + entry.getValue();
}
long end = System.currentTimeMillis();
// 最后统计名字总数
System.out.println("执行时间:" + (end - start) + " 毫秒");
}
public static void main(String[] args) throws IOException {
List<String> list = new ArrayList<>();
Map<String, Integer> map = new HashMap<>();
count("HashMap", list, map);
System.out.println("========================");
list = new ArrayList<>();
map = new ConcurrentHashMap<>();
count("ConcurrentHashMap", list, map);
}
}
执行后的结果打印如下。
HashMap 产生了 10000000 个名字
count == null
执行时间:3321 毫秒
========================
Exception in thread "Thread-3" java.lang.NullPointerException
......
总共 281 个姓氏
总共 281 个姓氏
总共 281 个姓氏
总共 281 个姓氏
ConcurrentHashMap 产生了 10000000 个名字
执行时间:4181 毫秒
总共 271 个姓氏
总共 271 个姓氏
总共 271 个姓氏
总共 271 个姓氏
总共 271 个姓氏
从结果可以非常清楚地看到,ConcurrentHashMap
执行正常而HashMap
却出现了null
,这在代码中也明确地指了出来。
JUC
中一些其他的类,例如,ForkJoinPool
,可以把任务分解成更小的任务,被分解出来任务还会被继续分解成更小的任务,更形象地说法是分形器
。而RecursiveTask
是一种会返回结果的任务,它可以将自己分解成若干更小的任务,并将这些任务的执行结果合并到一个结果里。不过在实际开发中这两种工具类出现的频率不是很高。
算上阻塞队列中的ArrayBlockingQueue
和LinkedBlockingQueue
,整个JUC
里面最重要的就是这3大部分7个Java类,掌握好它们,在多线程开发中就能如虎添翼。
感谢支持
更多内容,请移步《超级个体》。