brpc icon indicating copy to clipboard operation
brpc copied to clipboard

brpc支持传输层多协议改进建议

Open zchuango opened this issue 1 month ago • 17 comments

Is your feature request related to a problem? 业界涌现出多种新型互联技术和协议,如RDMA、CXL、Nvlink、UB(UnifiedBus)等,结合当前项目代码来看Socket类融合了rdma和tcp协议已显的过于臃肿,后期如果计划扩展支持更多的新协议和技术将会带来很大维护成本,为了保证brpc的核心竞争力持续性,brpc需要考虑支持互联技术和新的协议演进发展,针对于此情况,目前提出一种多通信协议易扩展和兼容的Socket-Transport通信架构,该架构向上承接socket的调用,向下串联传输层协议栈方法,目前有以下方案和思路,还望一起讨论交流下。

Describe the solution you'd like

Image
  • Socket为brpc的通信基础类,针对该类进行传输层公共方法下推到Transport层,比如DoConnect、DoRead、DoWrite、WaitEpoll等。
  • TransportWrapper为Transport代理包装类,根据协议类型代理调用不同协议栈的Transport子类方法。
  • Trasnport为协议栈接口基类,接口定义了传输层需要泛化的协议栈公共方法,向上承接Socket调用。
  • RDMA、TCP、CXL、UB分别是传输层的不同通信协议的实现子类部分,方法调用通过TransportWrapper完成具体方法的调用。

Describe alternatives you've considered 架构改造计划: 第一步,基于Socket类进行架构改造和升级,同时将rdma和tcp协议的传输层公共部分下推到对应的子类上。 第二步,在已改造好的Socket-Transport架构上,引导导入多种新通信协议,例如CXL、UB等。

Additional context/screenshots

zchuango avatar Dec 08 '25 08:12 zchuango

是否还得配套内存管理相关的 API 设计? 比如 RDMA 网络,用户可以 zcopy 发送 attachment,而不是把用户数据从用户 buffer copy 到 IOBuf 上再 zcopy 发送。 还有 RDMA 的单边操作,要怎么支持呢?

ivanallen avatar Dec 08 '25 10:12 ivanallen

是否还得配套内存管理相关的 API 设计? 比如 RDMA 网络,用户可以 zcopy 发送 attachment,而不是把用户数据从用户 buffer copy 到 IOBuf 上再 zcopy 发送。 还有 RDMA 的单边操作,要怎么支持呢?

1、应该不需要重新设计内存管理相关API,目前brpc rdma通信还没有做到zcopy发送attachment吧,该方案下推到transport层逻辑只做架构层面的改进,如果该方案被接纳后续可以在rdma transport子类上进行迭代升级zcopy发送到attachment的逻辑。 2、了解到RDMA单边操作只在rdma endpoint实现,上面改造方案应该不涉及,直接沿用原有的使用方式。

zchuango avatar Dec 08 '25 11:12 zchuango

目前brpc rdma通信还没有做到zcopy发送attachment

支持的,zero copy都是通过IOBuf来实现。https://github.com/apache/brpc/blob/master/docs/cn/rdma.md

chenBright avatar Dec 08 '25 12:12 chenBright

Socket深度耦合在了RPC的整个流程中,还有IO多路复用等,改动量不小。

还有 RDMA 的单边操作,要怎么支持呢?

是的,每个协议可能还有一些特殊的使用方式。上层应该怎么提供接口呢?

chenBright avatar Dec 08 '25 12:12 chenBright

是的,每个协议可能还有一些特殊的使用方式。上层应该怎么提供接口呢?

目前尝试实现一下rdma和tcp的子类transport,上层RPC流程模块依然调用的是原Socket类提供的函数,下层transport承接了类系统socket调用的方法部分,协议的特殊使用方式都可以下推到transport子类来实现,这样原Socket类改造成公共的Transport context角色了

zchuango avatar Dec 08 '25 13:12 zchuango

#207 提到的 EndPoint 多形式,#1560 应该实现了。

chenBright avatar Dec 09 '25 02:12 chenBright

目前brpc rdma通信还没有做到zcopy发送attachment

