四时宝库

程序员的知识宝库

面试突击系列:Dubbo 的核心源码和原理剖析

本 Chat 中,我们将基于 Dubbo 的 2.7.8 版本来讲解,主要是从面试官提问的角度,然后通过几个例子层层递进剖析源码,让大家能够一步步在面试的氛围中深入了解 Dubbo 核心原理。话不多说了,我们进入第一个面试提问。

Dubbo 整体框架原理

面试官:Dubbo 是用来做什么的?内部的大概原理能讲一下吗?

首先,Dubbo 就是一个 RPC 框架,用来使开发者实现远程调用如同本地调用一样方便的框架,只需要简单配置和引入提供者的接口,就可以直接通过接口调用到远程服务提供的实现。 内部的大概原理就是提供者提供接口,并且将接口暴露成为一个可供访问的服务,然后把提供者服务的 IP 和端口注册到注册中心,消费者在调用本地接口的时候,Dubbo 框架就会先根据接口类路径从注册中心拉取对应的 IP 和端口,然后连接到这个提供者执行业务逻辑,获取到结果后返回到消费者本地接口的方法中。 听起来有点绕,直接看下图:

现在先有个大概的印象,下面会更详细介绍整个过程,用一步一图的方式解析。

Dubbo 的提供者核心源码和原理

面试官:那服务提供者是如何将自己的服务暴露出去的,然后消费者为什么能调用?

在回答这个问题之前,我们先从一个源码中的小例子入手,然后层层深入跟进源码:

private static void startWithExport() throws InterruptedException {
    // 服务提供者的配置
    ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
    service.setInterface(DemoService.class);
    service.setRef(new DemoServiceImpl());
    service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
    service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
    // 最重要的入口,将服务暴露出去
    service.export();

    System.out.println("dubbo service started");
    // 卡住主进程
    new CountDownLatch(1).await();
}

这里的关键就是 service.export() 这行代码,跟进去后会发现,Dubbo 会将各种需要传递的数据比如方法入参、上下文参数等都写入到一个 URL 中,并且根据不同协议执行服务导出,这里我们全部按照默认 Zookeeper 注册中心的实现来跟进,因此可以看到下面这一段关键代码:

// 生成动态代理对象
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 真正导出服务
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);

这里的 PROTOCOL 会经过 ProtocolListenerWrapper、ProtocolFilterWrapper,然后才会到 RegistryProtocol,而在 RegistryProtocol 中,我们发现在这里是通过 doLocalExport() 再一次经历 ProtocolListenerWrapper、ProtocolFilterWrapper 然后到 DubboProtocol,而在这个过程中,ProtocolFilterWrapper 会执行所有已注册的 Filter。 然后 RegistryProtocol 还会获取注册中心的地址,并将本提供者注册到了注册中心,而注册中心的注册其实就是调用了 ZookeeperRegistry 的 doRegister() 直接将提供者的 URL 作为临时节点写入到了 Zookeeper 的 providers 目录下面。 顺便说一句,想了解更多的关于 Zookeeper 的相关源码和原理知识,欢迎订阅我的另一篇 Chat:面试突击系列:Zookeeper 的核心源码和原理剖析。

try {
    // 将服务注册到zk
    zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
    throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}

注册完成之后,RegistryProtocol 的使命就暂时结束了。到这里我们画个图表示一下:

接下来我们重点看一下 DubboProtocol 做了什么。 首先 DubboProtocol 通过调用 Exchangers#bind() 创建了一个 ExchangeServer,而在 bind() 中经过 HeaderExchanger、Transporters、Transporter$Adaptive 的调用,最终来到了 NettyTransporter,在这里创建了一个 NettyServer。

@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
    // 初始化netty服务端
    return new NettyServer(url, handler);
}

看到这里,我们就非常清楚地了解到了,Dubbo 的服务提供者其实就是使用 Netty 框架创建了一个 Server,由此我们可以猜测到,Dubbo 的服务消费者应该是作为 Netty 的 Client 连接到 Server 这里进行通信的。 所以 Dubbo 提供者是如何暴露服务的呢,其实就是干了两件事:一个是将提供者的信息注册到注册中心,一个是启动 NettyServer 作为服务端提供服务。至于 NettyServer 里面具体是如何初始化以及 Netty 参数优化,我们下面再讲。

