原创

如何正确使用线程池

温馨提示:
本文最后更新于 2024年03月16日,已超过 398 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

线程池是什么

线程池是一种管理和调度线程的资源池,它提供了一种异步执行并发任务的方式,并且可以在多任务之间合理分配和管理系统资源。当池内线程执行完任务,并不会立刻销毁,而是在池内存活一段时间,待有任务时可直接复用池内等待中线程,从而有效地较少了线程创建和销毁的系统开销

优点

  • 降低资源消耗:线程的创建和销毁对系统资源的消耗较大,这些时间可能比系统用来处理业务的时间还长,频繁的创建和销毁线程,可能会导致系统资源不足。线程池通过重用已存在的线程来减少这种开销
  • 提高响应速度:当有任务到达时,我们可以直接使用线程池内已存在的线程去执行该任务,而不需要耗时等待创建新线程,有效提高了系统的响应速度
  • 提高线程的可管理性:线程池将线程管理和执行任务进行分离,我们可以更方便地对线程进行统一的分配、调优和监控,避免无限制地创建线程导致的资源耗尽和系统不稳定问题
  • 提供高级功能:线程池通常提供了一些高级功能,如定时执行、定期执行以及控制线程数量等,这些功能使得线程池更加强大易用

核心参数

线程池共有7个核心参数,如下:

  • corePoolSize:线程池中最小的可运行线程数量。当提交任务时,如果当前线程数小于核心线程数,那么线程池会创建一个新的线程来处理任务,即使有线程空闲。这个参数决定了线程池中基本线程的数量
  • maximumPoolSize:线程池内允许存在的最大线程数量,当任务数量超过核心线程数且等待队列已满时,线程池会创建新的线程来处理任务,直到达到这个最大值。这个参数决定了线程池能够同时运行的最大线程数量
  • keepAliveTime:这是当线程池中的线程数量超过核心线程数时,多余的空闲线程在终止前等待新任务的最长时间。如果在这个时间内没有新任务到来,这些多余的线程将会被终止。这个参数有助于控制线程池中的线程数量,减少资源消耗
  • unit:与keepAliveTime配合使用,指定存活时间的单位,如TimeUnit.SECONDS表示秒
  • workQueue:等待队列,当任务数量大于核心线程数量时,新增任务会被添加到任务队列中进行等待,执行的是FIF0原则(先进先出),等待队列尽量指定最大长度
  • threadFactory:创建线程时使用的线程工厂,可以用来设置线程名、是否是守护线程等,JDK提供默认的Executors.defaultThreadFactory()
  • handler:拒绝策略,当队列放满且线程池达到最大线程数量时,新增任务的执行策略

拒绝策略

拒绝处理任务时的策略,系统提供了4种可选:

  • AbortPolicy:该策略是线程池的默认策略,它会在拒绝新任务时抛出RejectedExecutianException异常。这种策略通常用于紧急停止系统或通知人工干预
  • DiscardPolicy:该策略会直接丢弃新提交的任务并且不会有任何提示。适应于那些不重要或可重复提交的任务
  • CallerRunsPolicy:该策略如果新任务添加到线程池失败,那么调用方会自己去执行该任务,而不是丢弃或抛异常。可以作为一种自我调节机制,防止系统过载
  • DiscardOldestPolicy:该策略在新任务无法被接收时主动丢弃队列中存在时间最久的任务,为新任务腾出空间。适合于对任务有时效性要求的场景使用

自定义:除了以上拒绝策略外,我们也可以自己定义拒绝策略,实现RejectedExecutionHandler接口,具体的逻辑在rejectedExecution方法中去实现即可

在实际应用中,选择合适的拒绝策略对于系统的健壮性和用户体验至关重要。例如:

  • 对于一个高可用性的系统,可能需要使用CallerRunsPolicy来确保所有任务都能得到处理,即使这意味着调用者的性能会受到一定影响
  • 而对于一个对性能要求极高的系统,可能会选择AbortPolicy来快速失败并通知开发者进行干预

此外,在使用线程池时,还需要注意避免内存溢出和过度切换的问题,合理配置线程池的参数,以及考虑线程池的池化设计思想,以确保系统的稳定性和效率

工作队列