支持的,zero copy都是通过IOBuf来实现。https://github.com/apache/brpc/blob/master/docs/cn/rdma.md

这里说的还没有做到zcopy发送attachment应该是指:用户在使用rdma时也需要像tcp一样先将数据从用户业务使用的buffer中copy到brpc的iobuf吧,数据到了iobuf以后传输数据确实是zcopy的,不过这个copy想要去掉就只能是用户业务数据也使用iobuf操作

Shaozheng528 avatar Dec 09 '25 03:12 Shaozheng528

目前brpc rdma通信还没有做到zcopy发送attachment

支持的,zero copy都是通过IOBuf来实现。https://github.com/apache/brpc/blob/master/docs/cn/rdma.md

这里说的还没有做到zcopy发送attachment应该是指:用户在使用rdma时也需要像tcp一样先将数据从用户业务使用的buffer中copy到brpc的iobuf吧,数据到了iobuf以后传输数据确实是zcopy的,不过这个copy想要去掉就只能是用户业务数据也使用iobuf操作

可以使用append_user_data_with_meta,还是得用户适配。

chenBright avatar Dec 09 '25 03:12 chenBright

#207 提到的 EndPoint 多形式,#1560 应该实现了。

这个改进建议应该是想要对目前rdma的实现方式进行修改方便后续拓展其他传输层协议,与https://github.com/apache/brpc/issues/207 中网络层连接支持域名及更多形式是不同方面的改动?目前rdma的实现在socket类以及多个不同文件都有修改,这个改进建议的目的应该是把新增传输层协议的实现方式改为更集中拓展性更强的实现方式?

Shaozheng528 avatar Dec 09 '25 03:12 Shaozheng528

这个改进建议的目的应该是把新增传输层协议的实现方式改为更集中拓展性更强的实现方式?

@Shaozheng528 嗯,是的,主要为了方便扩展新的通信传输层协议。

#207 提到的 EndPoint 多形式,#1560 应该实现了。

@chenBright 当前设计Transport接口都是从Socket已有方法抽取出来,并且和底层通信传输层关联性较高的部分,其中tcp endpoint和rdma endpoint均有被这些方法调用访问,为了把不同的endpoint调用拆分到对应的Transport实现类中。这里可以分别衍生出RdmaTransport和TcpTransport承接不同通信endpoint的调用,这样就能降低Socket类扩展新的传输协议成本,只需实现Transport接口部分即可完成新的传输协议对接,以下是对socket类下推方法抽出来的公共接口定义和类图部分可以参考。

Transport接口定义:

class Transport{
public:
    virtual void BeforeRecycled(Socket* socket) = 0;
    virtual int WaitAndReset(Socket* socket, int32_t expected_nref) = 0;
    virtual int StartWrite(Socket* socket, WriteRequest* req, WriteOptions& opt) = 0;
    virtual void* KeepWrite(void* void_arg) = 0;
    virtual ssize_t DoWrite(Socket* socket, WriteRequest* req) = 0;
    virtual int OnInputEvent(void* user_data, uint32_t events, bthread_attr_t& thread_attr) = 0;
    virtual void DebugSocket(std::ostream& ostream, SocketId id) = 0;
}

Transport结构类图 Image

zchuango avatar Dec 09 '25 12:12 zchuango

#207 提到的 EndPoint 多形式,#1560 应该实现了。

我只是列一下跟Socket相关的EndPoint目前已经支持多协议了。

chenBright avatar Dec 13 '25 05:12 chenBright

class Transport{ public: virtual void BeforeRecycled(Socket* socket) = 0; virtual int WaitAndReset(Socket* socket, int32_t expected_nref) = 0; virtual int StartWrite(Socket* socket, WriteRequest* req, WriteOptions& opt) = 0; virtual void* KeepWrite(void* void_arg) = 0; virtual ssize_t DoWrite(Socket* socket, WriteRequest* req) = 0; virtual int OnInputEvent(void* user_data, uint32_t events, bthread_attr_t& thread_attr) = 0; virtual void DebugSocket(std::ostream& ostream, SocketId id) = 0; }

