Java并发编程中线程池工作原理的示例分析

发布时间:2021-09-18 11:20:31 作者:柒染
来源:亿速云 阅读:132

Java并发编程中线程池工作原理的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

线程池继承关系

Java并发编程中线程池工作原理的示例分析

ThreadPoolExecutor实现的顶层接口是Executor,在接口Executor中用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器Executor中,由Executor框架完成线程的调配和任务的执行部分。

ExecutorService接口增加了一些能力

AbstractExecutorService则是上层的抽象类

将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可

ThreadPoolExecutor实现最复杂的运行部分:

可以自动创建、管理和复用指定数量的一组线程,适用方只需提交任务即可线程安全,ThreadPoolExecutor内部有状态、核心线程数、非核心线程等属性,广泛使用了CAS和AQS锁机制避免并发带来的冲突问题

提供了核心线程、缓冲阻塞队列、非核心线程、抛弃策略的概念,可以根据实际应用场景进行组合使用

提供了beforeExecute 和afterExecute()可以支持对线程池的功能进行扩展

线程池的优点

降低线程创建和销毁线程造成的开销

构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}


成员变量

/**
 * 任务阻塞队列 
 */
private final BlockingQueue<Runnable> workQueue; 
/**
 * 非公平的互斥锁(可重入锁)
 */
private final ReentrantLock mainLock = new ReentrantLock();
/**
 * 线程集合一个Worker对应一个线程,没有核心线程的说话,只有核心线程数
 */
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
 * 配合mainLock通过Condition能够更加精细的控制多线程的休眠与唤醒
 */
private final Condition termination = mainLock.newCondition();
/**
 * 线程池中线程数量曾经达到过的最大值。
 */
private int largestPoolSize;  
/**
 * 已完成任务数量
 */
private long completedTaskCount;
/**
 * ThreadFactory对象,用于创建线程。
 */
private volatile ThreadFactory threadFactory;  
/**
 * 拒绝策略的处理句柄
 * 现在默认提供了CallerRunsPolicy、AbortPolicy、DiscardOldestPolicy、DiscardPolicy
 */
private volatile RejectedExecutionHandler handler;
/**
 * 线程池维护线程(超过核心线程数)所允许的空闲时间
 */
private volatile long keepAliveTime;
/**
 * 允许线程池中的核心线程超时进行销毁
 */
private volatile boolean allowCoreThreadTimeOut;  
/**
 * 线程池维护线程的最小数量,哪怕是空闲的  
 */
private volatile int corePoolSize;
/**
 * 线程池维护的最大线程数量,线程数超过这个数量之后新提交的任务就需要进入阻塞队列
 */
private volatile int maximumPoolSize;

创建线程池

缓存程线程池(不会存放队列,一直创建线程)

核心线程数为0,总线程数量阈值为Integer.MAX_VALUE,即可以创建无限的非核心线程

newCachedThreadPool是一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。

会出下面大量的线程对象,导致的OOM

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                  60L, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>());
}

执行流程

Java并发编程中线程池工作原理的示例分析


Java并发编程中线程池工作原理的示例分析

使用场景

执行大量短生命周期任务因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务


单线程线程池(只会运行一个线程,否则一直会堆积到阻塞队列)

它适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景,SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,使用无界队列LinkedBlockingQueue作为线程池的工作队列

newSingleThreadExecutor 创建是一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它此线程池保证所有任务的执行顺序按照任务的提交顺序执行

使用场景:

**适用于串行执行任务场景**

会存在出现阻塞队列堆积过大,导致的OOM

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}

固定大小线程池(会运行指定数量的线程,否则一直会堆积到阻塞队列)

 corePoolSize等于maximumPoolSize,所以线程池中只有核心线程,使用无界阻塞队列LinkedBlockingQueue作为工作队列

使用场景

适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务


newFixedThreadPool创建固定大小的线程池每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。