线程池设计允许使用不同类型的工作队列来适应不同的应用场景和需求。下面是一些常用的工作队列类型:

  • SynchronousQueue:不存储元素的阻塞队列,当一个线程试图添加元素时,必须有另一个线程等待接收该元素
  • ArrayBlockingQueue:基于数组实现的阻塞队列,按照FIFO排序。当队列为空时,从队列中获取元素的操作会被阻塞;当队列已满时,往队列中添加元素的操作会被阻塞
  • LinkedBlockingQueue:基于链表的阻塞队列,按照FIFO排序。默认是长度Integer.MAX_VALUE,因此在任务数量大且执行时间长时,不设置队列长度可以防止任务丢失,但同时也可能会导致内存溢出
  • DelayedWorkQueue:支持延迟元素的无界阻塞队列,根据指定的执行时间从小到大排序,否则根据插入顺序排序
  • PriorityBlockingQueue:支持优先级的无界阻塞队列,会对任务进行管理和排序,保证优先级高的任务先执行

每种工作队列都有其特定的用途和优势。选择合适的工作队列可以帮助提高线程池的性能和效率,更好地满足不同任务的执行需求。

有几种创建方式

1.使用Executors工具类

使用Executors工具类提供的不同静态方法创建线程池,如下所示

  • Executors.newSingleThreadExecutor():创建一个单线程的线程池,该线程只有一个工作线程来执行任务,其它任务都会进入队列进行排队等待
  • Executors.newFixedThreadPool():创建一个固定大小的线程池,超过线程池大小的任务会进入队列(无界队列)中等待
  • Executors.newCachedThreadPool():创建一个可缓存线程池,该线程池的线程数量不固定,当有任务加入执行时,如果线程池中有空闲线程时直接使用空闲线程,否则就新建线程。该线程池适合短生命周期的异步任务,需要注意控制任务的数量,否则很有会造成系统瘫痪
  • Executors.newScheduledThreadPool():创建一个可定时执行的线程池,该线程池可以执行定时或周期性任务

注意:一般禁止使用该方式创建线程池,推荐使用手动创建线程池

2.通过ThreadPoolExecutor手动创建

这种方式允许开发者自定义线程池的参数,如核心线程数、最大线程数、线程空闲时间等

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

execute和submit区别

线程池一般有execute和submit两种方法来提交任务:

  • execute 用于提交不需要返回值的任务
  • submit 用于提交需要返回值的任务,线程池会返回一个future类型的对象,通过这个future对象可以判断是否执行成功,并且可以通过future的get()方法来获取返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(() -> log.info("线程已经执行了..."));
    Future<Integer> result = executor.submit(() -> 10);
    log.info("执行结果: {}", result.get());
}

工作流程

线程池工作流程图如下:
流程图

关闭线程池

shutdow()和shutdowNow()是Java中Runtime类的两个方法,用于关闭Java虚拟机

  • shutdown():当调用该方法,线程池会停止接受新任务,但它不会立即关闭。它会等待所有已经提交的任务(包括正在执行的和队列中等待的)执行完成后再关闭,如果某个线程无法正常结束,那么这个线程会被强制终止。这个方法不会立即停止JVM,而是等待所有线程执行完毕后才会关闭JVM
  • shutdownNow():调用该方法,会尝试立即关闭线程池,它不会等待当前正在执行的任务完成,而是会尝试强制终止所有正在运行的线程,包括守护线程和非守护线程。这个方法会返回一个包含未执行完毕的线程的列表,可以通过这个列表来查看哪些线程没有被正确关闭

总结:shutdown()方法提供了一种平滑关闭线程池的方式,允许正在执行的任务继续运行直到完成,而shutdownNow()则是一种更加直接的方法,它会尝试立即停止所有活动,并且不保证正在执行的任务能够正常完成

线程池状态

线程池的状态转换是为了更好地管理和控制线程的生命周期,确保资源的有效利用和应用程序的稳定性。线程池主要有以下五种状态:

  • RUNNING:当线程池被创建后,它就会处于RUNNING状态,这是线程池的初始状态,该状态下,线程池会接收新任务并处理已添加的任务
  • SHUTDOWN:该状态下,线程池不再接受新的任务,但是会执行完已提交的任务。这通常是通过调用shutdown()方法实现的
  • STOP:线程池会停止处理正在执行的任务,并且不再处理队列中的任务,也不会接受新的任务。这通常是通过调用shutdownNow()方法实现的
  • TIDYING:该状态表示线程池正在关闭中,线程池不会再接受新的任务或者执行任何操作,所有的任务都已经完成并且所有线程都已经终止
  • TERMINATED:这是线程池的最终状态,表示所有的任务都已经完成,线程池中的线程也已经全部终止
    线程池状态