最上面的Transport层接口是DoConnect/DoRead/DoWrite,为什么到这里变成了WaitAndReset/StartWrite/KeepWrite/DoWrite/OnInputEvent呢?感觉前者更合理些。后者耦合了太多逻辑,比如bthread操作、多路复用、健康检查等。

还有这里TransportWrapper的作用是什么呢?

wwbmmm avatar Dec 13 '25 06:12 wwbmmm

#207 提到的 EndPoint 多形式,#1560 应该实现了。

我只是列一下跟Socket相关的EndPoint目前已经支持多协议了。

现在的EndPoint其实还是tcp的EndPoint,没有支持非tcp的协议

wwbmmm avatar Dec 13 '25 06:12 wwbmmm

最上面的Transport层接口是DoConnect/DoRead/DoWrite,为什么到这里变成了WaitAndReset/StartWrite/KeepWrite/DoWrite/OnInputEvent呢?感觉前者更合理些。后者耦合了太多逻辑,比如bthread操作、多路复用、健康检查等。

是的,当前方案只是将存在rdma宏定义的方法简单抽取出来,然后拆分到不同transport子类上,按照这个思路符合的方法只有WaitAndReset/StartWrite/KeepWrite/DoWrite/OnInputEvent这些,DoConnect/DoRead因为不需要对tcp和rdma调用做差异化处理,就没有下推。考虑到后续Transport支持更多通信协议的方式演进,目前也有些观点可以一起讨论下:

  1. Transport层作为多协议打通抽象,需要抽象出更加合理接口层,同时需要考虑兼顾多个协议的方向演进,这块希望后续社区贡献者以及设备厂商共同努力推进。
  2. Socket作为架构改造的切入点,后期可以作为上层应用的对接下层Transport的context角色来演进。

上面初步设计的Socket.h抽象接口还不完善,还需要跟社区一起详细讨论给出更优的Socket.h和transport层接口,一起推进共建多协议兼容的特性。 个人想法后续演进可以小步多次迭代方式来做,第一步,按照既定的socket方法下推方式,打通完善Socket-Transport调用流程,第二步,统一抽象Transport层更合理的公共接口,反补到上层调用。当然当前下推方式和方法, @wwbmmm @chenBright 大佬帮忙一起看看还有哪些方法更合适?

还有这里TransportWrapper的作用是什么呢?

TransportWrapper是Transport子类实例的包装类,这里会把不同协议Transport统一包装管理,包装类会协助公共方法与Transport子类方法绑定以及处理协议差异点逻辑的包装,上游socket只关注功能方法调用。以下是相关示例代码:

transport_wrapper.h

namespace brpc {
class Socket;
class TransportWrapper {
    friend class Socket;
public:

#if BRPC_WITH_RDMA
    std::shared_ptr<RdmaTransport> rdmaTransport = nullptr;
#endif

    explicit TransportWrapper(Socket* socket, const SocketOptions& options);

    static auto GetEdgeTriggeredHandler(Mode socket_mode) -> void (*)(Socket*) {
        if (socket_mode == TCP) {
            return InputMessenger::OnNewMessages;
        }
#if BRPC_WITH_RDMA
        else if (socket_mode == RDMA) {
            return  brpc::rdma::RdmaEndpoint::OnNewDataFromTcp;
        }
#endif
        else{
            LOG(FATAL) << "Unknown socket mode";
            return nullptr;
        }
    }

    static std::shared_ptr<AppConnect> GetAppConnect(Mode socket_mode){
        if (socket_mode == TCP) {
            return nullptr;
        }
#if BRPC_WITH_RDMA
        else if (socket_mode == RDMA) {
            return  std::make_shared<rdma::RdmaConnect>();
        }
#endif
        else{
            LOG(FATAL) << "Unknown socket mode";
            return nullptr;
        }
    }

    ......
private:
    std::function<void(Socket *)> BeforeRecycled;
    std::function<int(Socket *, int32_t)> WaitAndReset;
    std::function<int(Socket *, Socket::WriteRequest *, const Socket::WriteOptions &)> StartWrite;
    std::function<void*(void *)> KeepWrite;
    std::function<ssize_t(Socket *, Socket::WriteRequest *)> DoWrite;
    std::function<int(void *, uint32_t, const bthread_attr_t &)> OnInputEvent;
    std::function<void(std::ostream &, SocketId)> DebugSocket;
};

}