会存在出现阻塞队列堆积大,导致的OOM

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>(),
                  threadFactory);
}

定时任务线程池(任务队列与最大值均为无限大小,一直堆积到阻塞队列)

newScheduledThreadPool 创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。

线程总数阈值为Integer.MAX_VALUE,工作队列使用DelayedWorkQueue,非核心线程存活时间为0,所以线程池仅仅包含固定数目的核心线程。

会存在出现阻塞队列堆积过大,导致的OOM

public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                   ThreadFactory threadFactory) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue(), threadFactory);
}

可以看出来上面的方法一共使用了DelayedWorkQueueLinkedBlockingQueueSynchronousQueue。这个就是线程核心之一的阻塞队列。

两种方式提交任务:

任务阻塞队列

它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列;

SynchronousQueue

直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。

一个不存储元素的阻塞队列,每个插入操作,必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态


SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;


ArrayBlockingQueue

有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,如下所示:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。


在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。

LinkedBlockingQueue

无界的任务队列:无界任务队列可以使用LinkedBlockingQueue实现,如下所示:
 new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。 Java并发编程中线程池工作原理的示例分析


Java并发编程中线程池工作原理的示例分析

PriorityBlockingQueue

优先任务队列:优先任务队列通过PriorityBlockingQueue实现,使用平衡二叉树堆,实现的具有优先级的无界阻塞队列

任务会按优先级重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。

PriorityBlockingQueue其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行

其实LinkedBlockingQueue也是可以设置界限的,它默认的界限是Integer.MAX_VALUE。同时也支持也支持构造的时候设置队列大小

DelayQueue

无界阻塞延迟队列,队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最块要过期的元素。

拒绝策略

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

当Executor已经关闭,即执行了executorService.shutdown()方法后,或者Executor将有限边界用于最大线程和工作队列容量,且已经饱和时。使用方法execute()提交的新任务将被拒绝. 在以上述情况下,execute方法将调用其RejectedExecutionHandler的rejectExecution()方法

RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)

AbortPolicy(默认的拒绝策略)

也称为终止策略,遭到拒绝将抛出运行时RejectedExecutionException。业务方能通过捕获异常及时得到对本次任务提交的结果反馈。

public static class AbortPolicy implements RejectedExecutionHandler {
  public AbortPolicy() { }
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
  }
}

CallerRunsPolicy

拥有自主反馈控制,让提交者执行提交任务,能够减缓新任务的提交速度。这种情况是需要让所有的任务都执行完毕。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

DiscardPolicy

拒绝任务的处理程序,静默丢弃任务。使用此策略,我们可能无法感知系统的异常状态。慎用~!

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

丢弃队列中最前面的任务,然后重新提交被拒绝的任务。是否要使用此策略需要看业务是否需要新老的替换,慎用~!(LRU)

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

运作执行流程

  1. 判断线程池中核心线程数是否已达阈值corePoolSize,若否,则创建一个新核心线程执行任务

  2. 若核心线程数已达阈值corePoolSize,判断阻塞队列workQueue是否已满,若未满,则将新任务添加进阻塞队列

  3. 若满,再判断,线程池中线程数是否达到阈值maximumPoolSize,若否,则新建一个非核心线程执行任务。若达到阈值,则执行线程池饱和策略。

线程池饱和策略分为一下几种:


Java并发编程中线程池工作原理的示例分析

合理配置线程池大小

 要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:


根据任务所需要的cpu和io资源的量可以分为,

为了合理最大限度的使用系统资源同时也要保证的程序的高性能,可以给CPU密集型任务和IO密集型任务配置一些线程数。

看完上述内容,你们掌握Java并发编程中线程池工作原理的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

推荐阅读:
  1. Memcached工作原理的示例分析
  2. SpringMVC中工作原理的示例分析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java

上一篇:SEO优化需要注意些什么

下一篇:.NET5部署程序在Docker上运行

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》