一个公式看懂:为什么Dubbo线程池会打满

0 文章概述

大家可能都遇到过DUBBO线程池打满这个问题,刚开始遇到这个问题可能会比较慌,常见方案可能就是重启服务,但也不知道重启是否可以解决。我认为重启不仅不能解决问题,甚至有可能加剧问题,这是为什么呢?本文我们就一起分析DUBBO线程池打满这个问题。

成都创新互联专业为企业提供封丘网站建设、封丘做网站、封丘网站设计、封丘网站制作等企业网站建设、网页设计与制作、封丘企业网站模板建站服务,十多年封丘做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

1 基础知识

1.1 DUBBO线程模型

1.1.1 基本概念

DUBBO底层网络通信采用Netty框架,我们编写一个Netty服务端进行观察:

public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(7777).sync();
System.out.println("服务端准备就绪");
channelFuture.channel().closeFuture().sync();
} catch (Exception ex) {
System.out.println(ex.getMessage());
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

BossGroup线程组只有一个线程处理客户端连接请求,连接完成后将完成三次握手的SocketChannel连接分发给WorkerGroup处理读写请求,这两个线程组被称为「IO线程」。

我们再引出「业务线程」这个概念。服务生产者接收到请求后,如果处理逻辑可以快速处理完成,那么可以直接放在IO线程处理,从而减少线程池调度与上下文切换。但是如果处理逻辑非常耗时,或者会发起新IO请求例如查询数据库,那么必须派发到业务线程池处理。

DUBBO提供了多种线程模型,选择线程模型需要在配置文件指定dispatcher属性:






不同线程模型在选择是使用IO线程还是业务线程,DUBBO官网文档说明:

all
所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳

direct
所有消息都不派发到业务线程池,全部在IO线程直接执行

message
只有请求响应消息派发到业务线程池,其它连接断开事件,心跳等消息直接在IO线程执行

execution
只有请求消息派发到业务线程池,响应和其它连接断开事件,心跳等消息直接在IO线程执行

connection
在IO线程上将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池

all所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳direct所有消息都不派发到业务线程池,全部在IO线程直接执行message只有请求响应消息派发到业务线程池,其它连接断开事件,心跳等消息直接在IO线程执行execution只有请求消息派发到业务线程池,响应和其它连接断开事件,心跳等消息直接在IO线程执行connection在IO线程上将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池

1.1.2 确定时机

生产者和消费者在初始化时确定线程模型:

// 生产者
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}

// 消费者
public class NettyClient extends AbstractClient {
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
}

生产者和消费者默认线程模型都会使用AllDispatcher,ChannelHandlers.wrap方法可以获取Dispatch自适应扩展点。如果我们在配置文件中指定dispatcher,扩展点加载器会从URL获取属性值加载对应线程模型。本文以生产者为例进行分析:

public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// ChannelHandlers.wrap确定线程策略
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}

public class ChannelHandlers {
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}
}

@SPI(AllDispatcher.NAME)
public interface Dispatcher {
@Adaptive({Constants.DISPATCHER_KEY, "channel.handler"})
ChannelHandler dispatch(ChannelHandler handler, URL url);
}

1.1.3 源码分析

我们分析其中两个线程模型源码,其它线程模型请阅读DUBBO源码。AllDispatcher模型所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳:

public class AllDispatcher implements Dispatcher {

// 线程模型名称
public static final String NAME = "all";

// 具体实现策略
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}


public class AllChannelHandler extends WrappedChannelHandler {

@Override
public void connected(Channel channel) throws RemotingException {
// 连接完成事件交给业务线程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event", t);
}
}

@Override
public void disconnected(Channel channel) throws RemotingException {
// 断开连接事件交给业务线程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event", t);
}
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
// 请求响应事件交给业务线程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException) {
Request request = (Request)message;
if(request.isTwoWay()) {
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event", t);
}
}

@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
// 异常事件交给业务线程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event", t);
}
}
}

DirectDispatcher策略所有消息都不派发到业务线程池,全部在IO线程直接执行:

public class DirectDispatcher implements Dispatcher {

// 线程模型名称
public static final String NAME = "direct";

// 具体实现策略
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
// 直接返回handler表示所有事件都交给IO线程处理
return handler;
}
}

1.2 DUBBO线程池策略

1.2.1 基本概念

