JUC
CAS,compare and swap
比较并交换,是一条CPU原子指令cmprxchg,有三个操作数,分别为内存位置、预期原值及更新值,其作用为比较该内存位置的值是否等于预期值,若等于预期值,内存位置的值变为更新值,若不等于预期值,则不进行更新,指令的返回结果是内存位置当前的实际最新值,整个过程是原子操作。
Java中unsafe封装的本地方法使用了CAS,而原子类如AtomicInteger等使用了unsafe相关的本地方法。
CAS本质是一种乐观锁,是一种并发编程的常用方式。
SQL中的条件更新与CAS有异曲同工之妙:update set score=score+1 from table where id=3 and score=59,若多个线程执行该语句,所有语句均可执行成功,但只有第一条执行成功的语句成功更新了score字段的值。
ABA问题
CAS会比较内存地址中的值与预期值是否一致,一致则更新,但若该值原本为A,被另外线程更新为B,然后被又一个线程更新为A,则此时CSA比较时发现符合预期,但实质上发生过变化。
解决方式是使用版本号在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A->B->A就会变成1A->2B->3A。
因为CAS需要在操作值的时候,检查值有没有发生变化,比如没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。
ABA问题的解决思路就是使用版本号。JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
Atomic
AtomicInteger
Lock
AQS
AQS,AbstractQuenedSynchronizer,抽象队列同步,java.util.concurrent.locks.AbstractQuenedSynchronizer。
AQS是一个抽象类,其实现了通用的线程同步机制,并基于模板方法设计模式留白获取资源和释放资源方法的实现,ReentrantLock,Semaphore等同步器都是AQS的子类,可以通过继承AQS实现自定义同步器。
核心思想
AQS持有资源private volatile int private volatile int state,当线程要获取资源时,基于CAS方式尝试获取state,获取到就会修改state,获取不到线程进入sync queue排队等待资源释放。
sync queue基于CLH实现,CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每个请求资源的线程封装成CLH队列的一个结点(Node)来实现资源的等待分配。
ReentrantLock
可重入锁
ReentrantReadWriteLock
Collection
ConcurrentHashMap
JDK1.8使用数组+链表+红黑树数据结构和CAS原子操作实现
CopyOnWriteArrayList
CopyOnWriteArrayList是ArrayList 的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的拷贝来实现的。COW模式的体现。
CopyOnWriteArrayList的缺陷和使用场景
CopyOnWriteArrayList 有几个缺点:
- 由于写操作的时候,需要拷贝数组,会消耗内存,如果原数组的内容比较多的情况下,可能导致young gc或者full gc
- 不能用于实时读的场景,像拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到数据可能还是旧的,虽然CopyOnWriteArrayList 能做到最终一致性,但是还是没法满足实时性要求;
CopyOnWriteArrayList 合适读多写少的场景,不过这类慎用
因为谁也没法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要重新复制数组,这个代价实在太高昂了。在高性能的互联网应用中,这种操作分分钟引起故障。
ThreadPoolExecutor
ThreadPoolExecutor
ScheduledThreadPoolExecutor
Tools
CountDownLatch
让一个或多个线程等待一组任务完成。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
// 初始化 CountDownLatch,参数为需要等待的线程数量
final CountDownLatch latch = new CountDownLatch(3);
// 创建并启动三个线程
for (int i = 0; i < 3; i++) {
new Thread(new Worker(latch, i)).start();
}
try {
// 主线程等待所有线程完成
latch.await();
System.out.println("所有任务完成!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 工作线程
static class Worker implements Runnable {
private final CountDownLatch latch;
private final int taskNumber;
public Worker(CountDownLatch latch, int taskNumber) {
this.latch = latch;
this.taskNumber = taskNumber;
}
@Override
public void run() {
try {
// 模拟任务执行时间
Thread.sleep(1000);
System.out.println("任务 " + taskNumber + " 完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 任务完成后调用countDown()方法
latch.countDown();
}
}
}
}
CountDownLatch被初始化为 3,意味着主线程将等待 3 个工作线程完成。- 创建了三个工作线程,每个线程执行
Worker任务。 - 每个
Worker执行一个任务,完成后调用latch.countDown()来减少CountDownLatch的计数。 - 主线程调用
latch.await()等待所有工作线程完成。当CountDownLatch的计数达到 0 时,await方法返回,主线程继续执行。
CyclicBarrier
一组线程互相等待,直到所有线程都到达某个公共屏障点(barrier)。在所有线程都到达后,这些线程才会继续执行。CyclicBarrier 可以重复使用,因此被称为循环屏障。适合于需要多个线程协同工作,并且需要等待所有线程到达某个点后再一起继续执行的场景。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
// 初始化 CyclicBarrier,参数为需要等待的线程数量
final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("所有线程都到达了屏障点,可以继续执行下一步操作了!");
}
});
// 创建并启动三个线程
for (int i = 0; i < 3; i++) {
new Thread(new Worker(barrier, i)).start();
}
}
// 工作线程
static class Worker implements Runnable {
private final CyclicBarrier barrier;
private final int taskNumber;
public Worker(CyclicBarrier barrier, int taskNumber) {
this.barrier = barrier;
this.taskNumber = taskNumber;
}
@Override
public void run() {
try {
// 模拟任务执行时间
System.out.println("线程 " + taskNumber + " 正在执行任务");
Thread.sleep((long) (Math.random() * 3000));
System.out.println("线程 " + taskNumber + " 到达了屏障点");
// 等待其他线程到达屏障点
barrier.await();
// 所有线程都到达屏障点后执行的代码
System.out.println("线程 " + taskNumber + " 继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
CyclicBarrier被初始化为 3,意味着需要等待 3 个工作线程到达屏障点。CyclicBarrier的第二个参数是一个Runnable对象,当所有线程都到达屏障点时,这个Runnable会被执行。- 创建了三个工作线程,每个线程执行
Worker任务。 - 每个
Worker执行一个任务,到达屏障点后调用barrier.await()来等待其他线程。 - 当所有线程都到达屏障点时,
CyclicBarrier内部的Runnable被执行,然后所有线程继续执行。
Semaphore
适合于需要限制对共享资源并发访问数量的场景。
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
// 创建一个Semaphore对象,参数为允许的并发访问数量
final Semaphore semaphore = new Semaphore(2);
// 创建并启动多个线程
for (int i = 0; i < 5; i++) {
new Thread(new Worker(semaphore), "Thread-" + i).start();
}
}
// 工作线程
static class Worker implements Runnable {
private final Semaphore semaphore;
public Worker(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
// 请求一个许可
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 获得了许可,开始执行任务");
// 模拟任务执行时间
Thread.sleep((long) (Math.random() * 3000));
System.out.println(Thread.currentThread().getName() + " 任务执行完毕,释放许可");
// 释放许可
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Semaphore被初始化为 2,意味着同时只能有 2 个线程访问共享资源。- 创建了五个工作线程,每个线程执行
Worker任务。 - 每个
Worker执行一个任务,在开始任务之前调用semaphore.acquire()来请求一个许可。 - 如果没有可用的许可,线程将等待,直到其他线程释放许可。
- 任务完成后,调用
semaphore.release()来释放许可,这样其他等待的线程就可以继续执行。