transport_wrapper.cpp

namespace brpc {
    TransportWrapper::TransportWrapper(Socket* socket, const SocketOptions& options) {
        if (options.socket_mode == TCP) {
            // 使用共享指针创建对象
            auto transport = std::make_shared<TcpTransport>();

            DoWrite = [transport](Socket* socket, Socket::WriteRequest* req) {
                return transport->DoWrite(socket, req);
            };

	    ......
        }
#if BRPC_WITH_RDMA
        else if (options.socket_mode == RDMA) {
            auto transport = std::make_shared<RdmaTransport>(socket, options);

            DoWrite = [transport](Socket* socket, Socket::WriteRequest* req) {
                return transport->DoWrite(socket, req);
            };

            ......

            rdmaTransport = transport;
        }
#endif
        else {
            LOG(ERROR) << "socket_mode set error";
        }
    }
}

zchuango avatar Dec 16 '25 07:12 zchuango

是的,当前方案只是将存在rdma宏定义的方法简单抽取出来,然后拆分到不同transport子类上

这种方法看似简单,但实际实现可能会遇到问题,这些方法里面会访问Socket的私有成员,这就意味着具体Transport的实现里面,还要访问Socket的私有成员。这样并没有从根本上解决Socket和Transport解耦的问题,而且会引入新的问题——Socket里和Transport无关的一些代码被复制到多个Transport的实现代码里,增加了维护成本。

因此,我不建议这种改法,我觉得可以这样:

  1. 将Socket里面,和特定transport相关的私有成员抽取出来到各自的transport中,如
class Socket {
private:
  Transport* _transport;
};

class TcpTransport {
private:
  int _fd;
  // ...其他tcp独有的成员
};

class RdmaTransport {
private:
  rdma::RdmaEndpoint* _rdma_ep;
  RdmaState _rdma_state;
  TcpTransport* _tcp_transport; // RDMA状态关闭时, fallback到tcp
  // ...其他rdma独有的成员
};
  1. 把Socket里面,#if BRPC_WITH_RDMA 相关的代码抽取到transport中,如