上个章节分析了线程模型,我们知道不同的线程模型会选择使用还是IO线程还是业务线程。如果使用业务线程池,那么使用什么线程池策略是本章节需要回答的问题。DUBBO官网线程派发模型图展示了线程模型和线程池策略的关系:

DUBBO提供了多种线程池策略,选择线程池策略需要在配置文件指定threadpool属性:





不同线程池策略会创建不同特性的线程池:

fixed
包含固定个数线程

cached
线程空闲一分钟会被回收,当新请求到来时会创建新线程

limited
线程个数随着任务增加而增加,但不会超过最大阈值。空闲线程不会被回收

eager
当所有核心线程数都处于忙碌状态时,优先创建新线程执行任务,而不是立即放入队列

fixed包含固定个数线程cached线程空闲一分钟会被回收,当新请求到来时会创建新线程limited线程个数随着任务增加而增加,但不会超过最大阈值。空闲线程不会被回收eager当所有核心线程数都处于忙碌状态时,优先创建新线程执行任务,而不是立即放入队列

1.2.2 确定时机

本文我们以AllDispatcher为例分析线程池策略在什么时候确定:

public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";

@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}

public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
}

在WrappedChannelHandler构造函数中如果配置指定了threadpool属性,扩展点加载器会从URL获取属性值加载对应线程池策略,默认策略为fixed:

public class WrappedChannelHandler implements ChannelHandlerDelegate {

public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 获取线程池自适应扩展点
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
}

@SPI("fixed")
public interface ThreadPool {
@Adaptive({Constants.THREADPOOL_KEY})
Executor getExecutor(URL url);
}

1.2.3 源码分析

(1) FixedThreadPool
public class FixedThreadPool implements ThreadPool {

@Override
public Executor getExecutor(URL url) {

// 线程名称
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

// 线程个数默认200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);

// 队列容量默认0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

// 队列容量等于0使用阻塞队列SynchronousQueue
// 队列容量小于0使用无界阻塞队列LinkedBlockingQueue
// 队列容量大于0使用有界阻塞队列LinkedBlockingQueue
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue()
: (queues < 0 ? new LinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
(2) CachedThreadPool
public class CachedThreadPool implements ThreadPool {

@Override
public Executor getExecutor(URL url) {

// 获取线程名称
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

// 核心线程数默认0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

// 最大线程数默认Int最大值
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);

// 队列容量默认0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

// 线程空闲多少时间被回收默认1分钟
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

// 队列容量等于0使用阻塞队列SynchronousQueue
// 队列容量小于0使用无界阻塞队列LinkedBlockingQueue
// 队列容量大于0使用有界阻塞队列LinkedBlockingQueue
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue()
: (queues < 0 ? new LinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
(3) LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {

@Override
public Executor getExecutor(URL url) {

// 获取线程名称
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

// 核心线程数默认0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

// 最大线程数默认200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);

// 队列容量默认0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

// 队列容量等于0使用阻塞队列SynchronousQueue
// 队列容量小于0使用无界阻塞队列LinkedBlockingQueue
// 队列容量大于0使用有界阻塞队列LinkedBlockingQueue
// keepalive时间设置Long.MAX_VALUE表示不回收空闲线程
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue()
: (queues < 0 ? new LinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
(4) EagerThreadPool

我们知道ThreadPoolExecutor是普通线程执行器。当线程池核心线程达到阈值时新任务放入队列,当队列已满开启新线程处理,当前线程数达到最大线程数时执行拒绝策略。

但是EagerThreadPool自定义线程执行策略,当线程池核心线程达到阈值时,新任务不会放入队列而是开启新线程进行处理(要求当前线程数没有超过最大线程数)。当前线程数达到最大线程数时任务放入队列。

public class EagerThreadPool implements ThreadPool {

@Override
public Executor getExecutor(URL url) {

// 线程名
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

// 核心线程数默认0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

// 最大线程数默认Int最大值
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);

// 队列容量默认0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

// 线程空闲多少时间被回收默认1分钟
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

// 初始化自定义线程池和队列重写相关方法
TaskQueue taskQueue = new TaskQueue(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}

1.3 一个公式

现在我们知道DUBBO会选择线程池策略进行业务处理,那么应该如何估算可

分享题目:一个公式看懂:为什么Dubbo线程池会打满
标题路径:http://www.shufengxianlan.com/qtweb/news31/395181.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联