Java如何实现线程通信

发布时间:2022-05-26 15:48:39 作者:iii
来源:亿速云 阅读:96

Java如何实现线程通信

在多线程编程中,线程通信是一个非常重要的概念。线程通信指的是多个线程之间通过某种机制来交换信息或协调工作。Java提供了多种方式来实现线程通信,本文将介绍几种常见的方法。

1. 使用wait()notify()/notifyAll()

wait()notify()notifyAll()是Java中用于线程通信的基本方法。这些方法必须在同步块或同步方法中使用,因为它们依赖于对象的监视器锁。

1.1 wait()

wait()方法使当前线程进入等待状态,直到其他线程调用notify()notifyAll()方法唤醒它。调用wait()后,当前线程会释放对象的锁。

synchronized (obj) {
    while (conditionIsNotMet) {
        obj.wait();
    }
    // 执行操作
}

1.2 notify()notifyAll()

notify()方法唤醒在此对象监视器上等待的单个线程,而notifyAll()方法唤醒所有等待的线程。

synchronized (obj) {
    // 改变条件
    obj.notify();  // 或者 obj.notifyAll();
}

1.3 示例

class SharedResource {
    private boolean flag = false;

    public synchronized void produce() {
        while (flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        flag = true;
        System.out.println("Produced");
        notify();
    }

    public synchronized void consume() {
        while (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        flag = false;
        System.out.println("Consumed");
        notify();
    }
}

public class WaitNotifyExample {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.produce();
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.consume();
            }
        });

        producer.start();
        consumer.start();
    }
}

2. 使用BlockingQueue

BlockingQueue是Java并发包中的一个接口,它提供了线程安全的队列操作。BlockingQueue可以用于实现生产者-消费者模式,从而简化线程通信。

2.1 示例

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                queue.put(i);
                System.out.println("Produced: " + i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                int value = queue.take();
                System.out.println("Consumed: " + value);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));

        producer.start();
        consumer.start();
    }
}

3. 使用LockCondition

LockCondition是Java 5引入的并发工具,提供了比synchronized更灵活的线程通信机制。

3.1 示例

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SharedResource {
    private boolean flag = false;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void produce() {
        lock.lock();
        try {
            while (flag) {
                condition.await();
            }
            flag = true;
            System.out.println("Produced");
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void consume() {
        lock.lock();
        try {
            while (!flag) {
                condition.await();
            }
            flag = false;
            System.out.println("Consumed");
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class LockConditionExample {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.produce();
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.consume();
            }
        });

        producer.start();
        consumer.start();
    }
}

4. 使用Semaphore

Semaphore是一种计数信号量,用于控制对共享资源的访问。它可以用于实现线程间的通信和同步。

4.1 示例

import java.util.concurrent.Semaphore;

class SharedResource {
    private Semaphore semaphore = new Semaphore(1);

    public void produce() {
        try {
            semaphore.acquire();
            System.out.println("Produced");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }

    public void consume() {
        try {
            semaphore.acquire();
            System.out.println("Consumed");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }
}

public class SemaphoreExample {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.produce();
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                resource.consume();
            }
        });

        producer.start();
        consumer.start();
    }
}

5. 使用CountDownLatch

CountDownLatch是一种同步工具,允许一个或多个线程等待其他线程完成操作。

5.1 示例

import java.util.concurrent.CountDownLatch;

class Worker implements Runnable {
    private CountDownLatch latch;

    public Worker(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("Worker is working");
        latch.countDown();
    }
}

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        Thread worker1 = new Thread(new Worker(latch));
        Thread worker2 = new Thread(new Worker(latch));
        Thread worker3 = new Thread(new Worker(latch));

        worker1.start();
        worker2.start();
        worker3.start();

        latch.await();
        System.out.println("All workers have finished");
    }
}

6. 使用CyclicBarrier

CyclicBarrier是一种同步工具,允许一组线程互相等待,直到所有线程都到达某个屏障点。

6.1 示例

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Task implements Runnable {
    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println("Task is running");
            barrier.await();
            System.out.println("Task has finished");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

public class CyclicBarrierExample {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("All tasks have reached the barrier");
        });

        Thread task1 = new Thread(new Task(barrier));
        Thread task2 = new Thread(new Task(barrier));
        Thread task3 = new Thread(new Task(barrier));

        task1.start();
        task2.start();
        task3.start();
    }
}

7. 使用Exchanger

Exchanger是一种同步工具,允许两个线程在某个点交换数据。

7.1 示例

import java.util.concurrent.Exchanger;

class Producer implements Runnable {
    private Exchanger<String> exchanger;

    public Producer(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            String data = "Produced Data";
            System.out.println("Producer is sending: " + data);
            String receivedData = exchanger.exchange(data);
            System.out.println("Producer received: " + receivedData);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private Exchanger<String> exchanger;

    public Consumer(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            String data = "Consumed Data";
            System.out.println("Consumer is sending: " + data);
            String receivedData = exchanger.exchange(data);
            System.out.println("Consumer received: " + receivedData);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ExchangerExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread producer = new Thread(new Producer(exchanger));
        Thread consumer = new Thread(new Consumer(exchanger));

        producer.start();
        consumer.start();
    }
}

8. 使用Phaser

Phaser是一种灵活的同步工具,允许线程分阶段地执行任务。

8.1 示例

import java.util.concurrent.Phaser;

class Task implements Runnable {
    private Phaser phaser;

    public Task(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println("Task is running");
        phaser.arriveAndAwaitAdvance();
        System.out.println("Task has finished");
    }
}

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        Thread task1 = new Thread(new Task(phaser));
        Thread task2 = new Thread(new Task(phaser));
        Thread task3 = new Thread(new Task(phaser));

        task1.start();
        task2.start();
        task3.start();
    }
}

9. 使用FutureCallable

FutureCallable是Java中用于异步计算的工具。Callable可以返回一个结果,而Future用于获取该结果。

9.1 示例

import java.util.concurrent.*;

class Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return "Task Result";
    }
}

public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new Task());

        System.out.println("Waiting for result...");
        String result = future.get();
        System.out.println("Result: " + result);

        executor.shutdown();
    }
}

10. 使用CompletableFuture

CompletableFuture是Java 8引入的异步编程工具,提供了更强大的功能和更灵活的线程通信机制。

10.1 示例

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task Result";
        });

        System.out.println("Waiting for result...");
        String result = future.get();
        System.out.println("Result: " + result);
    }
}

总结

Java提供了多种方式来实现线程通信,每种方式都有其适用的场景。wait()notify()是最基本的线程通信机制,适用于简单的同步场景。BlockingQueueLockConditionSemaphoreCountDownLatchCyclicBarrierExchangerPhaserFutureCallableCompletableFuture等工具则提供了更高级的线程通信和同步功能,适用于更复杂的并发场景。

在实际开发中,选择合适的线程通信机制可以大大提高程序的性能和可维护性。

推荐阅读:
  1. java代码实现线程通信
  2. Java多线程-线程通信

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java

上一篇:Android进入Activity时怎么禁止弹出软键盘输入法

下一篇:.Net行为型设计模式之备忘录模式怎么实现

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》