Java 多线程开发
本文档整理 Java 多线程开发的常用代码实践,配套理论知识请参考 JUC 基础。
一、线程的创建
1.1 继承 Thread 类
/**
 * Thread subclass whose task prints a greeting prefixed with the name of
 * the thread that executes it.
 */
class MyThread extends Thread {

    /** Prints "&lt;executing thread name&gt;: hello world" to standard output. */
    @Override
    public void run() {
        String runnerName = Thread.currentThread().getName();
        System.out.println(runnerName + ": hello world");
    }
}
/** Entry point for the "extend Thread" example. */
public class Demo {

    /**
     * Creates a {@code MyThread} and starts it, so that its {@code run()}
     * body executes on the new thread rather than on the caller.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyThread worker = new MyThread();
        worker.start();
    }
}
1.2 实现 Runnable 接口
/** Entry point for the "implement Runnable" example. */
public class Demo {

    /**
     * Wraps a {@link Runnable} lambda in a {@link Thread} and starts it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Runnable greeting = () -> System.out.println("hello world");
        Thread worker = new Thread(greeting);
        worker.start();
    }
}
1.3 实现 Callable 接口(有返回值)
/** Entry point for the "Callable with a return value" example. */
public class Demo {

    /**
     * Runs a value-returning task on a new thread through a
     * {@link FutureTask} and prints the value once it is available.
     *
     * @param args unused
     * @throws Exception if waiting for the result is interrupted or the task fails
     */
    public static void main(String[] args) throws Exception {
        FutureTask<String> pending = new FutureTask<>(() -> {
            Thread.sleep(1000);
            return "callable result";
        });
        Thread worker = new Thread(pending);
        worker.start();
        // get() blocks the calling thread until the task has produced its value.
        String outcome = pending.get();
        System.out.println(outcome);
    }
}
1.4 线程池创建
// Build a thread pool with every constructor argument spelled out.
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2,                                            // core pool size
        5,                                            // maximum pool size
        1,                                            // idle-thread keep-alive time
        TimeUnit.SECONDS,                             // unit of the keep-alive time
        new ArrayBlockingQueue<>(3),                  // bounded blocking work queue
        Executors.defaultThreadFactory(),             // thread factory
        new ThreadPoolExecutor.DiscardOldestPolicy()  // rejection policy
);
try {
    // execute: fire-and-forget, no handle to a result.
    executor.execute(() -> System.out.println("hello world 1"));
    // submit: returns a Future from which the task's value can be read.
    Future<Integer> submitted = executor.submit(() -> 1);
    System.out.println(submitted.get());
} finally {
    // Shut the pool down in finally so that an exception from get()
    // cannot leave the worker threads running.
    executor.shutdown();
}
二、线程同步与锁
2.1 synchronized 关键字
/**
 * Shows the three placements of {@code synchronized}: on an instance
 * method, on a block, and on a static method.
 */
public class SynchronizedDemo {
    private int count = 0;

    /** Adds one to the counter; synchronized instance method, so it locks {@code this}. */
    public synchronized void increment() {
        count += 1;
    }

    /** Subtracts one from the counter inside a block that locks an explicit object (here {@code this}). */
    public void decrement() {
        synchronized (this) {
            count -= 1;
        }
    }

    /** Synchronized static method; it locks the {@code Class} object. */
    public static synchronized void staticMethod() {
        System.out.println("static synchronized method");
    }

    /**
     * Increments one shared counter 1000 times from each of two threads and
     * prints the total after both have finished.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        SynchronizedDemo shared = new SynchronizedDemo();
        // Both threads run the same task against the same instance.
        Runnable addThousand = () -> {
            int remaining = 1000;
            while (remaining-- > 0) {
                shared.increment();
            }
        };
        Thread first = new Thread(addThousand);
        Thread second = new Thread(addThousand);
        first.start();
        second.start();
        first.join();
        second.join();
        System.out.println("count = " + shared.count); // prints 2000
    }
}
2.2 ReentrantLock
基本使用
/**
 * Basic {@link ReentrantLock} usage: blocking lock, non-blocking tryLock,
 * and tryLock with a timeout.
 */
public class ReentrantLockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    /** Adds one to the counter while holding the lock. */
    public void increment() {
        lock.lock(); // explicit acquire
        try {
            count += 1;
        } finally {
            lock.unlock(); // release in finally so the lock is freed on every path
        }
    }