异常处理

使用线程池处理任务的适合,任务代码可能抛出运行时异常,此时,线程池可能会捕获该异常,也可能创建新的线程来替换异常线程,导致我们可能无法感知任务异常信息,因此我们需要考虑线程池异常情况

常见的异常处理方式有以下几种

1.手动捕获:

在线程任务代码中,使用try-catch来捕获和处理异常。这是最基本的异常处理方式,可以确保单个任务中的异常不会影响到其他任务的执行

@Slf4j
public class ThreadException extends Thread {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(new ThreadException());
        executor.shutdown();
    }

    @Override
    public void run() {
        try {
            int i = 1 / 0;
        }catch (Exception e){
            log.error("任务执行异常", e);
        }
    }
}

2.实现Thread.UncaughtExceptionHandler接口

通过实现Thread.UncaughtExceptionHandler接口来处理线程池中未捕获的异常。当线程中发生未捕获的异常时,调用该接口的uncaughtException方法处理异常

@Slf4j
public class ThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        log.error("{} 线程任务异常", t.getName(), e);
    }
}
@Slf4j
public class ThreadException extends Thread {
    public static void main(String[] args) {
        ThreadException thread = new ThreadException();
        thread.setUncaughtExceptionHandler(new ThreadExceptionHandler());
        thread.start();
    }

    @Override
    public void run() {
        int i = 1 / 0;
    }
}

3.自定义线程执行器

自定义线程执行器,继承ThreadPoolExecutor类,并重写了afterExecute方法

@Slf4j
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    protected void afterExecute(Runnable runnable, Throwable throwable){
        if (Objects.nonNull(throwable)){
            log.error("线程任务异常", throwable);
        }
        super.afterExecute(runnable, throwable);
    }
}
@Slf4j
public class ThreadException extends Thread {
    public static void main(String[] args) {
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(1, 1, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(10));
        executor.execute(new ThreadException());
        executor.shutdown();
    }

    @Override
    public void run() {
        int i = 1 / 0;
    }
}

4.使用submit方法提交任务

通过返回的Future对象来获取任务的结果或异常。如果在线程中抛出了异常,通过调用future.get()方法来捕获到这个异常。这样,即使在线程中发生了异常,也可以在主线程中进行处理

@Slf4j
public class ThreadException {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<?> future = executor.submit(() -> {
            throw new RuntimeException("模拟异常");
        });
        try {
            future.get(); // 获取任务结果或捕获异常
        } catch (Exception e) {
            log.error("任务执行异常", e);
        }
        // 关闭线程池
        executor.shutdown();
    }
}

如何定义线程池大小

以下是定义线程池大小时可以考虑的几个关键点:

  • CPU核心数:线程池的大小通常与CPU核心数有关。理想情况下,每个CPU核心分配一个线程,以充分利用CPU资源
  • 任务类型:如果任务是计算密集型的,建议N+1,最大化CPU的使用效率。如果任务是I/O密集型的,CPU相对比较空闲,建议2N+1
  • 系统资源:系统资源的可用性也会影响线程池的大小。需要确保有足够的内存来支持额外的线程栈和其他资源
  • 任务执行时间:如果任务的执行时间较短,可以适当增加线程数以提高吞吐量。反之,应减少线程数以避免过多的上下文切换
  • 动态调整:根据系统当前负载动态调整线程池的大小。例如使用ThreadPoolExecutorsetCorePoolSizesetMaximumPoolSize方法
  • 公式计算:有一种理论计算公式,((线程等待时间+线程CPU时间)/线程CPU时间)*CPU核数。但实际情况下线程等待时间和线程CPU时间不好测量,实际应用时还需要根据具体情况进行调整
  • 避免过多线程:过多的线程会导致上下文切换的开销增大,反而降低系统的整体性能。因此并不是线程越多越好
  • 性能测试:线程数的设置可以根据实际的压测,监控JVM的线程情况以及CPU的负载情况,根据实际情况创建合理的线程数,充分利用资源

综上:定义线程池大小是一个复杂的过程,需要综合考虑多种因素,包括任务的性质、CPU核心数量以及系统资源等,并根据实际情况通过性能测试来找到最佳的线程池大小,甚至可能在系统运行中通过监控系统性能指标进行动态调整

正文到此结束
本文目录