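Thread communication is a central concept in multithreaded programming: it covers the mechanisms by which threads exchange information or coordinate their work. Java offers several ways to implement it, and this article walks through the most common ones.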
wait() and notify()/notifyAll()
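wait(), notify(), and notifyAll() are the basic thread-communication methods in Java, defined on java.lang.Object. They must be called from a synchronized block or synchronized method that holds the monitor of the object they are called on; otherwise they throw IllegalMonitorStateException.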
wait()
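wait() puts the current thread into a waiting state until another thread calls notify() or notifyAll() on the same object. While waiting, the thread releases the object's lock, and it reacquires the lock before wait() returns. Because spurious wakeups are possible, wait() should always be called in a loop that rechecks the condition.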
synchronized (obj) {
    while (conditionIsNotMet) {
        obj.wait();
    }
    // perform the action
}
notify() and notifyAll()
notify() wakes a single thread waiting on this object's monitor (which one is not specified), while notifyAll() wakes every thread waiting on it.
synchronized (obj) {
    // change the condition
    obj.notify(); // or obj.notifyAll();
}
class SharedResource {
    private boolean flag = false;

    public synchronized void produce() {
        while (flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        flag = true;
        System.out.println("Produced");
        notify();
    }

    public synchronized void consume() {
        while (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        flag = false;
        System.out.println("Consumed");
        notify();
    }
}

public class WaitNotifyExample {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.produce();
            }
        });
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.consume();
            }
        });
        producer.start();
        consumer.start();
    }
}
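The example above calls notify(), which is enough when there is exactly one producer and one consumer. With several threads on each side, a single notify() may wake a thread whose condition is still false, and the program can stall. The following is a minimal sketch of the same idea using notifyAll(); the Mailbox and NotifyAllExample classes and the run() helper are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class NotifyAllExample {
    // Runs 2 producers and 2 consumers; returns the sum of all consumed values.
    static int run(int itemsPerThread) {
        Mailbox box = new Mailbox();
        AtomicInteger sum = new AtomicInteger();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < 4; t++) {
            boolean producer = t < 2;
            threads[t] = new Thread(() -> {
                try {
                    for (int i = 1; i <= itemsPerThread; i++) {
                        if (producer) {
                            box.put(i);
                        } else {
                            sum.addAndGet(box.take());
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[t].start();
        }
        try {
            for (Thread th : threads) {
                th.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println("Sum consumed: " + run(100));
    }
}

// Hypothetical single-slot mailbox shared by several producers and consumers.
class Mailbox {
    private Integer slot; // null means the slot is empty

    synchronized void put(int value) throws InterruptedException {
        while (slot != null) {
            wait(); // slot still full: release the monitor and wait
        }
        slot = value;
        notifyAll(); // wake every waiter; each one rechecks its own condition
    }

    synchronized int take() throws InterruptedException {
        while (slot == null) {
            wait(); // slot still empty
        }
        int value = slot;
        slot = null;
        notifyAll();
        return value;
    }
}
```

notifyAll() costs some extra wakeups, but every waiter rechecks its condition inside the while loop, so correctness does not depend on which thread happens to be woken.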
BlockingQueue
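BlockingQueue is an interface in java.util.concurrent that provides thread-safe queue operations. put() blocks while the queue is full and take() blocks while it is empty, which makes it a natural fit for the producer-consumer pattern and removes the need for explicit wait/notify code.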
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                queue.put(i);
                System.out.println("Produced: " + i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                int value = queue.take();
                System.out.println("Consumed: " + value);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));
        producer.start();
        consumer.start();
    }
}
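In the example above the consumer knows in advance that exactly ten items will arrive. When the count is not known, one common approach is a "poison pill": the producer puts a sentinel value on the queue to signal the end of the stream. The sketch below assumes that -1 is never a valid item; POISON_PILL, PoisonPillExample, and run() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillExample {
    // Hypothetical sentinel telling the consumer that no more items will arrive.
    static final int POISON_PILL = -1;

    static List<Integer> run(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = queue.take(); // blocks while the queue is empty
                    if (value == POISON_PILL) {
                        break;
                    }
                    consumed.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With several consumers, one sentinel per consumer would be needed; this sketch covers only the single-consumer case.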
Lock and Condition
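Lock and Condition are concurrency utilities introduced in Java 5 that offer a more flexible thread-communication mechanism than synchronized. A single Lock can have several Condition objects, so different groups of waiting threads can be signalled separately.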
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SharedResource {
    private boolean flag = false;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void produce() {
        lock.lock();
        try {
            while (flag) {
                condition.await();
            }
            flag = true;
            System.out.println("Produced");
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void consume() {
        lock.lock();
        try {
            while (!flag) {
                condition.await();
            }
            flag = false;
            System.out.println("Consumed");
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class LockConditionExample {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.produce();
            }
        });
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.consume();
            }
        });
        producer.start();
        consumer.start();
    }
}
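The example above shares one Condition between both sides. The extra flexibility of Lock shows more clearly when each kind of waiter gets its own Condition, so that a producer only ever wakes a consumer and vice versa. The following is a rough sketch of a bounded buffer built that way; BoundedBuffer, TwoConditionExample, notFull, and notEmpty are hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TwoConditionExample {
    // Produces 1..count on one thread, consumes on the caller; returns the sum.
    static int run(int count) {
        BoundedBuffer buffer = new BoundedBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(run(10)); // prints 55
    }
}

// Hypothetical bounded buffer with one Condition per kind of waiter.
class BoundedBuffer {
    private final Deque<Integer> items = new ArrayDeque<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(int value) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(value);
            notEmpty.signal(); // wake one consumer only
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            int value = items.removeFirst();
            notFull.signal(); // wake one producer only
            return value;
        } finally {
            lock.unlock();
        }
    }
}
```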
Semaphore
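Semaphore is a counting semaphore that controls access to a shared resource by handing out a fixed number of permits. It can be used both for synchronization and for signalling between threads. In the example here, a single permit makes the semaphore act as a mutual-exclusion lock; it does not enforce any ordering between produce() and consume().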
import java.util.concurrent.Semaphore;

class SharedResource {
    private Semaphore semaphore = new Semaphore(1);

    public void produce() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // no permit was acquired, so none may be released
        }
        try {
            System.out.println("Produced");
        } finally {
            semaphore.release();
        }
    }

    public void consume() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // no permit was acquired, so none may be released
        }
        try {
            System.out.println("Consumed");
        } finally {
            semaphore.release();
        }
    }
}

public class SemaphoreExample {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.produce();
            }
        });
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.consume();
            }
        });
        producer.start();
        consumer.start();
    }
}
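To use semaphores for actual signalling rather than mutual exclusion, one possible approach is a pair of semaphores that hand a turn back and forth. The sketch below assumes strict producer-then-consumer alternation is what is wanted; SemaphoreHandoffExample, canProduce, and canConsume are hypothetical names.

```java
import java.util.concurrent.Semaphore;

class SemaphoreHandoffExample {
    // Returns a log such as "PCPC": P for each produce step, C for each consume step.
    static String run(int rounds) {
        Semaphore canProduce = new Semaphore(1); // the producer may go first
        Semaphore canConsume = new Semaphore(0); // nothing to consume yet
        StringBuffer log = new StringBuffer();   // thread-safe appends

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    canProduce.acquire(); // wait for the producer's turn
                    log.append('P');
                    canConsume.release(); // hand the turn to the consumer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    canConsume.acquire(); // wait until something was produced
                    log.append('C');
                    canProduce.release(); // hand the turn back
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints PCPCPC
    }
}
```

Here a permit is released by a different thread from the one that acquired it. Semaphore allows this, and it is what turns the permit into a signal.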
CountDownLatch
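CountDownLatch is a synchronization aid that lets one or more threads wait until a set of operations in other threads has completed. The count cannot be reset: once it reaches zero, every later call to await() returns immediately.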
import java.util.concurrent.CountDownLatch;

class Worker implements Runnable {
    private CountDownLatch latch;

    public Worker(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("Worker is working");
        latch.countDown();
    }
}

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        Thread worker1 = new Thread(new Worker(latch));
        Thread worker2 = new Thread(new Worker(latch));
        Thread worker3 = new Thread(new Worker(latch));
        worker1.start();
        worker2.start();
        worker3.start();
        latch.await();
        System.out.println("All workers have finished");
    }
}
CyclicBarrier
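CyclicBarrier is a synchronization aid that lets a group of threads wait for one another until all of them have reached a common barrier point. Unlike CountDownLatch, the barrier can be reused after the waiting threads are released.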
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Task implements Runnable {
    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println("Task is running");
            barrier.await();
            System.out.println("Task has finished");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

public class CyclicBarrierExample {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("All tasks have reached the barrier");
        });
        Thread task1 = new Thread(new Task(barrier));
        Thread task2 = new Thread(new Task(barrier));
        Thread task3 = new Thread(new Task(barrier));
        task1.start();
        task2.start();
        task3.start();
    }
}
Exchanger
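Exchanger is a synchronization aid that lets two threads swap data at a rendezvous point: each thread passes an object to exchange() and receives the other thread's object in return.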
import java.util.concurrent.Exchanger;

class Producer implements Runnable {
    private Exchanger<String> exchanger;

    public Producer(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            String data = "Produced Data";
            System.out.println("Producer is sending: " + data);
            String receivedData = exchanger.exchange(data);
            System.out.println("Producer received: " + receivedData);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private Exchanger<String> exchanger;

    public Consumer(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            String data = "Consumed Data";
            System.out.println("Consumer is sending: " + data);
            String receivedData = exchanger.exchange(data);
            System.out.println("Consumer received: " + receivedData);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ExchangerExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        Thread producer = new Thread(new Producer(exchanger));
        Thread consumer = new Thread(new Consumer(exchanger));
        producer.start();
        consumer.start();
    }
}
Phaser
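Phaser, introduced in Java 7, is a flexible synchronization aid that lets threads execute a task in phases. Unlike CyclicBarrier, the number of registered parties can change over time.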
import java.util.concurrent.Phaser;

class Task implements Runnable {
    private Phaser phaser;

    public Task(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println("Task is running");
        phaser.arriveAndAwaitAdvance();
        System.out.println("Task has finished");
    }
}

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);
        Thread task1 = new Thread(new Task(phaser));
        Thread task2 = new Thread(new Task(phaser));
        Thread task3 = new Thread(new Task(phaser));
        task1.start();
        task2.start();
        task3.start();
    }
}
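The example above passes through a single phase, which a CyclicBarrier could do as well. As a rough illustration of working in phases, the sketch below has each worker pass through several phases in lockstep and record the phase number it is in; MultiPhaseExample and run() are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

class MultiPhaseExample {
    // Each worker logs the current phase, then waits for the others to catch up.
    static List<String> run(int workers, int phases) {
        Phaser phaser = new Phaser(workers); // register all workers up front
        List<String> log = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                for (int p = 0; p < phases; p++) {
                    log.add("phase " + phaser.getPhase());
                    // Blocks until every registered party has arrived,
                    // then the phase number advances by one.
                    phaser.arriveAndAwaitAdvance();
                }
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2));
    }
}
```

No worker can log "phase 1" before all workers have logged "phase 0", because the phase only advances once every party has arrived.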
Future and Callable
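Future and Callable are Java's tools for asynchronous computation. A Callable is a task that returns a result, and a Future is the handle used to retrieve that result; get() blocks until the result is available.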
import java.util.concurrent.*;

class Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return "Task Result";
    }
}

public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(new Task());
            System.out.println("Waiting for result...");
            String result = future.get(); // blocks until the task completes
            System.out.println("Result: " + result);
        } finally {
            executor.shutdown(); // always release the worker thread
        }
    }
}
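Since get() blocks indefinitely, it is often safer to bound the wait. The following sketch uses the get(timeout, unit) overload and cancels the task if it takes too long; FutureTimeoutExample, run(), and the returned status strings are hypothetical and only serve the illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutExample {
    // Runs a task that sleeps for taskMillis and waits at most timeoutMillis for it.
    static String run(long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(taskMillis);
                return "Task Result";
            });
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the still-running task
                return "timed out";
            } catch (ExecutionException e) {
                return "failed: " + e.getCause(); // the task itself threw
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10, 2000));
        System.out.println(run(2000, 50));
    }
}
```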
CompletableFuture
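CompletableFuture is an asynchronous programming tool introduced in Java 8. It extends the Future model with callbacks, chaining, and combination of results, which gives a more flexible way for threads to hand results to one another without blocking.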
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task Result";
        });
        System.out.println("Waiting for result...");
        String result = future.get();
        System.out.println("Result: " + result);
    }
}
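The example above still blocks on get(), much like a plain Future. What CompletableFuture adds is composition. The sketch below combines two independent asynchronous results and transforms the outcome; the values, the fallback of -1, and the names CompletableFutureChainExample, price, and tax are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureChainExample {
    static int run() {
        // Two independent asynchronous computations (placeholder values).
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> 2);

        CompletableFuture<Integer> total = price
                .thenCombine(tax, Integer::sum) // runs once both results are ready
                .thenApply(sum -> sum * 10)     // transforms the combined result
                .exceptionally(ex -> -1);       // fallback if any stage failed

        return total.join(); // like get(), but throws only unchecked exceptions
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 420
    }
}
```

Each stage runs when its inputs complete, so no thread has to wait on an intermediate result; only the final join() blocks.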
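Java provides many ways to implement thread communication, each suited to particular scenarios. wait() and notify() are the most basic mechanism and fit simple synchronization cases. BlockingQueue, Lock and Condition, Semaphore, CountDownLatch, CyclicBarrier, Exchanger, Phaser, Future and Callable, and CompletableFuture provide higher-level communication and synchronization for more complex concurrent scenarios.
In practice, choosing a mechanism that fits the problem improves both the performance and the maintainability of a program.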