沧州做企业网站公司,网站收录查询方法,网站性能优化怎么做,潍坊百度网站优化一、Dubbo线程模型
首先明确一个基本概念#xff1a;IO 线程和业务线程的区别 IO 线程#xff1a;配置在netty 连接点的用于处理网络数据的线程#xff0c;主要处理编解码等直接与网络数据 打交道的事件。 业务线程#xff1a;用于处理具体业务逻辑的线程#xff0c;可以…一、Dubbo线程模型
首先明确一个基本概念IO 线程和业务线程的区别 IO 线程配置在netty 连接点的用于处理网络数据的线程主要处理编解码等直接与网络数据 打交道的事件。 业务线程用于处理具体业务逻辑的线程可以理解为自己在provider 上写的代码所执行的线 程环境。
Dubbo 默认采用的是长链接的方式即默认情况下一个consumer 和一个provider 之间只会建立 一条链接这种情况下IO 线程的工作就是编码和解码数据监听具体的数据请求直接通过Channel发布数据等等
有两个参数⽤来配置服务消费者和服务提供者直接的socket连接个数
1. shareconnections表示可共享的socket连接个数 2. connections表示不共享的socket连接个数
服务A的shareconnections或者connections为2时服务A的消费者会向服务A的提供者建⽴两个socket连接 业务线程就是处理IO 线程处理之后的数据业务线程并不知道任何跟网络相关的内容只是纯 粹的处理业务逻辑在业务处理逻辑的时候往往存在复杂的逻辑所以业务线程池的配置往往都要 比IO 线程池的配置大很多。 IO 线程部分Netty 服务提供方NettyServer 又使用了两级线程池master 主要用来接受客户 端的链接请求并把接受的请求分发给worker 来处理。整个过程如下图 IO 线程与业务线程的交互如下:
IO 线程的派发策略
默认是all所有消息都派发到线程池包括请求响应连接事件断开事件心跳等。即worker 线程接收到事件后将该事件提交到业务线程池中自己再去处理其他IO 事件。 directworker 线程接收到事件后由worker 执行到底。 message只有请求响应消息派发到线程池其它连接断开事件心跳等消息直接在IO 线程上执行 execution只有请求消息派发到线程池不含响应客户端线程池响应和其它连接断开事件心跳等消息直接在IO线程上执行 connection在IO 线程上将连接断开事件放入队列有序逐个执行其它消息派发到线程池。
业务线程池设置 fixed固定大小线程池启动时建立线程不关闭一直持有。(缺省) coresize200 maxsize200 队列SynchronousQueue 回绝策略AbortPolicyWithReport - 打印线程信息jstack之后抛出异常 cached缓存线程池空闲一分钟自动删除需要时重建。 limited可伸缩线程池但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
配置示例 dubbo:protocol namedubbodispatcherallthreadpoolfixedthreads100/ 在整个消费者调用过程中各个线程池都比较重要其中比较有特色的就是AllChannelHandler它完成了IO线程转向用户线程的任务转移比较关键。
二、派发策略All源码解析
消费者启动的时候会执行DubboProtocol#initClient建议与服务端端的socket连接
private ExchangeClient initClient(URL url) {ExchangeClient client;try {// Replace InstanceAddressURL with ServiceConfigURL.url new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getParameters());// connection should be lazyif (url.getParameter(LAZY_CONNECT_KEY, false)) {client new LazyConnectExchangeClient(url, requestHandler);} else {client Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException(Fail to create remoting client for service( url ): e.getMessage(), e);}return client;
}
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters#connect最终返回的是NettyClient,点进这个对象的构造方法
public Client connect(URL url, ChannelHandler handler) throws RemotingException {return new NettyClient(url, handler);
}
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler-HeartbeatHandler-handlersuper(url, wrapChannelHandler(url, handler));
}
NettyClient#wrapChannelHandler中再次利用SPI机制获取线程派发策略dubbo默认的策略为allDispatcher策略。
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultFrameworkModel().getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}
public ChannelHandler dispatch(ChannelHandler handler, URL url) {return new AllChannelHandler(handler, url);
} 除了默认的AllDispatcher还有DirectDispatcherMessageOnlyDispatcher等。 当消费端接收到远程服务端的响应之后按照Netty的处理流程消息会在channel绑定的handler上传递netty底层会调用handler#received。可以看到connecteddisconnectedreceivedcaught等方法都是在新的线程池ExecutorService中执行executor.execute方法会将任务ChannelEventRunnable提交到ExecutorService中。
public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {super(handler, url);}Overridepublic void connected(Channel channel) throws RemotingException {ExecutorService executor getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException(connect event, channel, getClass() error when process connected event ., t);}}Overridepublic void disconnected(Channel channel) throws RemotingException {ExecutorService executor getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException(disconnect event, channel, getClass() error when process disconnected event ., t);}}Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if(message instanceof Request t instanceof RejectedExecutionException){sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() error when process received event ., t);}}Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException(caught event, channel, getClass() error when process caught event ., t);}}
}
由于AllChannelHandler方法是在前面handler的基础上包装了一层所以ChannelEventRunnable中会将消息传递给AllHandlel的下一个handler从这里也清晰的看到了AllChannelHandler完成了IO线程转向用户线程的任务转移。
public void run() {if (state ChannelState.RECEIVED) {try {handler.received(channel, message);} catch (Exception e) {logger.warn(ChannelEventRunnable handle state operation error, channel is channel , message is message, e);}} else {switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn(ChannelEventRunnable handle state operation error, channel is channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn(ChannelEventRunnable handle state operation error, channel is channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn(ChannelEventRunnable handle state operation error, channel is channel , message is message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn(ChannelEventRunnable handle state operation error, channel is channel , message is: message , exception is exception, e);}break;default:logger.warn(unknown state: state , message is message);}}}
三、AllDispatcher策略异常超时问题
Dubbo有一个经典问题就是当配置了消息派发策略为AllDispatcher时当服务端线程池满了之后当消费端再次发送请求就会一直傻傻等待超时导致没有任何服务端响应。那么问题就出现在AllChannelHandler前面已经说了AllDispatcher策略就是所有消息都派发到线程池包括请求响应连接事件断开事件心跳等。即worker 线程接收到事件后将该事件提交到业务线程池中自己再去处理其他IO 事件。
问题出现原因
那么当服务端线程池打满之后此时又再次来了一个请求此时依然会提交给线程池执行那么了解线程池原理的就清楚线程池任务满了之后会执行拒绝策略抛出RejectedExecutionException异常此时就会进入到received的catch方法中去然后就又再次抛出ExecutionException异常。 public void received(Channel channel, Object message) throws RemotingException {ExecutorService executor getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() error when process received event ., t);}
}
那么抛出的异常就又会被netty捕获进而继续执行nettyHandler的caught方法可以看到这里又再次将任务丢到了线程池中。但是此时线程池依然是满的业务线程池所有线程都堵住了所以也不能将异常消息返回给客户端然后客户端消费者只能傻傻等到超时。
public void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException(caught event, channel, getClass() error when process caught event ., t);}
}
解决办法可以设置dispatcher为message只有请求和响应交给业务线程池处理其他的在IO线程处理配置如下 dubbo:protocol namedubbo dispatchermessage / 后面dubbo也修复了这个问题再received方法的catch中新加了一部分逻辑注释的大致意思也就是说修复当线程池满了之后异常信息无法被发送给消费端的问题当线程池满了拒绝执行任务会引起消费端等待超时所以代码中判断了下当抛出异常为RejectedExecutionException时就不把异常抛出交给AllChannelHandler#caught方法中的线程池执行而是直接用IO线程在通过channel将消息及时反馈给消费者消费者也就会收到服务端的“threadpool is exhausted ,detail msg”等响应消息。
public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time outif(message instanceof Request t instanceof RejectedExecutionException){Request request (Request)message;if(request.isTwoWay()){String msg Server side( url.getIp() , url.getPort() ) threadpool is exhausted ,detail msg: t.getMessage();Response response new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() error when process received event ., t);}
}