基于Reactor模型的高性能网络库之Tcpserver组件-上层调度器

简介: TcpServer 是一个用于管理 TCP 连接的类,包含成员变量如事件循环(EventLoop)、连接池(ConnectionMap)和回调函数等。其主要功能包括监听新连接、设置线程池、启动服务器及处理连接事件。通过 Acceptor 接收新连接,并使用轮询算法将连接分配给子事件循环(subloop)进行读写操作。调用链从 start() 开始,经由线程池启动和 Acceptor 监听,最终由 TcpConnection 管理具体连接的事件处理。

成员变量

using  ThreadInitCallback=std::function<void(EventLoop*)>;
enum  Option// 是否开启端口复用,Acceptor 构造时会根据这个配置创建监听 socket。
{
    KNoReusePort,
    KReusePort,
};
using   ConnectionMap=std::unordered_map<std::string,TcpconnectionPtr>;
    EventLoop  *loop_;//baseLoop用户定义的loop

    const std::string   ipPort_;
    const  std::string   name_;

    std::unique_ptr<Acceptor>acceptor_;//运行在mainloop,监听新连接事件

    std::shared_ptr<EventLoopThreadPool>threadPool_;//one loop per   thread

    std::atomic_int started_;//标记是否启动过,防止 start() 被重复调用

    int  nextConnId_;// 自增连接编号
    ConnectionMap   connections_;//保存所有的连接


    回调:
      ConnectionCallback  connectionCallback_;//新连接时的回调
    MessageCallback  messageCallback_; //有读写消息时的回调
    WriteCompleteCallback writeCompleteCallback_;//消息发送完成的回调
    
    ThreadInitCallback    threadInitCallback_;//loop线程初始化的回调

成员函数

    void  setThreadInitcallback(const  ThreadInitCallback&cb){threadInitCallback_=cb;}//新线程初始化回调
    void  setConnectionCallback(const  ConnectionCallback&cb){connectionCallback_=cb;}//有连接建立时的回调
    void  setMessageCallback(const  MessageCallback&cb){messageCallback_=cb;}//有消息到达时的回调
    void  setWriteCompleteCallback(const  WriteCompleteCallback&cb){writeCompleteCallback_=cb;}//消息发送完成时的回调




    static  EventLoop*  CheckLoopNotNull(EventLoop*loop)//返回不为null的loop,,用于防止后续代码因空指针解引用而崩溃。
{
    if(loop==nullptr)
    {
        LOG_FATAL("%s:%s:%d mainloop is null!\n",__FILE__,__FUNCTION__,__LINE__);
    }
    return loop;
}




//loop:主 Reactor 的事件循环(BaseLoop)。
listenAddr:服务器监听地址。
nameArg:服务器名称。
option:是否启用端口复用(KReusePort)。
  TcpServer::TcpServer(EventLoop*loop,const  InetAddress  &ListenAddr,const  std::string &nameArg,Option  option)
  :loop_(CheckLoopNotNull(loop)),
  ipPort_(ListenAddr.toIpPort()),
  name_(nameArg),
  acceptor_(new Acceptor(loop,ListenAddr,option==KReusePort)),
  /**
   * 创建listenfd=>封装成为acceptchannel=>acceptchannel.enableReading向poller注册关注读事件=》poller进行监听,如果有读事件发生
   * =》调用回调函数acceptor::handleread=>返回connectfd=>newconnectioncallback被调用(Tcpserver设置(setnewconnectioncallback))
   */
    threadPool_(new EventLoopThreadPool(loop,name_)),//创建线程池
    connectionCallback_(),//初始化为空,交给上层设置
    messageCallback_(),
    nextConnId_(1),//连接 ID 生成器(从 1 开始递增),用于标识每个 TCP 连接。
    started_(0)//原子标志位(0 = 未启动,1 = 已启动),防止重复调用 start()。
  {
    //当有新用户连接时,会执行TcpServer::newConnection回调
        acceptor_->setNewconnectionCallback(std::bind(&TcpServer::newConnection,this,std::placeholders::_1,std::placeholders::_2));
  }
  accept工作流程
