本 Chat 中,我们将基于 Dubbo 的 2.7.8 版本来讲解,主要是从面试官提问的角度,然后通过几个例子层层递进剖析源码,让大家能够一步步在面试的氛围中深入了解 Dubbo 核心原理。话不多说了,我们进入第一个面试提问。
Dubbo 整体框架原理
面试官:Dubbo 是用来做什么的?内部的大概原理能讲一下吗?
首先,Dubbo 就是一个 RPC 框架,用来使开发者实现远程调用如同本地调用一样方便的框架,只需要简单配置和引入提供者的接口,就可以直接通过接口调用到远程服务提供的实现。 内部的大概原理就是提供者提供接口,并且将接口暴露成为一个可供访问的服务,然后把提供者服务的 IP 和端口注册到注册中心,消费者在调用本地接口的时候,Dubbo 框架就会先根据接口类路径从注册中心拉取对应的 IP 和端口,然后连接到这个提供者执行业务逻辑,获取到结果后返回到消费者本地接口的方法中。 听起来有点绕,直接看下图:
现在先有个大概的印象,下面会更详细介绍整个过程,用一步一图的方式解析。
Dubbo 的提供者核心源码和原理
面试官:那服务提供者是如何将自己的服务暴露出去的,然后消费者为什么能调用?
在回答这个问题之前,我们先从一个源码中的小例子入手,然后层层深入跟进源码:
private static void startWithExport() throws InterruptedException {
// 服务提供者的配置
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
// 最重要的入口,将服务暴露出去
service.export();
System.out.println("dubbo service started");
// 卡住主进程
new CountDownLatch(1).await();
}
这里的关键就是 service.export() 这行代码,跟进去后会发现,Dubbo 会将各种需要传递的数据比如方法入参、上下文参数等都写入到一个 URL 中,并且根据不同协议执行服务导出,这里我们全部按照默认 Zookeeper 注册中心的实现来跟进,因此可以看到下面这一段关键代码:
// 生成动态代理对象
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 真正导出服务
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
这里的 PROTOCOL 会经过 ProtocolListenerWrapper、ProtocolFilterWrapper,然后才会到 RegistryProtocol,而在 RegistryProtocol 中,我们发现在这里是通过 doLocalExport() 再一次经历 ProtocolListenerWrapper、ProtocolFilterWrapper 然后到 DubboProtocol,而在这个过程中,ProtocolFilterWrapper 会执行所有已注册的 Filter。 然后 RegistryProtocol 还会获取注册中心的地址,并将本提供者注册到了注册中心,而注册中心的注册其实就是调用了 ZookeeperRegistry 的 doRegister() 直接将提供者的 URL 作为临时节点写入到了 Zookeeper 的 providers 目录下面。 顺便说一句,想了解更多的关于 Zookeeper 的相关源码和原理知识,欢迎订阅我的另一篇 Chat:面试突击系列:Zookeeper 的核心源码和原理剖析。
try {
// 将服务注册到zk
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
注册完成之后,RegistryProtocol 的使命就暂时结束了。到这里我们画个图表示一下:
接下来我们重点看一下 DubboProtocol 做了什么。 首先 DubboProtocol 通过调用 Exchangers#bind() 创建了一个 ExchangeServer,而在 bind() 中经过 HeaderExchanger、Transporters、Transporter$Adaptive 的调用,最终来到了 NettyTransporter,在这里创建了一个 NettyServer。
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
// 初始化netty服务端
return new NettyServer(url, handler);
}
看到这里,我们就非常清楚地了解到了,Dubbo 的服务提供者其实就是使用 Netty 框架创建了一个 Server,由此我们可以猜测到,Dubbo 的服务消费者应该是作为 Netty 的 Client 连接到 Server 这里进行通信的。 所以 Dubbo 提供者是如何暴露服务的呢,其实就是干了两件事:一个是将提供者的信息注册到注册中心,一个是启动 NettyServer 作为服务端提供服务。至于 NettyServer 里面具体是如何初始化以及 Netty 参数优化,我们下面再讲。
Dubbo 的消费者核心源码和原理
面试官:服务消费者是如何仅仅通过一个接口类直接调用到提供者的,并且做到失败重试、负载均衡的?
同样的,我们从一个消费者的小例子入手,层层剖析源码:
private static void runWithRefer() {
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
// 注册中心配置
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
// 设置接口类
reference.setInterface(DemoService.class);
// 关键代码:消费者入口
DemoService service = reference.get();
String message = service.sayHello("dubbo");
System.out.println(message);
}
这里我们看到消费者设置了注册中心配置和接口类,然后通过调用 reference.get() 就实现了服务的引入,接着就是直接调用接口的方法。因此我们从 reference.get() 入手分析。 从 ReferenceConfig 中可以看到,在创建代理类的时候,其实是使用了 REF_PROTOCOL.refer() 实现的:
// 只有一个注册中心的时候
if (urls.size() == 1) {
// 重点代码,创建服务引用
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
}
而这个 REF_PROTOCOL 也是跟提供者一样,经过了 ProtocolListenerWrapper、ProtocolFilterWrapper 的包装,然后来到了 RegistryProtocol。在 RegistryProtocol 中,首先是获取了注册中心的地址和集群容错策略,然后将本消费者作为临时节点注册到了注册中心的 consumers,接着从注册中心将接口类的提供者信息拉取到本地缓存起来,并且监听该接口类的提供者列表的变更事件,最后将接口类包装成一个包含集群容错策略的 Invoker 并返回,Invoker 的默认实现就是 MockClusterInvoker。而 MockClusterInvoker 里面的 invoker 对象其实就是 FailoverClusterInvoker,然后同时这个 invoker 还持有了 interceptors 拦截器链条。估计到时候就是经过层层拦截器链条之后,再调用 FailoverClusterInvoker 实现失败重试功能。关键代码如下:
// 3、包装机器容错策略到invoker
Invoker<T> invoker = cluster.join(directory);
public class MockClusterWrapper implements Cluster {
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
// 这里的cluster默认是FailoverCluster
this.cluster.join(directory));
}
}
这里需要关注一下 RegistryDirectory#subscribe(),这个方法是订阅注册中心的提供者列表,拉取到了提供者的 URL 之后,通过 toInvokers() 将 URL 转换为 Invoker 对象,也就是下面这一段关键代码:
if (enabled) {
// 关键代码:这里调用Dubbo协议转换服务到invoker
// 这里的protocol.refer(serviceType, url)的结果是ProtocolFilterWrapper
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
这里的 protocol.refer(serviceType, url) 就会经过 ProtocolFilterWrapper 的包装,ProtocolFilterWrapper 会执行注册的消费者 Filter,然后才到了 DubboProtocol。而 DubboProtocol 中的关键就是创建了 ExchangeClient,代码如下:
// 创建即时连接
else {
client = Exchangers.connect(url, requestHandler);
}
而这个 Exchangers.connect() 我们就很熟悉了,跟上面的提供者差不多,在connect() 中经过 HeaderExchanger、Transporters、Transporter$Adaptive 的调用,最终来到了 NettyTransporter,在这里创建了一个 NettyClient,也就是说,Dubbo 的消费者其实就是作为一个 Netty 的客户端,连接到提供者的 Netty 服务端,然后进行数据传输。 而前面返回的 invoker 对象也在 ReferenceConfig 中创建为 InvokerInvocationHandler 对象,所以 InvokerInvocationHandler 就是具体的代理类了,当调用我们代码定义的接口类的时候,实际就是调用 InvokerInvocationHandler 的 invoke() 方法,代码如下:
// 6、创建服务代理,这里默认调用的是 JavassistProxyFactory 的 getProxy()方法
// invoker就是MockClusterInvoker
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}
所以这里我们梳理一下这个过程:
所以下面我们从 InvokerInvocationHandler 的 invoke() 方法入手,开始分析一次完整的 Dubbo 调用是如何串起来的。 InvokerInvocationHandler 组装一下 RPC 调用参数然后直接调用 MockClusterInvoker 的 invoke(),并进行异常的捕获处理。然后 MockClusterInvoker 根据参数是否有 Mock 模式,如果有则返回 Mock 结果,如果没有则继续往下调用 FailoverClusterInvoker。 FailoverClusterInvoker 就是根据重试次数循环,捕获到异常后根据负载均衡策略重新选择 invoker 发起调用。关键代码如下:
// 3、使用循环,失败重试
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
// 重试时,进行重新选择,避免重试时invoker列表已经发生变化
// 注意:如果列表发生了变化,那么invoked判断会失效,因为invoker实例已经改变
if (i > 0) {
// 3.1、如果当前实例已经被销毁,则抛出异常
checkWhetherDestroyed();
// 3.2、重新获取所有服务提供者
copyInvokers = list(invocation);
// check again
// 3.3、重新检查一下
checkInvokers(copyInvokers, invocation);
}
// 3.4、根据负载均衡策略选择invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 3.5、具体发起远程调用
// 这里拿到的invoker对象是RegistryDirectory$InvokerDelegate
Result result = invoker.invoke(invocation);
...
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
而负载均衡策略的则是在 select() 方法中实现,默认是使用 RandomLoadBalance 也就是随机选择策略,并且在重新选择的时候会先排除已经使用过的 invoker。 相信看到这里,大家就能回答前面的面试题了。 接下来 FailoverClusterInvoker 的 invoke() 会开始经过 Filter 的过滤链条处理,然后最终进入了 DubboInvoker 的 invoke() 方法,在这里会先从线程池获取一个线程,然后将前面初始化好的 Netty 客户端发起远程连接并且传输数据到服务提供者。
Netty 的参数优化
面试官:前面说到 Dubbo 是使用 Netty 作为通信框架,那么使用 Netty 有什么好处?
这个面试题其实就是在问 Netty 的使用,并且最好能将一些 Netty 的参数调优体现出来。 前面我们提到的,服务提供者的核心其实是初始化了 Netty 服务端并且将 IP 和端口上报给注册中心,然后服务消费者就从注册中心获取接口类对应的 IP 和端口,便可以通过 Netty 客户端连接到服务端,将数据传输到服务端进行处理,处理完成后接收返回的数据,转换为接口方法的返回类型,返回给该方法。 因此我们先看一下 Netty 服务端的初始化:
// 设置Netty的boss线程池和worker线程池
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
首先这里是 reactor 线程模型的使用, 这里设置了2个线程池,bossGroup主要用于处理客户端的accept连接、断开连接等,workerGroup处理数据的传输,包括read、write事件和pipeline链条中的handler。 简单提一下,reactor 线程模型就是将线程池的职责分开,处理连接的属于低频操作且阻塞时间相对较长(因为需要进行 TCP 3次握手),处理数据的 IO 传输属于高频且和业务相关性紧密,使用互不干扰的线程池可以提高效率。 接下来看一下参数的设置:
// 设置netty的业务处理类
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
//一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用,用于断开后的重连。
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
// 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
// ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
其实除了上面那几个参数外,还有一些比较重要的参数在这块源码中没有体现,因此我从Seata(分布式事务框架)将相关的源码摘出来:
this.serverBootstrap
.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
// TCP 3次握手的队列缓冲区大小
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
//一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。
.option(ChannelOption.SO_REUSEADDR, true)
// 连接保活
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送
.childOption(ChannelOption.TCP_NODELAY, true)
// 发送数据缓冲区大小
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
// 接收数据缓冲区大小
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
// 控制网络水位,将网络传输速度维持在比较平稳的状态
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(listenPort))
而这个参数 WRITEBUFFERWATER_MARK 是属于新版本才有的,原理大概就是设置一个高水位、一个低水位,当输出的数据速率高于高水位时,则暂停一下 write 事件,改为处理 read 事件,这样就可以控制网络传输的速度比较平稳,避免大流量打死网卡。 最后看一下处理的链条:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
// 默认不启用SSL
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
// 解码器
.addLast("decoder", adapter.getDecoder())
// 编码器
.addLast("encoder", adapter.getEncoder())
// 心跳检查handler
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
// 业务处理handler
.addLast("handler", nettyServerHandler);
}
});
// bind
// 绑定本地端口,并启动监听服务
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
这里我们可以看到 pipeline() 里面加入了很多 handler,其实这里就是使用了责任链设计模式,接收到的网络请求,都会依次使用每个 handler 处理一下。 编解码器涉及到了如何处理网络的粘包、拆包的处理。 IdleStateHandler 心跳处理器,这个 handler 其实是 Netty 提供的,目的是维持 server 端和 client 端的长连接,如果没有设置这个 handler,就会经常出现 TCP 的超时时间到了,然后客户端直接断开和服务端的连接,这个笔者之前遇到过这个 bug,现象就是建立连接一段时间之后,没有新的网络传输的时候,莫名其妙地断开了连接。
下面我们再看一下 Netty 客户端的初始化:
protected void doOpen() throws Throwable {
// 1、创建业务handler
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
// 2、创建启动器并配置
bootstrap = new Bootstrap();
bootstrap.group(NIO_EVENT_LOOP_GROUP)
// 连接保活
.option(ChannelOption.SO_KEEPALIVE, true)
// 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性
.option(ChannelOption.TCP_NODELAY, true)
// ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(socketChannelClass());
// 设置连接超时
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
// 3、添加handler到连接的pipeline
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
}
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
// 解码器
.addLast("decoder", adapter.getDecoder())
// 编码器
.addLast("encoder", adapter.getEncoder())
// 心跳检查handler
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
// 业务处理handler
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
客户端的参数与服务端的大同小异,需要注意的是客户端只需要一个线程池而已。
编解码器如何处理粘包和拆包
面试官:如何解决 TCP 网络传输中的拆包和粘包?
拆包是指在网络传输过程中,一份数据被拆分为多次传输,每次只传输了一部分。粘包是指在网络传输中,两份数据合并在一起传输过去了。Dubbo 的网络拆包和粘包的处理是通过在 Netty 的处理链条中添加的编解码器实现的。 Dubbo 的编码器是 DubboCodec 的父类 ExchangeCodec 实现的。我们看一下代码:
public class ExchangeCodec extends TelnetCodec {
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
// 1、对请求信息进行编码
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
}
// 2、对响应信息进行编码
else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
}
// 3、其他信息进行编码
else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
// 获取序列化扩展实现
Serialization serialization = getSerialization(channel);
// 创建Dubbo协议扩展头字节数组,HEADER_LENGTH=16
// header.
byte[] header = new byte[HEADER_LENGTH];
// 把魔数0xdabb写入协议头
// set magic number.
Bytes.short2bytes(MAGIC, header);
// 设置请求类型与序列化类型,标记到协议头
// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// 将请求id设置到协议头
// set request id.
Bytes.long2bytes(req.getId(), header, 4);
// 使用serialization将对象数据部分进行编码,并把协议数据部分写入缓存
// encode request data.
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
// 刷新缓存
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 检查payload(协议数据部分)是否合法
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// 将协议头写入缓存
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
// 获取序列化扩展实现
Serialization serialization = getSerialization(channel);
// 创建Dubbo协议扩展头字节数组, HEADER_LENGTH为 16
// header.
byte[] header = new byte[HEADER_LENGTH];
// 把魔数写入协议头
// set magic number.
Bytes.short2bytes(MAGIC, header);
// 设立请求类型与序列化类型,标记到协议头
// set request and serialization flag.
header[2] = serialization.getContentTypeId();
if (res.isHeartbeat()) {
header[2] |= FLAG_EVENT;
}
// 设置响应类型到第4字节位置
// set response status.
byte status = res.getStatus();
header[3] = status;
// 将请求ID设直到协议头
// set request id.
Bytes.long2bytes(res.getId(), header, 4);
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// 使用serialization对数据部分进行编码,并把协议数据部分写入缓存
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
if (status == Response.OK) {
if (res.isHeartbeat()) {
encodeEventData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
} else {
out.writeUTF(res.getErrorMessage());
}
// 刷新缓存
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
// 检查payload (协议数据部分)是否合法
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// 将协议头写入缓存
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
// clear buffer
buffer.writerIndex(savedWriteIndex);
......
}
}
}
上面的代码其实可以看到 Dubbo 协议的组成,分为 2 大块:header 和 data 部分。header 总包含了 16 字节,前 2 字节为魔数,标记一个数据帧的开始,然后是1字节的请求类型和序列化标记 id,然后 1 字节是只在响应报文设置的结果码,然后 8 字节是请求 id,最后 4 字节是 body 内容的大小。 接下来看下解码的部分,同时可以从里面学习到对于网络拆包粘包的处理。InternalDecoder 就是解码的内部实现类。
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
// decode object.
do {
// 先保存未读取之前的位置
int saveReaderIndex = message.readerIndex();
// 调用DubboCodec对数据进行解码
Object msg = codec.decode(channel, message);
// 如果遇到拆包,则重置message为之前的位置
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
// 否则,走到这里就是读取成功的
if (msg != null) {
// 把解码成功的对象放入out列表
out.add(msg);
}
}
} while (message.readable());
}
}
这里我们可以看到 Dubbo 对于拆包的处理,其实就是判断一下是否遇到了 Codec2.DecodeResult.NEEDMOREINPUT,如果遇到了则放弃前面读取的部分,然后等待下次 read 通道里面的数据。至于粘包的处理,因为这里读取 header 的时候,都是按照固定从长度读取,并且读取 data 的时候,也是按照 header 里面指定的长度读取的,所以读到的结果一定是完整的,不会出现多余的字节,如果不完整就是走拆包的处理逻辑。 接下来 codec.decode() 就是开始解析协议的内容了:
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
// 先读取协议头
buffer.readBytes(header);
//解析Dubbo协议数据部分
return decode(channel, buffer, readable, header);
}
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 检查魔数,确认为Dubbo协议帧
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// 检查是否读取了一个完整的Dubbo协议头
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// 从协议头的最后四个字节读取协议数据部分的大小
// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 如采遇到半包问题,则直接返回
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// 解析协议数据部分
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
// 这里调用的是子类DubboCodec
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}