怎么连接JAVA高并发的线程和线程池

发布时间:2021-10-29 16:25:02 作者:iii
来源:亿速云 阅读:133

这篇文章主要讲解了“怎么连接JAVA高并发的线程和线程池”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么连接JAVA高并发的线程和线程池”吧!

1 JAVA线程的实现原理

怎么连接JAVA高并发的线程和线程池

2 JAVA线程的生命周期

怎么连接JAVA高并发的线程和线程池
怎么连接JAVA高并发的线程和线程池

3 JAVA线程的几种常用方法

「线程启动函数」

//Thread.java //调用start启动线程,进入Runnable状态,等待系统调度执行 public synchronized void start(){//synchronized同步执行     if (threadStatus != 0) //0 代表new状态,非0则抛出错误             throw new IllegalThreadStateException();     ...     start0(); //本地方法方法 private native void start0()     ... } //Running状态,新线程执行的代码方法,可被子类重写 public void run() {     if (target != null) {         //target是Runnable,new Thread(Runnable)时传入         target.run();      } }

「线程终止函数」

//Thread.java @Deprecated public final void stop(); //中断线程 public void interrupt() //判断的是当前线程是否处于中断状态 public static boolean interrupted()
//Thread.java //阻塞等待其他线程 public final synchronized void join(final long millis) //暂时让出CPU执行 public static native void yield(); //休眠一段时间 public static native void sleep(long millis) throws InterruptedException;

start与run方法的区别

4  线程池及其优点

5 JDK封装的线程池

怎么连接JAVA高并发的线程和线程池

//ThreadPoolExecutor.java public ThreadPoolExecutor(     int corePoolSize,      int maximumPoolSize,     long keepAliveTime,     TimeUnit unit,     BlockingQueue<Runnable> workQueue,     ThreadFactory threadFactory,     RejectedExecutionHandler handler)

6 线程池原理之执行流程

怎么连接JAVA高并发的线程和线程池

//ThreadPoolExecutor.java public void execute(Runnable command) {     ...     if (workerCountOf(c) < corePoolSize) { //plan A         if (addWorker(command, true))               return;         c = ctl.get();     }     if (isRunning(c) && workQueue.offer(command)) { //plan B         int recheck = ctl.get();         if (! isRunning(recheck) && remove(command))             reject(command);         else if (workerCountOf(recheck) == 0)             addWorker(null, false);     }     //addWorker(command, false) false代表可创建非核心线程执行任务     else if (!addWorker(command, false)) //plan C         reject(command);    // //plan D }

plan A:任务的execute,先判断核心线程数量达到上限;否,则创建核心线程来执行任务;是,则执行plan B

plan B:当任务数大于核心数时,任务被加入阻塞队列,如果超过阻塞队列的容量上限,执行C

plan C: 阻塞队列不能接受任务时,且设置的maximumPoolSize大于corePoolSize,创建新的非核心线程执行任务

plan D:当plan A、B、C都无能为力时,使用拒绝策略处理

7 阻塞队列的简单了解

操作方法抛出异常阻塞线程返回特殊值超时退出
插入元素add(e)put(e)offer(e)offer(e, timeout, unit)
移除元素remove()take()poll()pull(timeout, unit)
检查element()peek()

「ArrayBlockingQueue」

「LinkedBlockingQueue」

「PriorityBlockingQueue」

「DelayQueue」

「SynchronousQueue」

「LinkedTransferQueue」

「LinkedBlockingDeque」

8 Executors的四种线程池浅析

「newFixedThreadPool」

指定核心线程数,队列是LinkedBlockingQueue无界阻塞队列,永远不可能拒绝任务;适合用在稳定且固定的并发场景,建议线程设置为CPU核数

//Executors.java public static ExecutorService newFixedThreadPool(int nThreads) {     return new ThreadPoolExecutor(nThreads, nThreads,                         0L, TimeUnit.MILLISECONDS,                         new LinkedBlockingQueue<Runnable>()); }

「newCachedThreadPool」

核心池大小为0,线程池最大线程数为最大整型,任务提交先加入到阻塞队列中,非核心线程60s没任务执行则销毁,阻塞队列为SynchronousQueue。newCachedThreadPool会不断的创建新线程来执行任务,不建议用

//Executors.java public static ExecutorService newCachedThreadPool() {     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                   60L, TimeUnit.SECONDS,                   new SynchronousQueue<Runnable>()); }

「newScheduledThreadPool」