    /** Tries to take the lock without blocking and reports the outcome. */
    public void tryLockDemo() {
        boolean acquired = lock.tryLock();
        if (!acquired) {
            System.out.println("获取锁失败,做其他事情");
            return;
        }
        try {
            System.out.println("获取锁成功");
        } finally {
            lock.unlock();
        }
    }

    /**
     * Tries to take the lock, waiting up to 3 seconds, and reports the outcome.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the lock
     */
    public void tryLockWithTimeout() throws InterruptedException {
        boolean acquired = lock.tryLock(3, TimeUnit.SECONDS);
        if (!acquired) {
            System.out.println("超时未获取到锁");
            return;
        }
        try {
            System.out.println("3秒内获取锁成功");
        } finally {
            lock.unlock();
        }
    }
}
死锁演示与解决
/**
 * Demonstrates a deadlock: two threads take the same two monitors in
 * opposite order, so each ends up holding one and waiting for the other.
 */
public class DeadlockDemo {
    private static final Object lockA = new Object();
    private static final Object lockB = new Object();

    /**
     * Starts the two threads. Thread-1 takes lockA then lockB; Thread-2
     * takes lockB then lockA.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Deadlock scenario: Thread-1 holds A and waits for B, Thread-2 holds B and waits for A.
        new Thread(() -> lockInOrder(lockA, lockB,
                "Thread-1 持有 lockA,等待 lockB", "Thread-1 获取 lockB"), "Thread-1").start();
        new Thread(() -> lockInOrder(lockB, lockA,
                "Thread-2 持有 lockB,等待 lockA", "Thread-2 获取 lockA"), "Thread-2").start();
    }

    /**
     * Takes {@code outer}, pauses 100 ms, then takes {@code inner} while
     * still holding {@code outer}.
     *
     * @param outer           monitor acquired first and held throughout
     * @param inner           monitor acquired second
     * @param holdingMessage  printed once {@code outer} is held
     * @param acquiredMessage printed once {@code inner} is also held
     */
    private static void lockInOrder(Object outer, Object inner,
            String holdingMessage, String acquiredMessage) {
        synchronized (outer) {
            System.out.println(holdingMessage);
            try {
                // Pause between the two acquisitions so the other thread has
                // time to take its own first lock.
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            synchronized (inner) {
                System.out.println(acquiredMessage);
            }
        }
    }
}
使用 ReentrantLock 解决死锁(tryLock 超时机制):
/**
 * Same opposite-order locking as the deadlock example, but with
 * {@link ReentrantLock#tryLock(long, TimeUnit)} so that a thread gives up
 * after a timeout instead of waiting forever.
 */
public class DeadlockSolution {
    private static final ReentrantLock lockA = new ReentrantLock();
    private static final ReentrantLock lockB = new ReentrantLock();