Dubbo 的消费者核心源码和原理

面试官:服务消费者是如何仅仅通过一个接口类直接调用到提供者的,并且做到失败重试、负载均衡的?

同样的,我们从一个消费者的小例子入手,层层剖析源码:

private static void runWithRefer() {
    ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
    reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
    // 注册中心配置
    reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
    // 设置接口类
    reference.setInterface(DemoService.class);
    // 关键代码:消费者入口
    DemoService service = reference.get();
    String message = service.sayHello("dubbo");
    System.out.println(message);
}

这里我们看到消费者设置了注册中心配置和接口类,然后通过调用 reference.get() 就实现了服务的引入,接着就是直接调用接口的方法。因此我们从 reference.get() 入手分析。 从 ReferenceConfig 中可以看到,在创建代理类的时候,其实是使用了 REF_PROTOCOL.refer() 实现的:

// 只有一个注册中心的时候
if (urls.size() == 1) {
    // 重点代码,创建服务引用
    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
}

而这个 REF_PROTOCOL 也是跟提供者一样,经过了 ProtocolListenerWrapper、ProtocolFilterWrapper 的包装,然后来到了 RegistryProtocol。在 RegistryProtocol 中,首先是获取了注册中心的地址和集群容错策略,然后将本消费者作为临时节点注册到了注册中心的 consumers,接着从注册中心将接口类的提供者信息拉取到本地缓存起来,并且监听该接口类的提供者列表的变更事件,最后将接口类包装成一个包含集群容错策略的 Invoker 并返回,Invoker 的默认实现就是 MockClusterInvoker。而 MockClusterInvoker 里面的 invoker 对象其实就是 FailoverClusterInvoker,然后同时这个 invoker 还持有了 interceptors 拦截器链条。估计到时候就是经过层层拦截器链条之后,再调用 FailoverClusterInvoker 实现失败重试功能。关键代码如下:

// 3、包装机器容错策略到invoker
Invoker<T> invoker = cluster.join(directory);
public class MockClusterWrapper implements Cluster {

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                // 这里的cluster默认是FailoverCluster
                this.cluster.join(directory));
    }
}

这里需要关注一下 RegistryDirectory#subscribe(),这个方法是订阅注册中心的提供者列表,拉取到了提供者的 URL 之后,通过 toInvokers() 将 URL 转换为 Invoker 对象,也就是下面这一段关键代码:

if (enabled) {
    // 关键代码:这里调用Dubbo协议转换服务到invoker
    // 这里的protocol.refer(serviceType, url)的结果是ProtocolFilterWrapper
    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}

这里的 protocol.refer(serviceType, url) 就会经过 ProtocolFilterWrapper 的包装,ProtocolFilterWrapper 会执行注册的消费者 Filter,然后才到了 DubboProtocol。而 DubboProtocol 中的关键就是创建了 ExchangeClient,代码如下:

// 创建即时连接
else {
    client = Exchangers.connect(url, requestHandler);
}

而这个 Exchangers.connect() 我们就很熟悉了,跟上面的提供者差不多,在connect() 中经过 HeaderExchanger、Transporters、Transporter$Adaptive 的调用,最终来到了 NettyTransporter,在这里创建了一个 NettyClient,也就是说,Dubbo 的消费者其实就是作为一个 Netty 的客户端,连接到提供者的 Netty 服务端,然后进行数据传输。 而前面返回的 invoker 对象也在 ReferenceConfig 中创建为 InvokerInvocationHandler 对象,所以 InvokerInvocationHandler 就是具体的代理类了,当调用我们代码定义的接口类的时候,实际就是调用 InvokerInvocationHandler 的 invoke() 方法,代码如下:

// 6、创建服务代理,这里默认调用的是 JavassistProxyFactory 的 getProxy()方法
// invoker就是MockClusterInvoker
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}

所以这里我们梳理一下这个过程:

所以下面我们从 InvokerInvocationHandler 的 invoke() 方法入手,开始分析一次完整的 Dubbo 调用是如何串起来的。 InvokerInvocationHandler 组装一下 RPC 调用参数然后直接调用 MockClusterInvoker 的 invoke(),并进行异常的捕获处理。然后 MockClusterInvoker 根据参数是否有 Mock 模式,如果有则返回 Mock 结果,如果没有则继续往下调用 FailoverClusterInvoker。 FailoverClusterInvoker 就是根据重试次数循环,捕获到异常后根据负载均衡策略重新选择 invoker 发起调用。关键代码如下:

// 3、使用循环,失败重试
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
    // 重试时,进行重新选择,避免重试时invoker列表已经发生变化
    // 注意:如果列表发生了变化,那么invoked判断会失效,因为invoker实例已经改变
    if (i > 0) {
        // 3.1、如果当前实例已经被销毁,则抛出异常
        checkWhetherDestroyed();
        // 3.2、重新获取所有服务提供者
        copyInvokers = list(invocation);
        // check again
        // 3.3、重新检查一下
        checkInvokers(copyInvokers, invocation);
    }
    // 3.4、根据负载均衡策略选择invoker
    Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
    invoked.add(invoker);
    RpcContext.getContext().setInvokers((List) invoked);
    try {
        // 3.5、具体发起远程调用
        // 这里拿到的invoker对象是RegistryDirectory$InvokerDelegate
        Result result = invoker.invoke(invocation);
        ...
        return result;
    } catch (RpcException e) {
        if (e.isBiz()) { // biz exception.
            throw e;
        }
        le = e;
    } catch (Throwable e) {
        le = new RpcException(e.getMessage(), e);
    } finally {
        providers.add(invoker.getUrl().getAddress());
    }
}

而负载均衡策略的则是在 select() 方法中实现,默认是使用 RandomLoadBalance 也就是随机选择策略,并且在重新选择的时候会先排除已经使用过的 invoker。 相信看到这里,大家就能回答前面的面试题了。 接下来 FailoverClusterInvoker 的 invoke() 会开始经过 Filter 的过滤链条处理,然后最终进入了 DubboInvoker 的 invoke() 方法,在这里会先从线程池获取一个线程,然后将前面初始化好的 Netty 客户端发起远程连接并且传输数据到服务提供者。

Netty 的参数优化

面试官:前面说到 Dubbo 是使用 Netty 作为通信框架,那么使用 Netty 有什么好处?

这个面试题其实就是在问 Netty 的使用,并且最好能将一些 Netty 的参数调优体现出来。 前面我们提到的,服务提供者的核心其实是初始化了 Netty 服务端并且将 IP 和端口上报给注册中心,然后服务消费者就从注册中心获取接口类对应的 IP 和端口,便可以通过 Netty 客户端连接到服务端,将数据传输到服务端进行处理,处理完成后接收返回的数据,转换为接口方法的返回类型,返回给该方法。 因此我们先看一下 Netty 服务端的初始化:

// 设置Netty的boss线程池和worker线程池
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
        getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
        "NettyServerWorker");

首先这里是 reactor 线程模型的使用, 这里设置了2个线程池,bossGroup主要用于处理客户端的accept连接、断开连接等,workerGroup处理数据的传输,包括read、write事件和pipeline链条中的handler。 简单提一下,reactor 线程模型就是将线程池的职责分开,处理连接的属于低频操作且阻塞时间相对较长(因为需要进行 TCP 3次握手),处理数据的 IO 传输属于高频且和业务相关性紧密,使用互不干扰的线程池可以提高效率。 接下来看一下参数的设置:

// 设置netty的业务处理类
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();

bootstrap.group(bossGroup, workerGroup)
    .channel(NettyEventLoopFactory.serverSocketChannelClass())
    //一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用,用于断开后的重连。
    .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
    // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性
    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
    // ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块
    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

其实除了上面那几个参数外,还有一些比较重要的参数在这块源码中没有体现,因此我从Seata(分布式事务框架)将相关的源码摘出来:

this.serverBootstrap
      .group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
      .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
      // TCP 3次握手的队列缓冲区大小
  .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
      //一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。
  .option(ChannelOption.SO_REUSEADDR, true)
      // 连接保活
  .childOption(ChannelOption.SO_KEEPALIVE, true)
      // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送
  .childOption(ChannelOption.TCP_NODELAY, true)
      // 发送数据缓冲区大小
  .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
      // 接收数据缓冲区大小
  .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
      // 控制网络水位,将网络传输速度维持在比较平稳的状态
  .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
      new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
          nettyServerConfig.getWriteBufferHighWaterMark()))
  .localAddress(new InetSocketAddress(listenPort))

