10.并发编程

予早 2025-02-21 01:08:21
Categories: Tags:

JUC

CAS,compare and swap

比较并交换,是一条CPU原子指令cmprxchg,有三个操作数,分别为内存位置、预期原值及更新值,其作用为比较该内存位置的值是否等于预期值,若等于预期值,内存位置的值变为更新值,若不等于预期值,则不进行更新,指令的返回结果是内存位置当前的实际最新值,整个过程是原子操作。

Java中unsafe封装的本地方法使用了CAS,而原子类如AtomicInteger等使用了unsafe相关的本地方法。

CAS本质是一种乐观锁,是一种并发编程的常用方式。

SQL中的条件更新与CAS有异曲同工之妙:update set score=score+1 from table where id=3 and score=59,若多个线程执行该语句,所有语句均可执行成功,但只有第一条执行成功的语句成功更新了score字段的值。

ABA问题

CAS会比较内存地址中的值与预期值是否一致,一致则更新,但若该值原本为A,被另外线程更新为B,然后被又一个线程更新为A,则此时CSA比较时发现符合预期,但实质上发生过变化。

解决方式是使用版本号在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A->B->A就会变成1A->2B->3A。

因为CAS需要在操作值的时候,检查值有没有发生变化,比如没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。

ABA问题的解决思路就是使用版本号。JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

Atomic

AtomicInteger

Lock

AQS

AQS,AbstractQuenedSynchronizer,抽象队列同步,java.util.concurrent.locks.AbstractQuenedSynchronizer。

AQS是一个抽象类,其实现了通用的线程同步机制,并基于模板方法设计模式留白获取资源和释放资源方法的实现,ReentrantLock,Semaphore等同步器都是AQS的子类,可以通过继承AQS实现自定义同步器。

核心思想

AQS持有资源private volatile int private volatile int state,当线程要获取资源时,基于CAS方式尝试获取state,获取到就会修改state,获取不到线程进入sync queue排队等待资源释放。

sync queue基于CLH实现,CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每个请求资源的线程封装成CLH队列的一个结点(Node)来实现资源的等待分配。

ReentrantLock

可重入锁

ReentrantReadWriteLock

Collection

ConcurrentHashMap

JDK1.8使用数组+链表+红黑树数据结构和CAS原子操作实现

CopyOnWriteArrayList

CopyOnWriteArrayList是ArrayList 的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的拷贝来实现的。COW模式的体现。

CopyOnWriteArrayList的缺陷和使用场景

CopyOnWriteArrayList 有几个缺点:

CopyOnWriteArrayList 合适读多写少的场景,不过这类慎用

因为谁也没法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要重新复制数组,这个代价实在太高昂了。在高性能的互联网应用中,这种操作分分钟引起故障。

ThreadPoolExecutor

ThreadPoolExecutor

ScheduledThreadPoolExecutor

Tools

CountDownLatch

让一个或多个线程等待一组任务完成。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) {
        // 初始化 CountDownLatch,参数为需要等待的线程数量
        final CountDownLatch latch = new CountDownLatch(3);

        // 创建并启动三个线程
        for (int i = 0; i < 3; i++) {
            new Thread(new Worker(latch, i)).start();
        }

        try {
            // 主线程等待所有线程完成
            latch.await();
            System.out.println("所有任务完成!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 工作线程
    static class Worker implements Runnable {
        private final CountDownLatch latch;
        private final int taskNumber;

        public Worker(CountDownLatch latch, int taskNumber) {
            this.latch = latch;
            this.taskNumber = taskNumber;
        }

        @Override
        public void run() {
            try {
                // 模拟任务执行时间
                Thread.sleep(1000);
                System.out.println("任务 " + taskNumber + " 完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 任务完成后调用countDown()方法
                latch.countDown();
            }
        }
    }
}
  1. CountDownLatch 被初始化为 3,意味着主线程将等待 3 个工作线程完成。
  2. 创建了三个工作线程,每个线程执行 Worker 任务。
  3. 每个 Worker 执行一个任务,完成后调用 latch.countDown() 来减少 CountDownLatch 的计数。
  4. 主线程调用 latch.await() 等待所有工作线程完成。当 CountDownLatch 的计数达到 0 时,await 方法返回,主线程继续执行。

CyclicBarrier

一组线程互相等待,直到所有线程都到达某个公共屏障点(barrier)。在所有线程都到达后,这些线程才会继续执行。CyclicBarrier 可以重复使用,因此被称为循环屏障。适合于需要多个线程协同工作,并且需要等待所有线程到达某个点后再一起继续执行的场景。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        // 初始化 CyclicBarrier,参数为需要等待的线程数量
        final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有线程都到达了屏障点,可以继续执行下一步操作了!");
            }
        });

        // 创建并启动三个线程
        for (int i = 0; i < 3; i++) {
            new Thread(new Worker(barrier, i)).start();
        }
    }

    // 工作线程
    static class Worker implements Runnable {
        private final CyclicBarrier barrier;
        private final int taskNumber;

        public Worker(CyclicBarrier barrier, int taskNumber) {
            this.barrier = barrier;
            this.taskNumber = taskNumber;
        }

        @Override
        public void run() {
            try {
                // 模拟任务执行时间
                System.out.println("线程 " + taskNumber + " 正在执行任务");
                Thread.sleep((long) (Math.random() * 3000));
                System.out.println("线程 " + taskNumber + " 到达了屏障点");
                // 等待其他线程到达屏障点
                barrier.await();
                // 所有线程都到达屏障点后执行的代码
                System.out.println("线程 " + taskNumber + " 继续执行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. CyclicBarrier 被初始化为 3,意味着需要等待 3 个工作线程到达屏障点。
  2. CyclicBarrier 的第二个参数是一个 Runnable 对象,当所有线程都到达屏障点时,这个 Runnable 会被执行。
  3. 创建了三个工作线程,每个线程执行 Worker 任务。
  4. 每个 Worker 执行一个任务,到达屏障点后调用 barrier.await() 来等待其他线程。
  5. 当所有线程都到达屏障点时,CyclicBarrier 内部的 Runnable 被执行,然后所有线程继续执行。

Semaphore

适合于需要限制对共享资源并发访问数量的场景。

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    public static void main(String[] args) {
        // 创建一个Semaphore对象,参数为允许的并发访问数量
        final Semaphore semaphore = new Semaphore(2);

        // 创建并启动多个线程
        for (int i = 0; i < 5; i++) {
            new Thread(new Worker(semaphore), "Thread-" + i).start();
        }
    }

    // 工作线程
    static class Worker implements Runnable {
        private final Semaphore semaphore;

        public Worker(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                // 请求一个许可
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " 获得了许可,开始执行任务");
                // 模拟任务执行时间
                Thread.sleep((long) (Math.random() * 3000));
                System.out.println(Thread.currentThread().getName() + " 任务执行完毕,释放许可");
                // 释放许可
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. Semaphore 被初始化为 2,意味着同时只能有 2 个线程访问共享资源。
  2. 创建了五个工作线程,每个线程执行 Worker 任务。
  3. 每个 Worker 执行一个任务,在开始任务之前调用 semaphore.acquire() 来请求一个许可。
  4. 如果没有可用的许可,线程将等待,直到其他线程释放许可。
  5. 任务完成后,调用 semaphore.release() 来释放许可,这样其他等待的线程就可以继续执行。