//
 * 创建listenfd => 封装成为acceptChannel => acceptChannel.enableReading() 向poller注册关注读事件 
 * => poller进行监听,如果有读事件发生 
 * => 调用回调函数Acceptor::handleRead() 
 * => 返回connfd => newConnectionCallback被调用(TcpServer设置)
 */


  

    TcpServer::~TcpServer()
    {
        for(auto&item :connections_)//遍历连接池:connections_ 是一个 unordered_map<std::string, TcpconnectionPtr>
        {
            TcpconnectionPtr  conn(item.second);//这个局部强智能指针对象,出右括号,可以自动释放new出来的Tcpconnection对象,构造一个局部的 shared_ptr 智能指针 conn,从 item.second 拷贝构造
            item.second.reset();//通过 reset() 释放哈希表中该条目对 TcpConnection 对象的引用。
            //销毁链接
            conn->getLoop()->runInLoop(std::bind(&Tcpconnection::connectDestroyed,conn));
            /**
             * 将连接销毁操作,转移到连接所属的subloop线程中去执行。
            conn->getLoop() 获取的是该连接对象所归属的 EventLoop(一般是某个 subLoop)。
            runInLoop(...) 是异步操作:如果当前线程是该 loop 所在线程,就立即执行;否则会将任务加入该线程的事件队列中。
             */
        }
     }




   void   TcpServer::setThreadNum(int numThreads)
   {
      threadPool_->setThreadNum(numThreads);
   }



   
    void   TcpServer::start()//开启服务器监听
    {
        if(started_++==0)//防止一个tcpserver对象被start多次
        {
            threadPool_->start(threadInitCallback_);//启动底层的subloop线程池
            loop_->runInLoop(std::bind(&Acceptor::listen,acceptor_.get()));
            // acceptor_->listen(),主线程开始监听
            //将 listenfd 注册到主 Reactor 的 Poller 中,监听读事件。
            //主 Reactor 开始循环处理事件,当有新连接时,调用 Acceptor::handleRead()。
            //新连接被接受后,通过 newConnectionCallback 分配给subloop线程处理
        }
    }



        //有一个新的客户端连接,会执行这个回调
   void  TcpServer::newConnection(int  sockfd,const   InetAddress &peerAddr)
   {
        EventLoop*ioloop=threadPool_->getNextLoop();//轮询算法选择一个subloop,来管理channel
        char buf[64]={0};
        snprintf(buf,sizeof  buf,"-%s#%d", ipPort_.c_str(),nextConnId_);
        ++nextConnId_;
        std::string connName=name_+buf;
//连接名称格式:服务器名-IP:端口#连接ID(例如:MyServer-127.0.0.1:8080#1)。
        LOG_INFO("Tcpserver::newConnection[%s]-new connection [%s]from  %s\n",name_.c_str(),connName.c_str(),peerAddr.toIpPort().c_str());

        //通过sockfd获取其绑定的本机ip地址和端口信息
        sockaddr_in   local;
        ::bzero(&local,sizeof  local);
        socklen_t  addrlen=sizeof  local;
        if(::getsockname(sockfd,(sockaddr*)&local,&addrlen)<0)
        {
            LOG_ERROR("sockets::getLocalAddr");
        }
        InetAddress  localAddr(local);
//通过 getsockname 获取套接字绑定的本地地址(服务器地址),将结果封装为 InetAddress 对象 localAddr。

        //根据连接成功的sockfd,创建TcpConnection连接对象
        TcpconnectionPtr  conn(new   Tcpconnection(
        
            ioloop,
            connName,
            sockfd,        //socket,channel
            localAddr,
            peerAddr));
        connections_[connName]=conn;
        //下面的回调都是用户设置的给tcpserver->tcpconnection->channel->poller->notify  channel调用回调
        conn->setConnectionCallback(connectionCallback_);
        conn->setMessageCallback(messageCallback_);
        conn->setWriteCompleteCallback(writeCompleteCallback_);
        //设置了如何关闭连接的回调  conn->shutdown
        conn->setCloseCallback(std::bind(&TcpServer::removeConnection,this,std::placeholders::_1));
        //直接调用Tcpconnection::connectEstablished
        ioloop->runInLoop(std::bind(&Tcpconnection::connectEstablished,conn));
   }

//这个是acceptor接受新连接之后,将connfd和客户端地址传给Tcpserver,剩下交给Tcpserver来协调,从线程池选一个subloop来管理这条连接,
然后获取本机地址,将subloop,连接名称,connfd以及客户端地址,服务器地址来创建conn对象,运行在subloop线程管理这条连接,将新建的连接
加入tcpserver的连接池中,进行管理,设置conn的connectioncallback之类地回调,交由用户在设置,最后连接成功后,在subloop中执行tcpserver::connectEstablished
,成功转接给subloop,现在conn开始管理连接





