成员变量
using ThreadInitCallback=std::function<void(EventLoop*)>; enum Option// 是否开启端口复用,Acceptor 构造时会根据这个配置创建监听 socket。 { KNoReusePort, KReusePort, }; using ConnectionMap=std::unordered_map<std::string,TcpconnectionPtr>; EventLoop *loop_;//baseLoop用户定义的loop const std::string ipPort_; const std::string name_; std::unique_ptr<Acceptor>acceptor_;//运行在mainloop,监听新连接事件 std::shared_ptr<EventLoopThreadPool>threadPool_;//one loop per thread std::atomic_int started_;//标记是否启动过,防止 start() 被重复调用 int nextConnId_;// 自增连接编号 ConnectionMap connections_;//保存所有的连接 回调: ConnectionCallback connectionCallback_;//新连接时的回调 MessageCallback messageCallback_; //有读写消息时的回调 WriteCompleteCallback writeCompleteCallback_;//消息发送完成的回调 ThreadInitCallback threadInitCallback_;//loop线程初始化的回调
成员函数
void setThreadInitcallback(const ThreadInitCallback&cb){threadInitCallback_=cb;}//新线程初始化回调 void setConnectionCallback(const ConnectionCallback&cb){connectionCallback_=cb;}//有连接建立时的回调 void setMessageCallback(const MessageCallback&cb){messageCallback_=cb;}//有消息到达时的回调 void setWriteCompleteCallback(const WriteCompleteCallback&cb){writeCompleteCallback_=cb;}//消息发送完成时的回调 static EventLoop* CheckLoopNotNull(EventLoop*loop)//返回不为null的loop,,用于防止后续代码因空指针解引用而崩溃。 { if(loop==nullptr) { LOG_FATAL("%s:%s:%d mainloop is null!\n",__FILE__,__FUNCTION__,__LINE__); } return loop; } //loop:主 Reactor 的事件循环(BaseLoop)。 listenAddr:服务器监听地址。 nameArg:服务器名称。 option:是否启用端口复用(KReusePort)。 TcpServer::TcpServer(EventLoop*loop,const InetAddress &ListenAddr,const std::string &nameArg,Option option) :loop_(CheckLoopNotNull(loop)), ipPort_(ListenAddr.toIpPort()), name_(nameArg), acceptor_(new Acceptor(loop,ListenAddr,option==KReusePort)), /** * 创建listenfd=>封装成为acceptchannel=>acceptchannel.enableReading向poller注册关注读事件=》poller进行监听,如果有读事件发生 * =》调用回调函数acceptor::handleread=>返回connectfd=>newconnectioncallback被调用(Tcpserver设置(setnewconnectioncallback)) */ threadPool_(new EventLoopThreadPool(loop,name_)),//创建线程池 connectionCallback_(),//初始化为空,交给上层设置 messageCallback_(), nextConnId_(1),//连接 ID 生成器(从 1 开始递增),用于标识每个 TCP 连接。 started_(0)//原子标志位(0 = 未启动,1 = 已启动),防止重复调用 start()。 { //当有新用户连接时,会执行TcpServer::newConnection回调 acceptor_->setNewconnectionCallback(std::bind(&TcpServer::newConnection,this,std::placeholders::_1,std::placeholders::_2)); } accept工作流程 // * 创建listenfd => 封装成为acceptChannel => acceptChannel.enableReading() 向poller注册关注读事件 * => poller进行监听,如果有读事件发生 * => 调用回调函数Acceptor::handleRead() * => 返回connfd => newConnectionCallback被调用(TcpServer设置) */ TcpServer::~TcpServer() { for(auto&item :connections_)//遍历连接池:connections_ 是一个 unordered_map<std::string, TcpconnectionPtr> { TcpconnectionPtr conn(item.second);//这个局部强智能指针对象,出右括号,可以自动释放new出来的Tcpconnection对象,构造一个局部的 shared_ptr 智能指针 conn,从 item.second 拷贝构造 item.second.reset();//通过 reset() 释放哈希表中该条目对 TcpConnection 对象的引用。 //销毁链接 conn->getLoop()->runInLoop(std::bind(&Tcpconnection::connectDestroyed,conn)); /** * 将连接销毁操作,转移到连接所属的subloop线程中去执行。 conn->getLoop() 获取的是该连接对象所归属的 EventLoop(一般是某个 subLoop)。 runInLoop(...) 是异步操作:如果当前线程是该 loop 所在线程,就立即执行;否则会将任务加入该线程的事件队列中。 */ } } void TcpServer::setThreadNum(int numThreads) { threadPool_->setThreadNum(numThreads); } void TcpServer::start()//开启服务器监听 { if(started_++==0)//防止一个tcpserver对象被start多次 { threadPool_->start(threadInitCallback_);//启动底层的subloop线程池 loop_->runInLoop(std::bind(&Acceptor::listen,acceptor_.get())); // acceptor_->listen(),主线程开始监听 //将 listenfd 注册到主 Reactor 的 Poller 中,监听读事件。 //主 Reactor 开始循环处理事件,当有新连接时,调用 Acceptor::handleRead()。 //新连接被接受后,通过 newConnectionCallback 分配给subloop线程处理 } } //有一个新的客户端连接,会执行这个回调 void TcpServer::newConnection(int sockfd,const InetAddress &peerAddr) { EventLoop*ioloop=threadPool_->getNextLoop();//轮询算法选择一个subloop,来管理channel char buf[64]={0}; snprintf(buf,sizeof buf,"-%s#%d", ipPort_.c_str(),nextConnId_); ++nextConnId_; std::string connName=name_+buf; //连接名称格式:服务器名-IP:端口#连接ID(例如:MyServer-127.0.0.1:8080#1)。 LOG_INFO("Tcpserver::newConnection[%s]-new connection [%s]from %s\n",name_.c_str(),connName.c_str(),peerAddr.toIpPort().c_str()); //通过sockfd获取其绑定的本机ip地址和端口信息 sockaddr_in local; ::bzero(&local,sizeof local); socklen_t addrlen=sizeof local; if(::getsockname(sockfd,(sockaddr*)&local,&addrlen)<0) { LOG_ERROR("sockets::getLocalAddr"); } InetAddress localAddr(local); //通过 getsockname 获取套接字绑定的本地地址(服务器地址),将结果封装为 InetAddress 对象 localAddr。 //根据连接成功的sockfd,创建TcpConnection连接对象 TcpconnectionPtr conn(new Tcpconnection( ioloop, connName, sockfd, //socket,channel localAddr, peerAddr)); connections_[connName]=conn; //下面的回调都是用户设置的给tcpserver->tcpconnection->channel->poller->notify channel调用回调 conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); //设置了如何关闭连接的回调 conn->shutdown conn->setCloseCallback(std::bind(&TcpServer::removeConnection,this,std::placeholders::_1)); //直接调用Tcpconnection::connectEstablished ioloop->runInLoop(std::bind(&Tcpconnection::connectEstablished,conn)); } //这个是acceptor接受新连接之后,将connfd和客户端地址传给Tcpserver,剩下交给Tcpserver来协调,从线程池选一个subloop来管理这条连接, 然后获取本机地址,将subloop,连接名称,connfd以及客户端地址,服务器地址来创建conn对象,运行在subloop线程管理这条连接,将新建的连接 加入tcpserver的连接池中,进行管理,设置conn的connectioncallback之类地回调,交由用户在设置,最后连接成功后,在subloop中执行tcpserver::connectEstablished ,成功转接给subloop,现在conn开始管理连接 //外部调用的接口,用于请求销毁一个 TCP 连接。 void TcpServer::removeConnection(const TcpconnectionPtr &conn)//removeConnection():线程安全地销毁连接 { //线程安全地销毁一个 Tcp 连接对象 conn,并确保它的销毁逻辑是在 TcpServer 所在线程(也就是 main reactor)中执行的。 无论调用者位于哪个线程,removeConnectionInLoop 都会在主 Reactor 线程中执行,避免多线程竞争对 connections_ 哈希表的修改。 loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop,this,conn)); } */ void TcpServer::removeConnectionInLoop(const TcpconnectionPtr &conn)//该函数在 IO 线程(EventLoop 线程)中被执行,用于真正地移除并销毁一个连接 { LOG_INFO("TcpServer::removeConnectionInLoop [%s]-connection %s\n",name_.c_str(),conn->name().c_str());//打印日志,记录当前正在执行删除连接操作,服务器的名称+连接名称 connections_.erase(conn->name());//从 TcpServer 的连接映射表 connections_ 中移除这个连接。从连接管理器中移除: EventLoop *ioLoop=conn->getLoop();//获取连接所属的subloop线程 ioLoop->queueInLoop(std::bind(&Tcpconnection::connectDestroyed,conn)); //在sub线程中执行销毁操作: //connectDestroyed 是 TcpConnection 的成员函数,负责: 从 Poller 中注销该连接的所有事件。 关闭套接字(close(sockfd))。 调用用户的 connectionCallback_(通知连接已关闭)。
调用链
Tcpserver:其实就是对所有连接的一个管理,以及处理acceptor的新连接,设置回调函数交给subloop处理新连接的读写事件(将connfd和客户端地址返回了),这个新连接由tcpserver创建一个conn对象来进行管理
整个调用链:Tcpserver(包含一个baseloop主线程)调用start->底层线程池启动,创建多个subloop,一个subloop在一条独立线程
然后acceptor组件开始监听,内部创建listenfd,channel通过baseloop向poller注册事件,有新连接的话,调用回调acceptor::handleread(),会得到connfd和客户端地址,这里面会调用回调函数Tcpserver::newconnectioncallback(),acceptor通过这个回调函数把connfd和客户端地址交给上层tcpserver
在Tcpserver::newconnectioncallback()中,会轮询选择任意一个subloop,然后创建一个Tcpconnection对象conn,把这个连接交由conn管理,并放进subloop线程里面;
具体的读写事件都交给Tcpconnection管理,通过channel向poller注册,可读,可写,连接关闭,连接失败事件,一旦poller检测到connfd有事件发生,调用对应的回调函数通过缓冲区进行处理