而这个参数 WRITEBUFFERWATER_MARK 是属于新版本才有的,原理大概就是设置一个高水位、一个低水位,当输出的数据速率高于高水位时,则暂停一下 write 事件,改为处理 read 事件,这样就可以控制网络传输的速度比较平稳,避免大流量打死网卡。 最后看一下处理的链条:

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
        // 默认不启用SSL
        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
            ch.pipeline().addLast("negotiation",
                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
        }
        ch.pipeline()
                // 解码器
                .addLast("decoder", adapter.getDecoder())
                // 编码器
                .addLast("encoder", adapter.getEncoder())
                // 心跳检查handler
                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                // 业务处理handler
                .addLast("handler", nettyServerHandler);
    }
});
// bind
// 绑定本地端口,并启动监听服务
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();

这里我们可以看到 pipeline() 里面加入了很多 handler,其实这里就是使用了责任链设计模式,接收到的网络请求,都会依次使用每个 handler 处理一下。 编解码器涉及到了如何处理网络的粘包、拆包的处理。 IdleStateHandler 心跳处理器,这个 handler 其实是 Netty 提供的,目的是维持 server 端和 client 端的长连接,如果没有设置这个 handler,就会经常出现 TCP 的超时时间到了,然后客户端直接断开和服务端的连接,这个笔者之前遇到过这个 bug,现象就是建立连接一段时间之后,没有新的网络传输的时候,莫名其妙地断开了连接。

下面我们再看一下 Netty 客户端的初始化:

protected void doOpen() throws Throwable {
    // 1、创建业务handler
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    // 2、创建启动器并配置
    bootstrap = new Bootstrap();
    bootstrap.group(NIO_EVENT_LOOP_GROUP)
            // 连接保活
            .option(ChannelOption.SO_KEEPALIVE, true)
            // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性
            .option(ChannelOption.TCP_NODELAY, true)
            // ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(socketChannelClass());

    // 设置连接超时
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
    // 3、添加handler到连接的pipeline
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

            if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
            }

            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    // 解码器
                    .addLast("decoder", adapter.getDecoder())
                    // 编码器
                    .addLast("encoder", adapter.getEncoder())
                    // 心跳检查handler
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    // 业务处理handler
                    .addLast("handler", nettyClientHandler);

            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if(socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}

客户端的参数与服务端的大同小异,需要注意的是客户端只需要一个线程池而已。

编解码器如何处理粘包和拆包

面试官:如何解决 TCP 网络传输中的拆包和粘包?

拆包是指在网络传输过程中,一份数据被拆分为多次传输,每次只传输了一部分。粘包是指在网络传输中,两份数据合并在一起传输过去了。Dubbo 的网络拆包和粘包的处理是通过在 Netty 的处理链条中添加的编解码器实现的。 Dubbo 的编码器是 DubboCodec 的父类 ExchangeCodec 实现的。我们看一下代码:

public class ExchangeCodec extends TelnetCodec {
    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        // 1、对请求信息进行编码
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        }
        // 2、对响应信息进行编码
        else if (msg instanceof Response) {
            encodeResponse(channel, buffer, (Response) msg);
        }
        // 3、其他信息进行编码
        else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        // 获取序列化扩展实现
        Serialization serialization = getSerialization(channel);
        // 创建Dubbo协议扩展头字节数组,HEADER_LENGTH=16
        // header.
        byte[] header = new byte[HEADER_LENGTH];
        // 把魔数0xdabb写入协议头
        // set magic number.
        Bytes.short2bytes(MAGIC, header);

        // 设置请求类型与序列化类型,标记到协议头
        // set request and serialization flag.
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // 将请求id设置到协议头
        // set request id.
        Bytes.long2bytes(req.getId(), header, 4);

        // 使用serialization将对象数据部分进行编码,并把协议数据部分写入缓存
        // encode request data.
        int savedWriteIndex = buffer.writerIndex();
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        // 刷新缓存
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        // 检查payload(协议数据部分)是否合法
        int len = bos.writtenBytes();
        checkPayload(channel, len);
        Bytes.int2bytes(len, header, 12);

        // 将协议头写入缓存
        // write
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }

    protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
        int savedWriteIndex = buffer.writerIndex();
        try {
            //  获取序列化扩展实现
            Serialization serialization = getSerialization(channel);
            // 创建Dubbo协议扩展头字节数组, HEADER_LENGTH为 16
            // header.
            byte[] header = new byte[HEADER_LENGTH];
            // 把魔数写入协议头
            // set magic number.
            Bytes.short2bytes(MAGIC, header);
            // 设立请求类型与序列化类型,标记到协议头
            // set request and serialization flag.
            header[2] = serialization.getContentTypeId();
            if (res.isHeartbeat()) {
                header[2] |= FLAG_EVENT;
            }
            // 设置响应类型到第4字节位置
            // set response status.
            byte status = res.getStatus();
            header[3] = status;
            // 将请求ID设直到协议头
            // set request id.
            Bytes.long2bytes(res.getId(), header, 4);

            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            // 使用serialization对数据部分进行编码,并把协议数据部分写入缓存
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            // encode response data or error message.
            if (status == Response.OK) {
                if (res.isHeartbeat()) {
                    encodeEventData(channel, out, res.getResult());
                } else {
                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
                }
            } else {
                out.writeUTF(res.getErrorMessage());
            }
            // 刷新缓存
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();

            int len = bos.writtenBytes();
            //  检查payload (协议数据部分)是否合法
            checkPayload(channel, len);
            Bytes.int2bytes(len, header, 12);

            // 将协议头写入缓存
            // write
            buffer.writerIndex(savedWriteIndex);
            buffer.writeBytes(header); // write header.
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        } catch (Throwable t) {
            // clear buffer
            buffer.writerIndex(savedWriteIndex);
            ......
        }
    }
}

上面的代码其实可以看到 Dubbo 协议的组成,分为 2 大块:header 和 data 部分。header 总包含了 16 字节,前 2 字节为魔数,标记一个数据帧的开始,然后是1字节的请求类型和序列化标记 id,然后 1 字节是只在响应报文设置的结果码,然后 8 字节是请求 id,最后 4 字节是 body 内容的大小。 接下来看下解码的部分,同时可以从里面学习到对于网络拆包粘包的处理。InternalDecoder 就是解码的内部实现类。

private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

        ChannelBuffer message = new NettyBackedChannelBuffer(input);

        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        // decode object.
        do {
            // 先保存未读取之前的位置
            int saveReaderIndex = message.readerIndex();
            // 调用DubboCodec对数据进行解码
            Object msg = codec.decode(channel, message);
            // 如果遇到拆包,则重置message为之前的位置
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                //is it possible to go here ?
                if (saveReaderIndex == message.readerIndex()) {
                    throw new IOException("Decode without read data.");
                }
                // 否则,走到这里就是读取成功的
                if (msg != null) {
                    // 把解码成功的对象放入out列表
                    out.add(msg);
                }
            }
        } while (message.readable());
    }
}

这里我们可以看到 Dubbo 对于拆包的处理,其实就是判断一下是否遇到了 Codec2.DecodeResult.NEEDMOREINPUT,如果遇到了则放弃前面读取的部分,然后等待下次 read 通道里面的数据。至于粘包的处理,因为这里读取 header 的时候,都是按照固定从长度读取,并且读取 data 的时候,也是按照 header 里面指定的长度读取的,所以读到的结果一定是完整的,不会出现多余的字节,如果不完整就是走拆包的处理逻辑。 接下来 codec.decode() 就是开始解析协议的内容了:

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    // 先读取协议头
    buffer.readBytes(header);
    //解析Dubbo协议数据部分
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // 检查魔数,确认为Dubbo协议帧
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // 检查是否读取了一个完整的Dubbo协议头
    // check length.
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // 从协议头的最后四个字节读取协议数据部分的大小
    // get data length.
    int len = Bytes.bytes2int(header, 12);
    checkPayload(channel, len);

    // 如采遇到半包问题,则直接返回
    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // 解析协议数据部分
    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        // 这里调用的是子类DubboCodec
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接