//外部调用的接口,用于请求销毁一个 TCP 连接。
     void   TcpServer::removeConnection(const  TcpconnectionPtr  &conn)//removeConnection():线程安全地销毁连接
     {
            //线程安全地销毁一个 Tcp 连接对象 conn,并确保它的销毁逻辑是在 TcpServer 所在线程(也就是 main reactor)中执行的。
            无论调用者位于哪个线程,removeConnectionInLoop 都会在主 Reactor 线程中执行,避免多线程竞争对 connections_ 哈希表的修改。
            loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop,this,conn));
     }








      */
    void   TcpServer::removeConnectionInLoop(const  TcpconnectionPtr  &conn)//该函数在 IO 线程(EventLoop 线程)中被执行,用于真正地移除并销毁一个连接
    {
            LOG_INFO("TcpServer::removeConnectionInLoop  [%s]-connection %s\n",name_.c_str(),conn->name().c_str());//打印日志,记录当前正在执行删除连接操作,服务器的名称+连接名称
            connections_.erase(conn->name());//从 TcpServer 的连接映射表 connections_ 中移除这个连接。从连接管理器中移除:
            EventLoop  *ioLoop=conn->getLoop();//获取连接所属的subloop线程
            ioLoop->queueInLoop(std::bind(&Tcpconnection::connectDestroyed,conn));
            //在sub线程中执行销毁操作:
            //connectDestroyed 是 TcpConnection 的成员函数,负责:
   Poller 中注销该连接的所有事件。
  关闭套接字(close(sockfd))。
  调用用户的 connectionCallback_(通知连接已关闭)。

调用链

Tcpserver:其实就是对所有连接的一个管理,以及处理acceptor的新连接,设置回调函数交给subloop处理新连接的读写事件(将connfd和客户端地址返回了),这个新连接由tcpserver创建一个conn对象来进行管理


整个调用链:Tcpserver(包含一个baseloop主线程)调用start->底层线程池启动,创建多个subloop,一个subloop在一条独立线程

然后acceptor组件开始监听,内部创建listenfd,channel通过baseloop向poller注册事件,有新连接的话,调用回调acceptor::handleread(),会得到connfd和客户端地址,这里面会调用回调函数Tcpserver::newconnectioncallback(),acceptor通过这个回调函数把connfd和客户端地址交给上层tcpserver


在Tcpserver::newconnectioncallback()中,会轮询选择任意一个subloop,然后创建一个Tcpconnection对象conn,把这个连接交由conn管理,并放进subloop线程里面;

具体的读写事件都交给Tcpconnection管理,通过channel向poller注册,可读,可写,连接关闭,连接失败事件,一旦poller检测到connfd有事件发生,调用对应的回调函数通过缓冲区进行处理

相关文章
|
5月前
|
JSON 监控 网络协议
干货分享“对接的 API 总是不稳定,网络分层模型” 看电商 API 故障的本质
本文从 OSI 七层网络模型出发,深入剖析电商 API 不稳定的根本原因,涵盖物理层到应用层的典型故障与解决方案,结合阿里、京东等大厂架构,详解如何构建高稳定性的电商 API 通信体系。
|
5月前
基于Reactor模式的高性能网络库github地址
https://github.com/zyi30/reactor-net.git
153 0
|
2月前
|
机器学习/深度学习 数据采集 人工智能
深度学习实战指南:从神经网络基础到模型优化的完整攻略
🌟 蒋星熠Jaxonic,AI探索者。深耕深度学习,从神经网络到Transformer,用代码践行智能革命。分享实战经验,助你构建CV、NLP模型,共赴二进制星辰大海。
|
3月前
|
机器学习/深度学习 传感器 算法
【无人车路径跟踪】基于神经网络的数据驱动迭代学习控制(ILC)算法,用于具有未知模型和重复任务的非线性单输入单输出(SISO)离散时间系统的无人车的路径跟踪(Matlab代码实现)
【无人车路径跟踪】基于神经网络的数据驱动迭代学习控制(ILC)算法,用于具有未知模型和重复任务的非线性单输入单输出(SISO)离散时间系统的无人车的路径跟踪(Matlab代码实现)
264 2
|
3月前
|
机器学习/深度学习 并行计算 算法
【CPOBP-NSWOA】基于豪冠猪优化BP神经网络模型的多目标鲸鱼寻优算法研究(Matlab代码实现)
【CPOBP-NSWOA】基于豪冠猪优化BP神经网络模型的多目标鲸鱼寻优算法研究(Matlab代码实现)
100 8
|
3月前
|
监控 前端开发 安全
Netty 高性能网络编程框架技术详解与实践指南
本文档全面介绍 Netty 高性能网络编程框架的核心概念、架构设计和实践应用。作为 Java 领域最优秀的 NIO 框架之一,Netty 提供了异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。本文将深入探讨其 Reactor 模型、ChannelPipeline、编解码器、内存管理等核心机制,帮助开发者构建高性能的网络应用系统。
259 0
|
4月前
|
算法 安全 网络安全
【多智能体系统】遭受DoS攻击的网络物理多智能体系统的弹性模型预测控制MPC研究(Simulink仿真实现)
【多智能体系统】遭受DoS攻击的网络物理多智能体系统的弹性模型预测控制MPC研究(Simulink仿真实现)
238 0
|
4月前
|
机器学习/深度学习 算法 数据库
基于GoogleNet深度学习网络和GEI步态能量提取的步态识别算法matlab仿真,数据库采用CASIA库
本项目基于GoogleNet深度学习网络与GEI步态能量图提取技术,实现高精度步态识别。采用CASI库训练模型,结合Inception模块多尺度特征提取与GEI图像能量整合,提升识别稳定性与准确率,适用于智能安防、身份验证等领域。
|
4月前
|
运维 监控 安全
计算机网络及其安全组件纲要
本文主要介绍了 “计算机网络及常见组件” 的基本概念,涵盖网卡、IP、MAC、OSI模型、路由器、交换机、防火墙、WAF、IDS、IPS、域名、HTTP、HTTPS、网络拓扑等内容。
303 0
|
SQL 安全 网络安全
网络安全与信息安全:知识分享####
【10月更文挑战第21天】 随着数字化时代的快速发展,网络安全和信息安全已成为个人和企业不可忽视的关键问题。本文将探讨网络安全漏洞、加密技术以及安全意识的重要性,并提供一些实用的建议,帮助读者提高自身的网络安全防护能力。 ####
286 17

热门文章

最新文章