京东6.18大促主会场领京享红包更优惠

 找回密码
 立即注册

QQ登录

只需一步,快速开始

Java 并发编程使用指南:从入门到精通(基于 JDK 21)

2025-12-8 19:45| 发布者: AI陈绪元| 查看: 49| 评论: 0

摘要: 一、并发编程基础1.1 什么是并发编程?并发编程是指在同一时间段内处理多个任务的编程技术。在现代应用中,并发编程可以:提升性能:充分利用多核 CPU 资源提高响应速度:避免阻塞操作影响用户体验优化资源利用:合


一、并发编程基础

1.1 什么是并发编程?

并发编程是指在同一时间段内处理多个任务的编程技术。在现代应用中,并发编程可以:

  • 提升性能:充分利用多核 CPU 资源
  • 提高响应速度:避免阻塞操作影响用户体验
  • 优化资源利用:合理分配系统资源

并发 vs 并行:

并发(Concurrency):多个任务交替执行CPU 核心: ──A1──B1──A2──C1──B2──A3──C2──并行(Parallelism):多个任务真正同时执行CPU 核心1: ──A1──A2──A3──CPU 核心2: ──B1──B2──B3──CPU 核心3: ──C1──C2──C3──

1.2 线程安全问题示例

public class RaceConditionDemo {    private int count = 0;    public void increment() {        count++;  // 非原子操作!    }    public static void main(String[] args) throws InterruptedException {        RaceConditionDemo demo = new RaceConditionDemo();                Thread[] threads = new Thread[1000];        for (int i = 0; i < threads.length; i++) {            threads[i] = new Thread(() -> {                for (int j = 0; j < 1000; j++) {                    demo.increment();                }            });            threads[i].start();        }        for (Thread thread : threads) {            thread.join();        }        System.out.println("最终结果: " + demo.count);  // 期望1000000,实际小于    }}

二、Java 内存模型(JMM)

2.1 JMM 核心概念

Java 内存模型定义了线程如何与主内存交互。

┌─────────────────────────────────────────┐│           主内存(Main Memory)           ││     存放所有共享变量和对象实例            │└────────┬─────────────────────┬──────────┘         │                     │    读取  │  刷新          读取  │  刷新         ▼                     ▼┌─────────────────┐   ┌─────────────────┐│  工作内存1       │   │  工作内存2       ││  (线程私有)     │   │  (线程私有)     ││  ┌───────────┐  │   │  ┌───────────┐  ││  │ 变量副本   │  │   │  │ 变量副本   │  ││  └───────────┘  │   │  └───────────┘  │└─────────────────┘   └─────────────────┘        ▲                     ▲        │                     │    ┌───┴───┐             ┌───┴───┐    │ 线程1  │             │ 线程2  │    └───────┘             └───────┘

2.2 volatile 关键字

volatile 保证可见性和有序性,但不保证原子性。

public class VolatileDemo {    private volatile boolean flag = false;  // volatile 保证可见性    private int count = 0;    public void writer() {        count = 100;       // 1        flag = true;       // 2 (volatile写)    }    public void reader() {        if (flag) {        // 3 (volatile读)            int i = count; // 4,i一定等于100(happens-before规则)            System.out.println("count = " + i);        }    }    public static void main(String[] args) {        VolatileDemo demo = new VolatileDemo();                new Thread(demo::writer).start();        new Thread(demo::reader).start();    }}

volatile 使用场景:

  1. 状态标志(如上例的 flag)
  2. 双重检查锁定(DCL)单例模式
  3. 读多写少的场景

⚠️ volatile 不保证原子性:

public class VolatileNotAtomicDemo {    private volatile int count = 0;    public void increment() {        count++;  // 仍然不是原子操作!    }}

三、synchronized 关键字

3.1 synchronized 的三种用法

用法一:同步实例方法

public class SyncMethodDemo {    private int count = 0;    // 锁对象是 this    public synchronized void increment() {        count++;    }    public synchronized int getCount() {        return count;    }}

用法二:同步静态方法

public class SyncStaticMethodDemo {    private static int count = 0;    // 锁对象是 SyncStaticMethodDemo.class    public static synchronized void increment() {        count++;    }}

用法三:同步代码块

public class SyncBlockDemo {    private final Object lock = new Object();    private int count = 0;    public void increment() {        synchronized (lock) {  // 锁对象是 lock            count++;        }    }}

3.2 synchronized 实现原理

// Java 代码public void method() {    synchronized (this) {        count++;    }}// 字节码(简化)monitorenter  // 获取锁// ... 业务代码monitorexit   // 释放锁

synchronized 特性:

  • 互斥性:同一时刻只有一个线程可以执行
  • 可重入性:同一线程可以多次获取同一个锁
  • 可见性:释放锁时刷新到主内存

3.3 生产者-消费者模式

import java.util.LinkedList;import java.util.Queue;public class ProducerConsumer {    private final Queue<Integer> queue = new LinkedList<>();    private final int MAX_SIZE = 10;    // 生产者    public synchronized void produce(int value) throws InterruptedException {        while (queue.size() == MAX_SIZE) {            wait();  // 队列满,等待        }        queue.offer(value);        System.out.println("生产: " + value);        notifyAll();  // 唤醒消费者    }    // 消费者    public synchronized int consume() throws InterruptedException {        while (queue.isEmpty()) {            wait();  // 队列空,等待        }        int value = queue.poll();        System.out.println("消费: " + value);        notifyAll();  // 唤醒生产者        return value;    }    public static void main(String[] args) {        ProducerConsumer pc = new ProducerConsumer();        // 生产者线程        new Thread(() -> {            for (int i = 0; i < 20; i++) {                try {                    pc.produce(i);                    Thread.sleep(100);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }).start();        // 消费者线程        new Thread(() -> {            for (int i = 0; i < 20; i++) {                try {                    pc.consume();                    Thread.sleep(200);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }).start();    }}

四、Lock 接口

4.1 ReentrantLock 基本使用

import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ReentrantLockDemo {    private final Lock lock = new ReentrantLock();    private int count = 0;    public void increment() {        lock.lock();  // 获取锁        try {            count++;        } finally {            lock.unlock();  // 必须在 finally 中释放锁        }    }    public int getCount() {        lock.lock();        try {            return count;        } finally {            lock.unlock();        }    }}

4.2 tryLock - 尝试获取锁

import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class TryLockDemo {    private final Lock lock = new ReentrantLock();    public void method1() {        if (lock.tryLock()) {  // 尝试获取锁,不阻塞            try {                System.out.println(Thread.currentThread().getName() + " 获取锁成功");                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                lock.unlock();            }        } else {            System.out.println(Thread.currentThread().getName() + " 获取锁失败");        }    }    public void method2() {        try {            if (lock.tryLock(1, TimeUnit.SECONDS)) {  // 超时获取锁                try {                    System.out.println(Thread.currentThread().getName() + " 获取锁成功");                } finally {                    lock.unlock();                }            } else {                System.out.println(Thread.currentThread().getName() + " 获取锁超时");            }        } catch (InterruptedException e) {            e.printStackTrace();        }    }    public static void main(String[] args) {        TryLockDemo demo = new TryLockDemo();                new Thread(demo::method1, "线程A").start();        new Thread(demo::method1, "线程B").start();                try {            Thread.sleep(100);        } catch (InterruptedException e) {            e.printStackTrace();        }                new Thread(demo::method2, "线程C").start();    }}

4.3 ReadWriteLock - 读写锁

import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockDemo {    private final Map<String, String> cache = new HashMap<>();    private final ReadWriteLock lock = new ReentrantReadWriteLock();    // 读操作(共享锁)    public String get(String key) {        lock.readLock().lock();        try {            Thread.sleep(100);  // 模拟读取延迟            return cache.get(key);        } catch (InterruptedException e) {            return null;        } finally {            lock.readLock().unlock();        }    }    // 写操作(独占锁)    public void put(String key, String value) {        lock.writeLock().lock();        try {            Thread.sleep(100);  // 模拟写入延迟            cache.put(key, value);            System.out.println("写入: " + key + " = " + value);        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            lock.writeLock().unlock();        }    }    public static void main(String[] args) {        ReadWriteLockDemo demo = new ReadWriteLockDemo();        demo.put("key1", "value1");        // 创建多个读线程        for (int i = 0; i < 5; i++) {            new Thread(() -> {                String value = demo.get("key1");                System.out.println(Thread.currentThread().getName() + " 读取: " + value);            }, "读线程" + i).start();        }        // 创建写线程        new Thread(() -> demo.put("key2", "value2"), "写线程").start();    }}

五、原子类(Atomic)

5.1 AtomicInteger 详解

import java.util.concurrent.atomic.AtomicInteger;public class AtomicIntegerDemo {    public static void main(String[] args) {        AtomicInteger atomic = new AtomicInteger(0);        // 基本操作        System.out.println("初始值: " + atomic.get());  // 0        System.out.println("getAndIncrement: " + atomic.getAndIncrement());  // 返回0,然后+1        System.out.println("当前值: " + atomic.get());  // 1        System.out.println("incrementAndGet: " + atomic.incrementAndGet());  // 先+1,然后返回2        System.out.println("addAndGet(5): " + atomic.addAndGet(5));  // 先+5,然后返回7                // CAS 操作        boolean success = atomic.compareAndSet(7, 100);  // 期望值7,更新为100        System.out.println("CAS 操作结果: " + success);  // true        System.out.println("当前值: " + atomic.get());  // 100                // Lambda 表达式更新        atomic.updateAndGet(value -> value * 2);  // 100 * 2 = 200        System.out.println("updateAndGet: " + atomic.get());  // 200                atomic.accumulateAndGet(50, (left, right) -> left + right);  // 200 + 50 = 250        System.out.println("accumulateAndGet: " + atomic.get());  // 250    }}

5.2 AtomicStampedReference - 解决 ABA 问题

import java.util.concurrent.atomic.AtomicStampedReference;public class AtomicStampedReferenceDemo {    // 初始值100,版本号1    private static AtomicStampedReference<Integer> atomicRef =         new AtomicStampedReference<>(100, 1);    public static void main(String[] args) throws InterruptedException {        // 线程1        Thread t1 = new Thread(() -> {            int stamp = atomicRef.getStamp();            System.out.println("线程1 版本号: " + stamp);                        try {                Thread.sleep(1000);  // 让线程2先执行            } catch (InterruptedException e) {                e.printStackTrace();            }                        // 尝试CAS,期望版本号仍为1            boolean success = atomicRef.compareAndSet(100, 200, stamp, stamp + 1);            System.out.println("线程1 CAS 结果: " + success);  // false,因为版本号已变化        });        // 线程2:执行 ABA 操作        Thread t2 = new Thread(() -> {            int stamp = atomicRef.getStamp();            System.out.println("线程2 版本号: " + stamp);                        // A -> B            atomicRef.compareAndSet(100, 50, stamp, stamp + 1);            System.out.println("线程2 修改为: " + atomicRef.getReference());                        stamp = atomicRef.getStamp();            // B -> A            atomicRef.compareAndSet(50, 100, stamp, stamp + 1);            System.out.println("线程2 修改回: " + atomicRef.getReference());        });        t1.start();        t2.start();        t1.join();        t2.join();                System.out.println("最终值: " + atomicRef.getReference());        System.out.println("最终版本号: " + atomicRef.getStamp());    }}

六、线程池(ThreadPoolExecutor)

6.1 ThreadPoolExecutor 核心参数

import java.util.concurrent.*;public class ThreadPoolDemo {    public static void main(String[] args) {        ThreadPoolExecutor executor = new ThreadPoolExecutor(            5,                       // corePoolSize: 核心线程数            10,                      // maximumPoolSize: 最大线程数            60L,                     // keepAliveTime: 空闲线程存活时间            TimeUnit.SECONDS,        // unit: 时间单位            new LinkedBlockingQueue<>(100),  // workQueue: 任务队列            Executors.defaultThreadFactory(),  // threadFactory: 线程工厂            new ThreadPoolExecutor.AbortPolicy()  // handler: 拒绝策略        );        // 提交任务        for (int i = 1; i <= 20; i++) {            final int taskId = i;            executor.execute(() -> {                System.out.println("任务 " + taskId + " 执行,线程: " +                                  Thread.currentThread().getName());                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            });        }        executor.shutdown();    }}

6.2 四种拒绝策略

// 1. AbortPolicy:抛出异常(默认)new ThreadPoolExecutor.AbortPolicy()// 2. CallerRunsPolicy:调用者线程执行new ThreadPoolExecutor.CallerRunsPolicy()// 3. DiscardOldestPolicy:丢弃队列中最旧的任务new ThreadPoolExecutor.DiscardOldestPolicy()// 4. DiscardPolicy:直接丢弃新任务new ThreadPoolExecutor.DiscardPolicy()

6.3 线程池最佳实践

public class ThreadPoolBestPractice {    public static void main(String[] args) {        int cpuCores = Runtime.getRuntime().availableProcessors();                // CPU 密集型:线程数 = CPU 核心数 + 1        ThreadPoolExecutor cpuBoundPool = new ThreadPoolExecutor(            cpuCores + 1,            cpuCores + 1,            0L,            TimeUnit.MILLISECONDS,            new LinkedBlockingQueue<>(1000),            new ThreadPoolExecutor.CallerRunsPolicy()        );        // IO 密集型:线程数 = CPU 核心数 * 2        ThreadPoolExecutor ioBoundPool = new ThreadPoolExecutor(            cpuCores * 2,            cpuCores * 2,            60L,            TimeUnit.SECONDS,            new LinkedBlockingQueue<>(1000),            new ThreadPoolExecutor.CallerRunsPolicy()        );        System.out.println("CPU 核心数: " + cpuCores);        System.out.println("CPU 密集型线程数: " + (cpuCores + 1));        System.out.println("IO 密集型线程数: " + (cpuCores * 2));    }}

七、CompletableFuture 异步编程

7.1 创建 CompletableFuture

import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureBasic {    public static void main(String[] args) throws ExecutionException, InterruptedException {        // 方式1:runAsync(无返回值)        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {            System.out.println("runAsync 执行,线程: " + Thread.currentThread().getName());        });        future1.get();        // 方式2:supplyAsync(有返回值)        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {            System.out.println("supplyAsync 执行,线程: " + Thread.currentThread().getName());            return "Hello CompletableFuture";        });        System.out.println("结果: " + future2.get());    }}

7.2 链式调用

import java.util.concurrent.CompletableFuture;public class CompletableFutureChain {    public static void main(String[] args) {        CompletableFuture.supplyAsync(() -> {            System.out.println("第一步: 获取用户ID");            return "userId:12345";        }).thenApply(userId -> {            System.out.println("第二步: 根据用户ID查询用户信息");            return "name=张三,age=25";        }).thenApply(userInfo -> {            System.out.println("第三步: 格式化用户信息");            return "用户信息: " + userInfo;        }).thenAccept(result -> {            System.out.println("第四步: 输出结果");            System.out.println(result);        }).join();    }}

7.3 组合多个 CompletableFuture

import java.util.concurrent.CompletableFuture;public class CompletableFutureCombine {    public static void main(String[] args) {        // thenCombine:组合两个 Future,都完成后执行        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {            sleep(1000);            return "结果1";        });        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {            sleep(2000);            return "结果2";        });        CompletableFuture<String> combined = future1.thenCombine(future2, (r1, r2) -> {            return r1 + " + " + r2;        });        System.out.println("thenCombine: " + combined.join());        // allOf:等待所有 Future 完成        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);        allOf.join();        System.out.println("allOf 完成");        // anyOf:任意一个 Future 完成即返回        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);        System.out.println("anyOf: " + anyOf.join());    }    private static void sleep(long millis) {        try {            Thread.sleep(millis);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

7.4 异常处理

import java.util.concurrent.CompletableFuture;public class CompletableFutureException {    public static void main(String[] args) {        // exceptionally:捕获异常并返回默认值        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {            if (Math.random() > 0.5) {                throw new RuntimeException("模拟异常");            }            return "正常结果";        }).exceptionally(ex -> {            System.out.println("捕获异常: " + ex.getMessage());            return "默认值";        });        System.out.println("exceptionally: " + future1.join());        // handle:统一处理结果和异常        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {            throw new RuntimeException("异常");        }).handle((result, ex) -> {            if (ex != null) {                return "异常处理: " + ex.getMessage();            }            return result;        });        System.out.println("handle: " + future2.join());    }}

7.5 实战案例:并行查询商品价格

import java.util.Arrays;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.stream.Collectors;public class CompletableFutureShopDemo {    public static void main(String[] args) {        List<String> products = Arrays.asList("iPhone 15", "MacBook Pro", "iPad Air",                                              "AirPods Pro", "Apple Watch");                long start = System.currentTimeMillis();                // 并行查询价格        List<CompletableFuture<String>> futures = products.stream()            .map(product -> CompletableFuture.supplyAsync(() -> getPrice(product)))            .collect(Collectors.toList());        List<String> prices = futures.stream()            .map(CompletableFuture::join)            .collect(Collectors.toList());        long end = System.currentTimeMillis();                prices.forEach(System.out::println);        System.out.println("总耗时: " + (end - start) + "ms");    }    private static String getPrice(String product) {        try {            Thread.sleep(1000);  // 模拟网络请求        } catch (InterruptedException e) {            e.printStackTrace();        }        double price = Math.random() * 1000 + 5000;        return String.format("%s - %.2f元", product, price);    }}

八、虚拟线程(JDK 21+)

8.1 虚拟线程简介

虚拟线程是 JDK 21 引入的革命性特性,也称为轻量级线程。

传统线程 vs 虚拟线程:

特性

平台线程

虚拟线程

映射关系

1:1 映射到 OS 线程

M:N 调度到平台线程

栈大小

默认 1 MB

初始几百字节

创建成本

约 1 µs + 1 MB

约 1 µs + 400 B

最大并发

几千

百万级

阻塞行为

阻塞 OS 线程

只阻塞虚拟线程

8.2 创建虚拟线程

public class VirtualThreadDemo {    public static void main(String[] args) throws InterruptedException {        // 方式1:Thread.startVirtualThread()        Thread vThread1 = Thread.startVirtualThread(() -> {            System.out.println("虚拟线程1: " + Thread.currentThread());        });        vThread1.join();        // 方式2:Thread.ofVirtual().start()        Thread vThread2 = Thread.ofVirtual().start(() -> {            System.out.println("虚拟线程2: " + Thread.currentThread());        });        vThread2.join();        // 方式3:Thread.Builder        Thread.Builder builder = Thread.ofVirtual().name("vThread-", 0);        Thread vThread3 = builder.start(() -> {            System.out.println("虚拟线程3: " + Thread.currentThread());        });        vThread3.join();        // 方式4:Executors.newVirtualThreadPerTaskExecutor()        var executor = Executors.newVirtualThreadPerTaskExecutor();        executor.submit(() -> {            System.out.println("虚拟线程4: " + Thread.currentThread());        });        executor.close();    }}

8.3 虚拟线程性能测试

import java.time.Duration;import java.util.concurrent.Executors;import java.util.stream.IntStream;public class VirtualThreadPerformance {    public static void main(String[] args) {        long start = System.currentTimeMillis();        // 创建 100 万个虚拟线程        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {            IntStream.range(0, 1_000_000).forEach(i -> {                executor.submit(() -> {                    try {                        Thread.sleep(Duration.ofSeconds(1));                    } catch (InterruptedException e) {                        Thread.currentThread().interrupt();                    }                    return i;                });            });        }        long end = System.currentTimeMillis();        System.out.println("100 万个虚拟线程执行完成,耗时: " + (end - start) + "ms");    }}

输出示例:

100 万个虚拟线程执行完成,耗时: 2341ms

8.4 虚拟线程最佳实践

✅ 适合使用虚拟线程的场景:

  1. IO 密集型任务(HTTP 请求、数据库查询)
  2. 高并发场景(百万级连接)
  3. 简化异步代码(用同步代码实现异步效果)

❌ 不适合使用虚拟线程的场景:

  1. CPU 密集型任务
  2. 使用 synchronized 大量阻塞
  3. 使用 ThreadLocal 存储大量数据
// ✅ 推荐:虚拟线程处理 IO 密集型任务try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {    List<String> urls = List.of("http://api1.com", "http://api2.com", ...);    urls.forEach(url -> executor.submit(() -> fetchData(url)));}// ❌ 不推荐:虚拟线程处理 CPU 密集型任务// CPU 密集型仍建议使用平台线程池

九、并发工具类

9.1 CountDownLatch - 倒计时门闩

import java.util.concurrent.CountDownLatch;public class CountDownLatchDemo {    public static void main(String[] args) throws InterruptedException {        int threadCount = 5;        CountDownLatch latch = new CountDownLatch(threadCount);        for (int i = 1; i <= threadCount; i++) {            final int taskId = i;            new Thread(() -> {                System.out.println("任务 " + taskId + " 开始执行");                try {                    Thread.sleep((long) (Math.random() * 2000));                } catch (InterruptedException e) {                    e.printStackTrace();                }                System.out.println("任务 " + taskId + " 执行完成");                latch.countDown();  // 计数器减 1            }).start();        }        System.out.println("等待所有任务完成...");        latch.await();  // 阻塞,直到计数器为 0        System.out.println("所有任务已完成!");    }}

9.2 CyclicBarrier - 循环栅栏

import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {    public static void main(String[] args) {        int threadCount = 3;        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {            System.out.println("所有线程已到达屏障,执行后续任务...\n");        });        for (int i = 1; i <= threadCount; i++) {            final int taskId = i;            new Thread(() -> {                for (int round = 1; round <= 3; round++) {                    try {                        System.out.println("任务 " + taskId + " 第 " + round + " 轮执行");                        Thread.sleep((long) (Math.random() * 2000));                        System.out.println("任务 " + taskId + " 第 " + round + " 轮完成,等待其他线程...");                        barrier.await();  // 等待其他线程                    } catch (InterruptedException | BrokenBarrierException e) {                        e.printStackTrace();                    }                }            }).start();        }    }}

9.3 Semaphore - 信号量

import java.util.concurrent.Semaphore;public class SemaphoreDemo {    public static void main(String[] args) {        // 3 个许可证(停车位)        Semaphore semaphore = new Semaphore(3);        for (int i = 1; i <= 10; i++) {            final int carId = i;            new Thread(() -> {                try {                    System.out.println("车辆 " + carId + " 到达停车场");                    semaphore.acquire();  // 获取许可证                    System.out.println("车辆 " + carId + " 进入停车场,停车中...");                    Thread.sleep((long) (Math.random() * 3000));                    System.out.println("车辆 " + carId + " 离开停车场");                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    semaphore.release();  // 释放许可证                }            }).start();        }    }}

十、并发集合

10.1 ConcurrentHashMap

import java.util.concurrent.ConcurrentHashMap;public class ConcurrentHashMapDemo {    public static void main(String[] args) {        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();        // 并发写入        for (int i = 0; i < 10; i++) {            final int value = i;            new Thread(() -> {                map.put("key" + value, value);                System.out.println(Thread.currentThread().getName() + " 写入: key" + value);            }).start();        }        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        // 原子操作        map.putIfAbsent("key1", 100);  // key1 已存在,不会更新        map.compute("key2", (k, v) -> v == null ? 1 : v + 1);  // key2 的值加 1        map.merge("key3", 10, Integer::sum);  // key3 的值加 10        System.out.println("\n最终结果: " + map);    }}

10.2 CopyOnWriteArrayList

import java.util.concurrent.CopyOnWriteArrayList;public class CopyOnWriteArrayListDemo {    public static void main(String[] args) throws InterruptedException {        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();        // 写线程        Thread writer = new Thread(() -> {            for (int i = 0; i < 5; i++) {                list.add("元素" + i);                System.out.println("写入: 元素" + i);                try {                    Thread.sleep(100);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        // 读线程        Thread reader = new Thread(() -> {            for (int i = 0; i < 10; i++) {                System.out.println("读取: " + list);                try {                    Thread.sleep(50);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        writer.start();        reader.start();        writer.join();        reader.join();        System.out.println("\n最终结果: " + list);    }}

十一、最佳实践与经验总结

11.1 线程安全的单例模式

方式一:饿汉式(推荐)

public class Singleton {    private static final Singleton INSTANCE = new Singleton();    private Singleton() {    }    public static Singleton getInstance() {        return INSTANCE;    }}

方式二:双重检查锁(DCL)

public class Singleton {    private static volatile Singleton instance;  // 必须 volatile    private Singleton() {    }    public static Singleton getInstance() {        if (instance == null) {            synchronized (Singleton.class) {                if (instance == null) {                    instance = new Singleton();                }            }        }        return instance;    }}

方式三:静态内部类(推荐)

public class Singleton {    private Singleton() {    }    private static class Holder {        private static final Singleton INSTANCE = new Singleton();    }    public static Singleton getInstance() {        return Holder.INSTANCE;    }}

11.2 如何选择并发工具

需要原子操作?├─ 是 → AtomicInteger/AtomicLong/AtomicReference└─ 否 ↓需要加锁?├─ 简单互斥 → synchronized├─ 需要 tryLock/超时 → ReentrantLock├─ 读多写少 → ReadWriteLock└─ 否 ↓需要线程池?├─ CPU 密集型 → 固定大小线程池(核心数 + 1)├─ IO 密集型 → 较大线程池(核心数 * 2)└─ 高并发 IO → 虚拟线程(JDK 21+)需要异步编程?├─ 简单异步 → Future/FutureTask├─ 复杂异步编排 → CompletableFuture└─ 响应式编程 → Project Reactor/RxJava需要线程协作?├─ 等待所有任务完成 → CountDownLatch├─ 循环等待 → CyclicBarrier└─ 资源限流 → Semaphore

11.3 常见并发问题排查

问题一:线程死锁

// 检测死锁public class DeadlockDetection {    public static void main(String[] args) {        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();        long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();                if (deadlockedThreads != null) {            ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreads);            for (ThreadInfo info : threadInfos) {                System.out.println("发现死锁: " + info.getThreadName());                System.out.println("锁信息: " + info.getLockInfo());            }        }    }}

问题二:内存泄漏(ThreadLocal)

// ❌ 错误示例public class ThreadLocalLeak {    private static ThreadLocal<byte[]> threadLocal = new ThreadLocal<>();    public void process() {        threadLocal.set(new byte[1024 * 1024]);  // 1MB        // 忘记 remove(),导致内存泄漏    }}// ✅ 正确示例public class ThreadLocalCorrect {    private static ThreadLocal<byte[]> threadLocal = new ThreadLocal<>();    public void process() {        try {            threadLocal.set(new byte[1024 * 1024]);            // 业务逻辑        } finally {            threadLocal.remove();  // 必须清理        }    }}

11.4 性能优化建议

  1. 减少锁的粒度:只锁必要的代码块
  2. 使用读写锁:读多写少场景使用 ReadWriteLock
  3. 使用无锁数据结构:AtomicInteger、ConcurrentHashMap
  4. 使用虚拟线程:JDK 21+ 的 IO 密集型场景
  5. 避免上下文切换:减少线程数量
  6. 使用对象池:减少对象创建开销
Java 并发编程使用指南:从入门到精通(基于 JDK 21)


查看详情:https://www.toutiao.com/article/7578855858382193152
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

QQ|手机版|小黑屋|梦想之都-俊月星空 ( 粤ICP备18056059号 )|网站地图

GMT+8, 2025-12-14 16:24 , Processed in 0.030524 second(s), 18 queries .

Powered by Mxzdjyxk! X3.5

© 2001-2025 Discuz! Team.

返回顶部