Java JUC 详解:并发.util.concurrent 并发工具包指南

Java JUC 详解:并发.util.concurrent 并发工具包指南

目录

Java JUC 详解:并发.util.concurrent 并发工具包指南

线程池框架

核心接口与类

线程池示例

ThreadPoolExecutor 核心参数

并发集合

常用并发集合

ConcurrentHashMap 示例

同步工具类

CountDownLatch

CyclicBarrier

Semaphore

原子操作类

锁机制

Lock 接口

读写锁

实战案例:生产者消费者模型

总结

JUC(Java Util Concurrent)即 Java 并发工具包,是java.util.concurrent包及其子包的简称,自 Java 5 引入,为并发编程提供了高效、安全、可靠的工具类,极大简化了多线程编程的复杂度。

JUC 主要包含以下几类组件:

线程池框架(Executor Framework)并发集合(Concurrent Collections)同步工具(Synchronizers)原子操作类(Atomic Classes)锁机制(Locks)并发工具类(如 CountDownLatch、CyclicBarrier 等)

线程池框架

线程池通过重用线程来减少线程创建和销毁的开销,提高系统性能。

核心接口与类

Executor:最基本的线程池接口,定义了执行任务的方法ExecutorService:扩展了 Executor,提供了更丰富的线程池操作ThreadPoolExecutor:线程池的核心实现类Executors:线程池的工具类,提供了常用线程池的创建方法

线程池示例

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {

public static void main(String[] args) {

// 创建固定大小的线程池

ExecutorService executor = Executors.newFixedThreadPool(3);

// 提交任务

for (int i = 0; i < 10; i++) {

final int taskId = i;

executor.submit(() -> {

try {

System.out.println("任务 " + taskId + " 由线程 " +

Thread.currentThread().getName() + " 执行");

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

});

}

// 关闭线程池

executor.shutdown();

try {

// 等待所有任务完成

if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {

// 超时后强制关闭

executor.shutdownNow();

}

} catch (InterruptedException e) {

executor.shutdownNow();

}

}

}

ThreadPoolExecutor 核心参数

手动创建线程池时,ThreadPoolExecutor的构造函数提供了最灵活的配置:

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler)