StartWrite里面

    if (_conn) {
        butil::IOBuf* data_arr[1] = { &req->data };
        nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
    } else {
#if BRPC_WITH_RDMA
        if (_rdma_ep && _rdma_state != RDMA_OFF) {
            butil::IOBuf* data_arr[1] = { &req->data };
            nw = _rdma_ep->CutFromIOBufList(data_arr, 1);
        } else {
#else
        {
#endif
            nw = req->data.cut_into_file_descriptor(fd());
        }
    }

这段代码可以改成

    if (_conn) {
        butil::IOBuf* data_arr[1] = { &req->data };
        nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
    } else {
        nw = _transport->CutFromIOBuf(&req->data);
    }

然后各自的transport里面

int TcpTransport::CutFromIOBuf(butil::IOBuf* buf) {
    return buf->cut_into_file_descriptor(_fd);
}

int RdmaTransport::CutFromIOBuf(butil::IOBuf* buf) {
    if (_rdma_ep && _rdma_state != RDMA_OFF) {
        butil::IOBuf* data_arr[1] = { buf };
        return _rdma_ep->CutFromIOBufList(data_arr, 1);
    } else {
        return _tcp_transport->CutFromIOBuf(buf);
    }
}

wwbmmm avatar Dec 16 '25 11:12 wwbmmm

是的,当前方案只是将存在rdma宏定义的方法简单抽取出来,然后拆分到不同transport子类上

这种方法看似简单,但实际实现可能会遇到问题,这些方法里面会访问Socket的私有成员,这就意味着具体Transport的实现里面,还要访问Socket的私有成员。这样并没有从根本上解决Socket和Transport解耦的问题,而且会引入新的问题——Socket里和Transport无关的一些代码被复制到多个Transport的实现代码里,增加了维护成本。

因此,我不建议这种改法,我觉得可以这样:

  1. 将Socket里面,和特定transport相关的私有成员抽取出来到各自的transport中,如

class Socket { private: Transport* _transport; };

class TcpTransport { private: int _fd; // ...其他tcp独有的成员 };

class RdmaTransport { private: rdma::RdmaEndpoint* _rdma_ep; RdmaState _rdma_state; TcpTransport* _tcp_transport; // RDMA状态关闭时, fallback到tcp // ...其他rdma独有的成员 }; 2. 把Socket里面,#if BRPC_WITH_RDMA 相关的代码抽取到transport中,如

StartWrite里面

if (_conn) {
    butil::IOBuf* data_arr[1] = { &req->data };
    nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
} else {

#if BRPC_WITH_RDMA if (_rdma_ep && _rdma_state != RDMA_OFF) { butil::IOBuf* data_arr[1] = { &req->data }; nw = _rdma_ep->CutFromIOBufList(data_arr, 1); } else { #else { #endif nw = req->data.cut_into_file_descriptor(fd()); } } 这段代码可以改成

if (_conn) {
    butil::IOBuf* data_arr[1] = { &req->data };
    nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
} else {
    nw = _transport->CutFromIOBuf(&req->data);
}

然后各自的transport里面

int TcpTransport::CutFromIOBuf(butil::IOBuf* buf) { return buf->cut_into_file_descriptor(_fd); }

int RdmaTransport::CutFromIOBuf(butil::IOBuf* buf) { if (_rdma_ep && _rdma_state != RDMA_OFF) { butil::IOBuf* data_arr[1] = { buf }; return _rdma_ep->CutFromIOBufList(data_arr, 1); } else { return _tcp_transport->CutFromIOBuf(buf); } }

这种改法看着确实比将整个函数下推更好,脑测应该可以满足目前RDMA和TCP的需求,但是我有一个疑问,就是这种实现只能新增一些操作,不能减少一些操作,如果后续加入的传输层协议对目前的非宏定义部分的操作有一些不需要的也会因为这个架构而被迫执行响应的操作,可能会有一些不必要的性能损耗? 那个时候再将对应的操作下推到其他协议中来避免这个问题可以吗? 或者对这一点损耗不进行处理(如果损耗很小)? 或者可以确定不会出现这种问题?

Shaozheng528 avatar Dec 17 '25 02:12 Shaozheng528

这种改法看着确实比将整个函数下推更好,脑测应该可以满足目前RDMA和TCP的需求,但是我有一个疑问,就是这种实现只能新增一些操作,不能减少一些操作,如果后续加入的传输层协议对目前的非宏定义部分的操作有一些不需要的也会因为这个架构而被迫执行响应的操作,可能会有一些不必要的性能损耗? 那个时候再将对应的操作下推到其他协议中来避免这个问题可以吗? 或者对这一点损耗不进行处理(如果损耗很小)? 或者可以确定不会出现这种问题?

可以具体问题具体分析吧,如果损耗较小可以不处理,损耗较大可以下推到transport,或者做成开关,由transport来控制这个开关

wwbmmm avatar Dec 17 '25 02:12 wwbmmm

这种方法看似简单,但实际实现可能会遇到问题,这些方法里面会访问Socket的私有成员,这就意味着具体Transport的实现里面,还要访问Socket的私有成员。这样并没有从根本上解决Socket和Transport解耦的问题,而且会引入新的问题——Socket里和Transport无关的一些代码被复制到多个Transport的实现代码里,增加了维护成本。

因此,我不建议这种改法,我觉得可以这样:

将Socket里面,和特定transport相关的私有成员抽取出来到各自的transport中,如

按照这个方式我梳理了下感觉可行,结合个人的理解对需要下推的代码部分梳理,主要思路就是把RDMA宏定义代码段下推到Transport接口中并标识出所属源方法,然后对代码公共部分合并抽象出Transport接口。以下是具体的抽象结果和整理的改进想法,@wwbmmm 可以一起再探讨下看看。

trasnport接口最新定义: Image

源代码梳理和改进思路: Image

zchuango avatar Dec 23 '25 11:12 zchuango