ScheduledThreadPoolExecutor(STPE)其实是ThreadPoolExecutor的子类,可指定核心线程数,队列是STPE的内部类DelayedWorkQueue。「STPE的好处是  A 延时可执行任务,B 可执行带有返回值的任务」

//Executors.java public ScheduledThreadPoolExecutor(int corePoolSize,                                    ThreadFactory threadFactory) {     super(corePoolSize, Integer.MAX_VALUE,           DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,           new DelayedWorkQueue(), threadFactory); } //指定延迟执行时间     public <V> ScheduledFuture<V>  schedule(Callable<V> callable, long delay, TimeUnit unit)

「newSingleThreadExecutor」

和newFixedThreadPool构造方法一致,不过线程数被设置为1了。SingleThreadExecutor比new个线程的好处是;「线程运行时抛出异常的时候会有新的线程加入线程池完成接下来的任务;阻塞队列可以保证任务按FIFO执行」

//Executors.java public static ExecutorService newSingleThreadExecutor() {     return new FinalizableDelegatedExecutorService         (new ThreadPoolExecutor(1, 1,                   0L, TimeUnit.MILLISECONDS,                   new LinkedBlockingQueue<Runnable>())); //无界队列 }

9  如果优雅地关闭线程池线程池

public List<Runnable> shutdownNow() {     ...     final ReentrantLock mainLock = this.mainLock;     mainLock.lock(); //加锁     try {         checkShutdownAccess();         advanceRunState(STOP);         interruptWorkers(); //interrupt关闭线程         tasks = drainQueue(); //未执行任务     ...

10 线程池为什么使用的是阻塞队列

先考虑下为啥线程池的线程不会被释放,它是怎么管理线程的生命周期的呢

//ThreadPoolExecutor.Worker.class final void runWorker(Worker w) {     ...     //工作线程会进入一个循环获取任务执行的逻辑     while (task != null || (task = getTask()) != null)     ... }  private Runnable getTask(){     ...     Runnable r = timed ?          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)          : workQueue.take(); //线程会阻塞挂起等待任务,     ...     }

可以看出,无任务执行时,线程池其实是利用阻塞队列的take方法挂起,从而维持核心线程的存活

11 线程池的worker继承AQS的意义

//Worker class,一个worker一个线程 Worker(Runnable firstTask) {     //禁止新线程未开始就被中断     setState(-1); // inhibit interrupts until runWorker     this.firstTask = firstTask;     this.thread = getThreadFactory().newThread(this); }  final void runWorker(Worker w) {     ....     //对应构造Worker是的setState(-1)     w.unlock(); // allow interrupts     boolean completedAbruptly = true;         ....         w.lock(); //加锁同步         ....         try {             ...             task.run();             afterExecute(task, null);         } finally {             ....             w.unlock(); //释放锁         }

worker继承AQS的意义:A 禁止线程未开始就被中断;B 同步runWorker方法的处理逻辑

12 拒绝策略

❝A handler for rejected tasks that runs the rejected task directly in the  calling thread of the {@code execute} method, unless the executor has been shut  down, in which case the task is discarded.❞

如果任务被拒绝了,则由「提交任务的线程」执行此任务

13 ForkJoinPool了解一波

怎么连接JAVA高并发的线程和线程池
怎么连接JAVA高并发的线程和线程池
怎么连接JAVA高并发的线程和线程池

work-stealing工作窃取算法

怎么连接JAVA高并发的线程和线程池

当线程执行完自己deque的任务,且其他线程deque还有多的任务,则会启动窃取策略,从其他线程deque队尾获取线程

public class ForkJoinPoolTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         ForkJoinPool forkJoinPool = new ForkJoinPool();         for (int i = 0; i < 10; i++) {             ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i));             System.out.println(task.get());         }     }     static class Fibonacci extends RecursiveTask<Integer> {         int n;         public Fibonacci(int n) {  this.n = n;  }         @Override         protected Integer compute() {             if (n <= 1) { return n; }             Fibonacci fib1 = new Fibonacci(n - 1);             fib1.fork(); //相当于开启新线程执行             Fibonacci fib2 = new Fibonacci(n - 2);             fib2.fork(); //相当于开启新线程执行             return fib1.join() + fib2.join(); //合并阻塞返回结果         }     } }

感谢各位的阅读,以上就是“怎么连接JAVA高并发的线程和线程池”的内容了,经过本文的学习后,相信大家对怎么连接JAVA高并发的线程和线程池这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

推荐阅读:
  1. Java的线程池
  2. java高并发中进程和线程是什么

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java

上一篇:如何解析ASP.NET Cache

下一篇:Mysql数据分组排名实现的示例分析

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》