Tars网络包处理过程分析

很早就想写一篇关于Tars网络包处理的文章,但是又不知道从何写起,因为心里想着无非就是基于Linux Epoll的那一套东西,加上一些线程间的数据交互。

直到遇见了BUG!

这不是Tars框架本身的BUG,只是使用者在没有吃透Tars包处理运作机制情况下发生的BUG。

本文会围绕这个BUG从头到尾来谈谈Tars对网络包处理。

(文章不会贴出整段代码,会以代码片段+说明的形式展开,源码:https://github.com/Tencent/Tars,版本:5237058b91c5350be881205249f196970c503614,Date: Tue Jun 19 20:56:02 2018 +0800)

关于这一切的开始

某天正在和客户端同学调试Tars框架下WebSocket(后文称WS)通信,发现一个奇怪的现象,服务器会偶尔收不到完整的WS包,尤其是客户端在连续发送几k大小包的情况下

我们都知道WS协议自带分包的机制,第一时间怀疑找的轮子有问题,故在解析WS协议那里排查BUG了好久,无果;接着利用tcpdump抓包发现网卡确实是收到了完整的分片包,排除客户端发包以及网络通信问题;最后补全了Debug日志努力重现BUG,发现一分为二的WS包居然被Tars塞到了二个不同的处理线程…….当时心里咯噔一下,Tars还有这种操作!

 

分析

现在我已经可以确定是Tars框架导致了这种结果,那么究竟里面有什么魔术?先分析一下Tars网络相关代码。

网络线程与Servant处理线程建立联系:

首先区分两个概念:

  • Tars网络收发数据包是独立的线程(配置文件里的<server>netthread,默认没填写,默认值1);
  • Tars自实现业务是另外的线程,称为Servant线程(配置文件里<server><xxx.xxx.xxxAdapter>threads);

Tars的自实现业务都是基于Servant,申请Servant的配置处可以填写[线程数]。

先从进程启动开始分析:

//Tars/cpp/servant/libservant/Application.cpp
void Application::main(int argc, char *argv[])
{
    //...

    //初始化Server部分
    initializeServer();

    vector<TC_EpollServer::BindAdapterPtr> adapters;

    //绑定对象和端口
    bindAdapter(adapters);

    //...

    //设置HandleGroup分组
    //把对象和_epollServer相互绑定
    setHandle(adapters[i]);

    //启动业务处理线程 
    _epollServer->startHandle();

    //...
}

initializeServer()中会对网络通信线程进行配置初始化,核心是创建TC_EpollServer _epollServer。

//Tars/cpp/servant/libservant/Application.cpp
void Application::initializeServer()
{
    //...

    //读取网络线程数,一般都是默认1
    string sNetThread = _conf.get("/tars/application/server<netthread>", "1");
    unsigned int iNetThreadNum = TC_Common::strto<unsigned int>(sNetThread);

    //...

    //初始化TC_EpollServer _epollServer 
    _epollServer = new TC_EpollServer(iNetThreadNum);

    //...
}

在bindAdapter()中会对Servant进行配置初始化:

//Tars/cpp/servant/libservant/Application.cpp
void Application::bindAdapter(vector<TC_EpollServer::BindAdapterPtr>& adapters)
{
    //...

    //读取配置里设置的Servant处理线程数
    bindAdapter->setHandleNum(TC_Common::strto<int>(_conf.get(sLastPath + "<threads>", "0")));

    bindAdapter->setBackPacketBuffLimit(iBackPacketBuffLimit);

    //_epollServer绑定Servant的对象
    _epollServer->bind(bindAdapter);

    //...
}

最终,Servant对象处理线程会在_epollServer->startHandle()中启动Servant线程。

经过这一系列处理,将网络收发包线程与Servant业务对象线程关系建立起来。

网络收包:
收包的过程这里不贴代码了,流程是:

NetThread收到数据包 -> 经过Connection(这里有防过载的处理)-> 转交到Adapter手上 -> 保存至recv_queue。

对于收包几个点需要清晰理解的:

  1. 默认只有1个NetThread来处理网络数据包;
  2. 一个Servant业务对应一个Adapter,NetThread管理着连接列表ConnectionList,列表里的Connection保存有Adapter的指针;
  3. 一个Adapter只有一个recv_queue,它是一个线程安全的双端队列(实现机制就是粗暴了加锁);

Servant处理线程:

处理线程的核心循环在Tars/cpp/servant/libservant/ServantHandle.cpp这个文件中实现:

//Tars/cpp/servant/libservant/ServantHandle.cpp
void ServantHandle::run()
{
    initialize();

    if(!ServerConfig::OpenCoroutine)
    {
        //线程核心处理逻辑
        handleImp();
    }
    else
    //协程逻辑
    //...
}

进入handleImp(),注意这个函数实现位置:

//Tars/cpp/util/src/tc_epoll_server.cpp
void TC_EpollServer::Handle::handleImp()
{
    //Servant线程循环开始,每个Handle就是一个线程
    //...

    //业务自己逻辑处理,在每次循环时调用
    handleCustomMessage(true);

    //...
    //这里_handleGroup对应的是Handle::_handleGroup
    map<string, BindAdapterPtr>& adapters = _handleGroup->adapters;

    for (auto& kv : adapters)
    {
        BindAdapterPtr &adapter = kv.second;

        try
        {
            //注意这里多个Servant线程会"抢占式"的从recv_queue里取数据
            //也是这个机制导致了文章开头引出的问题
            while (adapter->waitForRecvQueue(recv, 0))
            {
                //...
                else
                {
                    //进入包处理,会区分Tars还是noTars协议,找到对应的dispatch入口
                    //实现在Tars/cpp/servant/libservant/ServantHandle.cpp
                    handle(stRecvData);
                }
                //业务自己逻辑处理,在每次收到包时调用
                handleCustomMessage(false);
                
                //...
            }
    //...
}

整个流程目前已经清晰:

epoll处理收到数据包 -> 保存至Adapter的recv_queue <- Servant业务线程”抢占式”从recv_queue里面读取数据包处理

 

解决

首先抛出一个疑问:为什么原始的TCP数据包没有问题?而基于TCP的WS数据包就有问题?

我们知道TCP流式传输,其协议本身会分片、粘包,对于这些Tars也比较容易在应用层进行处理,上述二种情况处理可以参考这个Demo里parseProtocol的相关实现。

虽然我们自定义的协议包(有明确的包头,知道包体的长度),也会出现两个数据包同时被两个线程处理的情况。

但是这种情况不会出错,因为两个数据包是独立完整的,各自执行对应的逻辑

WS协议若分片,这是在应用层的分片,部分WS客户端会根据包大小“智能”的主动分片。

这种情况也可以放入parseProtocol来解决,但是考虑到每个包都是Parse WS包头,有些耗性能,

所以最终解决方案是在Servant::doRequest后面存了一层WS的状态(又造了半个轮子),遇见分包先缓存(加锁处理),记录缺少包长,可以省几次Parse。

 

总结

问题解决算是告一段落,但是感觉不是那么优雅,期间也想过其他的方案:

  1. 让客户端同学禁用WS分片,但是客户端同学表示这是引擎库封装好的,无能为力;
  2. 将Servant改成单线程处理,避免不了自行维护WS的状态,但是可以不用锁;
  3. 修改Tars架构,让同一连接(fd)始终保持同一Servant线程处理,这样可以避免加锁;

最后想展开谈一下第3个方案,如果真实现了,那么对于开启Servant多线程的业务来说可能是一件好事:

固定连接服务线程固定化,方面排查BUG,同时在一些业务场景会减少锁竞争

进一步想了想实现方案雏形:

NetThread收到数据包 -> 根据fd进行分配至BindAdapter(此时BindAdapter的数量应该和Servant线程数量持平)-> 每个Servant从各recv_queue取数据

再评估这个方案,基本要把Tars底层改30%(数据结构,各种指针相互引用,逻辑),无法评估是否影响性能。而且似乎这种人为的fd数据分配干预有点违背Tars设计哲学。

 

有时候随便看一看别人代码,一些边边角角的设计意图很有可能被忽略掉,但是一旦因为理解误差出现BUG或者踩了坑,这种强化记忆还是酸爽:D。

(全文结束)


转载文章请注明出处:漫漫路 - lanindex.com

Leave a Comment

Your email address will not be published.