    /**
     * Starts two threads that take the two locks in opposite order.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Thread(() -> acquireBoth(lockA, lockB,
                "Thread-1 持有 lockA",
                "Thread-1 获取 lockB,执行业务",
                "Thread-1 获取 lockB 失败,放弃"), "Thread-1").start();
        new Thread(() -> acquireBoth(lockB, lockA,
                "Thread-2 持有 lockB",
                "Thread-2 获取 lockA,执行业务",
                "Thread-2 获取 lockA 失败,放弃"), "Thread-2").start();
    }

    /**
     * Tries to take {@code outer} and then {@code inner}, each with a
     * one-second timeout, releasing whatever was acquired before returning.
     *
     * @param outer          lock attempted first
     * @param inner          lock attempted second, while {@code outer} is held
     * @param holdingMessage printed once {@code outer} is held
     * @param successMessage printed once {@code inner} is also held
     * @param giveUpMessage  printed when {@code inner} is not obtained in time
     */
    private static void acquireBoth(ReentrantLock outer, ReentrantLock inner,
            String holdingMessage, String successMessage, String giveUpMessage) {
        try {
            if (!outer.tryLock(1, TimeUnit.SECONDS)) {
                return;
            }
            try {
                System.out.println(holdingMessage);
                Thread.sleep(100);
                if (!inner.tryLock(1, TimeUnit.SECONDS)) {
                    System.out.println(giveUpMessage);
                    return;
                }
                try {
                    System.out.println(successMessage);
                } finally {
                    inner.unlock();
                }
            } finally {
                outer.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
2.3 读写锁 ReentrantReadWriteLock
/**
 * Demonstrates {@link ReentrantReadWriteLock}: shared reads, exclusive
 * writes, and downgrading from the write lock to the read lock.
 */
public class ReadWriteLockDemo {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();   // shared lock
    private final Lock writeLock = rwLock.writeLock(); // exclusive lock
    private int value = 0;

    /**
     * Reads the value under the read lock; several threads may read at once.
     *
     * @return the current value
     */
    public int read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 正在读取: " + value);
            return value;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Stores a new value under the write lock; only one thread may write at a time.
     *
     * @param newValue value to store
     */
    public void write(int newValue) {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 正在写入: " + newValue);
            value = newValue;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Lock downgrading: writes under the write lock, takes the read lock
     * before giving the write lock up, then reads while holding only the
     * read lock.
     */
    public void writeThenRead() {
        writeLock.lock();
        try {
            value = 100; // write first
            // Take the read lock while the write lock is still held, so no
            // other writer can get in between the write and the read.
            readLock.lock();
        } finally {
            // Release the write lock; the read lock is still held.
            writeLock.unlock();
        }
        try {
            System.out.println("锁降级后读取: " + value);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Starts five reader threads and two writer threads against one instance.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ReadWriteLockDemo demo = new ReadWriteLockDemo();
        // Five threads reading concurrently.
        for (int i = 0; i < 5; i++) {
            new Thread(demo::read, "Reader-" + i).start();
        }
        // Two threads writing.
        for (int i = 0; i < 2; i++) {
            final int v = i * 10;
            new Thread(() -> demo.write(v), "Writer-" + i).start();
        }
    }
}
三、线程间通信
3.1 wait/notify
两个线程交替打印
/**
 * Two threads print the numbers 1 to 10 alternately, coordinating through
 * wait/notify on a shared monitor.
 */
public class AlternatePrint {
    private int num = 1;
    private final Object lock = new Object();

    /**
     * Starts the "Odd" and "Even" printer threads on one shared instance.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AlternatePrint demo = new AlternatePrint();
        new Thread(demo::printOdd, "Odd").start();
        new Thread(demo::printEven, "Even").start();
    }

    /** Prints the odd numbers up to 10, waiting whenever the next number is even. */
    public void printOdd() {
        printWithParity(1);
    }

    /** Prints the even numbers up to 10, waiting whenever the next number is odd. */
    public void printEven() {
        printWithParity(0);
    }

    /**
     * Prints every number up to 10 whose remainder modulo 2 equals
     * {@code parity}, handing the turn to the other thread after each one.
     * Returns early, with the interrupt flag restored, if the thread is
     * interrupted while waiting.
     *
     * @param parity 1 for odd numbers, 0 for even numbers
     */
    private void printWithParity(int parity) {
        synchronized (lock) {
            while (num <= 10) {
                if (num % 2 == parity) {
                    System.out.println(Thread.currentThread().getName() + ": " + num++);
                    lock.notify(); // wake the other thread
                    if (num > 10) {
                        return; // last number printed, nothing left to wait for
                    }
                }
                try {
                    lock.wait(); // release the monitor until the other thread notifies
                } catch (InterruptedException e) {
                    // Restoring the flag and looping again would make wait()
                    // throw immediately on every pass while still holding the
                    // monitor, so stop instead.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
生产者消费者(wait/notify 实现)
/**
 * Producer/consumer over a bounded buffer, coordinated with
 * wait/notifyAll on the buffer's monitor.
 */
public class ProducerConsumer {
    private final LinkedList<Integer> buffer = new LinkedList<>();
    private final int capacity = 5;

    /**
     * Starts one producer thread and one consumer thread on a shared instance.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();
        new Thread(pc::produce, "Producer").start();
        new Thread(pc::consume, "Consumer").start();
    }

    /**
     * Adds increasing integers to the buffer, one every 500 ms, waiting
     * while the buffer is full. Runs until the thread is interrupted.
     */
    public void produce() {
        int value = 0;
        while (true) {
            synchronized (buffer) {
                while (buffer.size() == capacity) { // buffer full: wait
                    try {
                        System.out.println("队列已满,生产者等待...");
                        buffer.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                buffer.add(value);
                System.out.println("生产: " + value++);
                buffer.notifyAll(); // wake the consumer
            }
            if (!pause(500)) {
                return;
            }
        }
    }

    /**
     * Removes and prints the oldest buffered integer, one every 800 ms,
     * waiting while the buffer is empty. Runs until the thread is interrupted.
     */
    public void consume() {
        while (true) {
            synchronized (buffer) {
                while (buffer.isEmpty()) { // buffer empty: wait
                    try {
                        System.out.println("队列已空,消费者等待...");
                        buffer.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                int value = buffer.removeFirst();
                System.out.println("消费: " + value);
                buffer.notifyAll(); // wake the producer
            }
            if (!pause(800)) {
                return;
            }
        }
    }

    /**
     * Sleeps for the given time outside the monitor.
     *
     * @param millis sleep duration in milliseconds
     * @return {@code true} if the sleep completed; {@code false} if the
     *         thread was interrupted, in which case the interrupt flag is restored
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // An interrupt during the pause is a stop request, same as one during wait().
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
3.2 Condition
基本使用(对应关系)
| Object 方法 | Condition 方法 | 说明 |
|---|---|---|
| wait() | await() | 释放锁并等待 |
| notify() | signal() | 唤醒一个线程 |
| notifyAll() | signalAll() | 唤醒所有线程 |
三个线程顺序输出 ABC(Condition 精准通知)
public class ConditionABC {
private final ReentrantLock lock = new ReentrantLock();
private final Condition conditionA = lock.newCondition();
private final Condition conditionB = lock.newCondition();
private final Condition conditionC = lock.newCondition();
private int flag = 1; // 1->A, 2->B, 3->C
public static void main(String[] args) {
ConditionABC demo = new ConditionABC();
new Thread(() -> demo.printA(), "Thread-A").start();
new Thread(() -> demo.printB(), "Thread-B").start();
new Thread(() -> demo.printC(), "Thread-C").start();
}
public void printA() {
lock.lock();
try {
for (int i = 0; i < 5; i++) {
while (flag != 1) { // 不是 A 的回合,等待
conditionA.await();
}
System.out.print("A");
flag = 2;
conditionB.signal(); // 精准唤醒 B
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
for (int i = 0; i < 5; i++) {
while (flag != 2) {
conditionB.await();
}
System.out.print("B");
flag = 3;
conditionC.signal(); // 精准唤醒 C
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
for (int i = 0; i < 5; i++) {
while (flag != 3) {
conditionC.await();
}
System.out.print("C");
flag = 1;
conditionA.signal(); // 精准唤醒 A
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
// 输出: ABCABCABCABCABC
Condition 实现生产者消费者(精准唤醒)
/**
 * Producer/consumer over a bounded buffer, coordinated with a
 * {@link ReentrantLock} and two {@link Condition}s so that producers and
 * consumers wait on separate queues.
 */
public class ProducerConsumerCondition {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final LinkedList<Integer> buffer = new LinkedList<>();
    private final int capacity = 5;

    /**
     * Starts one producer thread and one consumer thread on a shared instance.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ProducerConsumerCondition pc = new ProducerConsumerCondition();
        new Thread(pc::produce, "Producer").start();
        new Thread(pc::consume, "Consumer").start();
    }

    /**
     * Adds increasing integers to the buffer, one every 500 ms, waiting on
     * {@code notFull} while the buffer is full. Runs until the thread is interrupted.
     */
    public void produce() {
        int value = 0;
        while (true) {
            lock.lock();
            try {
                while (buffer.size() == capacity) {
                    System.out.println("队列已满,生产者等待...");
                    notFull.await(); // producer parks on the notFull queue
                }
                buffer.add(value);
                System.out.println("生产: " + value++);
                notEmpty.signal(); // wake a consumer only, not other producers
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
            if (!pause(500)) {
                return;
            }
        }
    }

    /**
     * Removes and prints the oldest buffered integer, one every 800 ms,
     * waiting on {@code notEmpty} while the buffer is empty. Runs until the
     * thread is interrupted.
     */
    public void consume() {
        while (true) {
            lock.lock();
            try {
                while (buffer.isEmpty()) {
                    System.out.println("队列已空,消费者等待...");
                    notEmpty.await(); // consumer parks on the notEmpty queue
                }
                int value = buffer.removeFirst();
                System.out.println("消费: " + value);
                notFull.signal(); // wake a producer only
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
            if (!pause(800)) {
                return;
            }
        }
    }

    /**
     * Sleeps for the given time without holding the lock.
     *
     * @param millis sleep duration in milliseconds
     * @return {@code true} if the sleep completed; {@code false} if the
     *         thread was interrupted, in which case the interrupt flag is restored
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // An interrupt during the pause is a stop request, same as one during await().
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
Condition 相比 wait/notify 的优势:
- 支持多个等待队列,可以精准唤醒指定类型的线程
- 生产者只唤醒消费者,消费者只唤醒生产者,避免无效唤醒