corePoolSize:核心线程数maximumPoolSize:最大线程数keepAliveTime:非核心线程的空闲超时时间workQueue:任务等待队列threadFactory:线程工厂handler:拒绝策略

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class CustomThreadPool {

public static void main(String[] args) {

// 自定义线程池配置

ThreadPoolExecutor executor = new ThreadPoolExecutor(

2, // 核心线程数

5, // 最大线程数

30, // 空闲时间

TimeUnit.SECONDS,

new ArrayBlockingQueue<>(10), // 有界队列

new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略

);

// 提交任务

for (int i = 0; i < 20; i++) {

final int taskId = i;

executor.execute(() -> {

try {

System.out.println("任务 " + taskId + " 由线程 " +

Thread.currentThread().getName() + " 执行");

TimeUnit.MILLISECONDS.sleep(500);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

});

}

executor.shutdown();

}

}

并发集合

JUC 提供了一系列线程安全的集合类,相比传统的同步集合,通常具有更好的性能。

常用并发集合

ConcurrentHashMap:线程安全的 HashMap 替代者CopyOnWriteArrayList:读多写少场景下的线程安全 ListCopyOnWriteArraySet:基于 CopyOnWriteArrayList 实现的 SetConcurrentLinkedQueue:高效的并发队列LinkedBlockingQueue:可阻塞的链表队列ArrayBlockingQueue:有界的数组队列PriorityBlockingQueue:支持优先级的阻塞队列

ConcurrentHashMap 示例

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

public class ConcurrentHashMapExample {

public static void main(String[] args) throws InterruptedException {

Map concurrentMap = new ConcurrentHashMap<>();

ExecutorService executor = Executors.newFixedThreadPool(4);

// 并发写入

for (int i = 0; i < 1000; i++) {

final int num = i;

executor.submit(() -> {

String key = "key" + (num % 10);

// 原子操作:计算并替换

concurrentMap.compute(key, (k, v) -> v == null ? 1 : v + 1);

});

}

executor.shutdown();

executor.awaitTermination(1, TimeUnit.MINUTES);

// 输出结果

concurrentMap.forEach((k, v) -> System.out.println(k + ": " + v));

}

}

同步工具类

JUC 提供了多种同步工具,用于协调多个线程之间的协作。

CountDownLatch

允许一个或多个线程等待其他线程完成操作。

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class CountDownLatchExample {

public static void main(String[] args) throws InterruptedException {

// 计数器为3

CountDownLatch latch = new CountDownLatch(3);

ExecutorService executor = Executors.newFixedThreadPool(3);

for (int i = 0; i < 3; i++) {

final int taskId = i;

executor.submit(() -> {

try {

System.out.println("任务 " + taskId + " 开始执行");

Thread.sleep(1000 + taskId * 500);

System.out.println("任务 " + taskId + " 执行完成");

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

} finally {

// 计数器减1

latch.countDown();

}

});

}

System.out.println("等待所有任务完成...");

// 等待计数器变为0

latch.await();

System.out.println("所有任务已完成,继续执行主线程");

executor.shutdown();

}

}

CyclicBarrier

让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障,所有被阻塞的线程才会继续执行。

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class CyclicBarrierExample {

public static void main(String[] args) {

// 3个线程到达屏障后,执行Runnable任务

CyclicBarrier barrier = new CyclicBarrier(3, () ->

System.out.println("所有线程已到达屏障,开始下一步操作"));

ExecutorService executor = Executors.newFixedThreadPool(3);

for (int i = 0; i < 3; i++) {

final int threadId = i;

executor.submit(() -> {

try {

System.out.println("线程 " + threadId + " 正在执行任务");

Thread.sleep(1000 + threadId * 500);

System.out.println("线程 " + threadId + " 到达屏障");

// 等待其他线程到达

barrier.await();

System.out.println("线程 " + threadId + " 继续执行");

} catch (InterruptedException | BrokenBarrierException e) {

Thread.currentThread().interrupt();

}

});

}

executor.shutdown();

}

}

Semaphore

信号量,用于控制同时访问特定资源的线程数量。

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

import java.util.concurrent.TimeUnit;

public class SemaphoreExample {

public static void main(String[] args) {

// 允许3个线程同时访问

Semaphore semaphore = new Semaphore(3);

ExecutorService executor = Executors.newFixedThreadPool(5);

for (int i = 0; i < 10; i++) {

final int taskId = i;

executor.submit(() -> {

try {

// 获取许可

semaphore.acquire();

System.out.println("任务 " + taskId + " 获得许可,开始执行");

TimeUnit.SECONDS.sleep(2);

System.out.println("任务 " + taskId + " 执行完成,释放许可");

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

} finally {

// 释放许可

semaphore.release();

}

});

}

executor.shutdown();

}

}

原子操作类

JUC 提供了一系列原子操作类,用于在不使用锁的情况下实现线程安全的原子操作。

主要原子类包括:

基本类型:AtomicInteger、AtomicLong、AtomicBoolean数组类型:AtomicIntegerArray、AtomicLongArray等引用类型:AtomicReference、AtomicStampedReference等字段更新器:AtomicIntegerFieldUpdater等

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {

private static AtomicInteger counter = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {

ExecutorService executor = Executors.newFixedThreadPool(10);

// 10个线程,每个线程自增1000次

for (int i = 0; i < 10; i++) {

executor.submit(() -> {

for (int j = 0; j < 1000; j++) {

// 原子自增操作

counter.incrementAndGet();

}

});

}

executor.shutdown();

executor.awaitTermination(1, TimeUnit.MINUTES);

// 结果应该是10000

System.out.println("最终计数: " + counter.get());

}

}

锁机制

JUC 的java.util.concurrent.locks包提供了比synchronized更灵活的锁机制。

Lock 接口

Lock接口是所有锁的父接口,主要实现类有:

ReentrantLock:可重入锁ReentrantReadWriteLock:可重入读写锁

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {

private static int count = 0;

// 创建可重入锁

private static Lock lock = new ReentrantLock();

public static void main(String[] args) throws InterruptedException {

ExecutorService executor = Executors.newFixedThreadPool(5);

for (int i = 0; i < 1000; i++) {

executor.submit(() -> {

// 获取锁

lock.lock();

try {

count++;

} finally {

// 确保锁被释放

lock.unlock();

}

});

}

executor.shutdown();

executor.awaitTermination(1, TimeUnit.MINUTES);

System.out.println("最终计数: " + count);

}

}

读写锁

ReentrantReadWriteLock提供了读锁和写锁分离,适合读多写少的场景:

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.ReadWriteLock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExample {

private Map data = new HashMap<>();

private ReadWriteLock lock = new ReentrantReadWriteLock();

// 读操作使用读锁

public String get(String key) {

lock.readLock().lock();

try {

System.out.println("读取 key: " + key + ",线程: " + Thread.currentThread().getName());

return data.get(key);

} finally {

lock.readLock().unlock();

}

}

// 写操作使用写锁

public void put(String key, String value) {

lock.writeLock().lock();

try {

System.out.println("写入 key: " + key + ",线程: " + Thread.currentThread().getName());

data.put(key, value);

} finally {

lock.writeLock().unlock();

}

}

public static void main(String[] args) throws InterruptedException {

ReadWriteLockExample example = new ReadWriteLockExample();

ExecutorService executor = Executors.newFixedThreadPool(5);

// 添加写操作

executor.submit(() -> example.put("name", "Java"));

// 添加多个读操作

for (int i = 0; i < 4; i++) {

executor.submit(() -> {

for (int j = 0; j < 3; j++) {

example.get("name");

try {

TimeUnit.MILLISECONDS.sleep(100);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

}

});

}

executor.shutdown();

executor.awaitTermination(1, TimeUnit.MINUTES);

}

}

实战案例:生产者消费者模型

使用 JUC 的阻塞队列实现经典的生产者消费者模型:

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

public class ProducerConsumerExample {

// 容量为10的阻塞队列

private static BlockingQueue queue = new ArrayBlockingQueue<>(10);

private static final int MAX_ITEMS = 20;

// 生产者

static class Producer implements Runnable {

private int id;

public Producer(int id) {

this.id = id;

}

@Override

public void run() {

try {

for (int i = 0; i < MAX_ITEMS; i++) {

int item = id * 100 + i;

queue.put(item); // 放入队列,如果满了会阻塞

System.out.println("生产者 " + id + " 生产了: " + item +

",队列大小: " + queue.size());

TimeUnit.MILLISECONDS.sleep(100); // 模拟生产耗时

}

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

}

}

// 消费者

static class Consumer implements Runnable {

private int id;

public Consumer(int id) {

this.id = id;

}

@Override

public void run() {

try {

for (int i = 0; i < MAX_ITEMS; i++) {

int item = queue.take(); // 从队列取,如果空了会阻塞

System.out.println("消费者 " + id + " 消费了: " + item +

",队列大小: " + queue.size());

TimeUnit.MILLISECONDS.sleep(150); // 模拟消费耗时

}

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

}

}

public static void main(String[] args) throws InterruptedException {

ExecutorService executor = Executors.newFixedThreadPool(4);

// 创建2个生产者

executor.submit(new Producer(1));

executor.submit(new Producer(2));

// 创建2个消费者

executor.submit(new Consumer(1));

executor.submit(new Consumer(2));

executor.shutdown();

executor.awaitTermination(1, TimeUnit.MINUTES);

}

}

总结

JUC 为 Java 并发编程提供了强大的工具支持,大大简化了多线程程序的开发难度。掌握 JUC 的使用,能够帮助开发者编写高效、安全的并发程序,应对多线程环境下的各种挑战。在实际开发中,应根据具体场景选择合适的并发工具,同时注意线程安全和性能之间的平衡。通过不断实践和深入理解这些工具的原理,您将能够构建出更健壮、更高效的并发应用程序。

相关推荐

陕西:守护绿水青山 当好秦岭卫士中国网2024-04-22 13:42:58
义乌365人工客服电话多少

陕西:守护绿水青山 当好秦岭卫士中国网2024-04-22 13:42:58

⌛ 07-26 👁️ 7456
Keyshot-入门介绍及菜单基础设置
38365365.com打不开

Keyshot-入门介绍及菜单基础设置

⌛ 08-07 👁️ 6715
中国台湾
义乌365人工客服电话多少

中国台湾

⌛ 07-20 👁️ 5957