muduo源码分析:TcpConnection类 超、凢脫俗 2022-04-22 03:26 157阅读 0赞 ### 前言 ### 前面学习了TcpServer的实现,TcpServer对每个连接都会新建一个TcpConnection(使用shared\_ptr管理)。接下来学习一下TcpConnection的设计细节。 ### 连接状态 ### muduo对于一个连接的从生到死进行了状态的定义,类似一个状态机。 enum States { kDisconnected, kConnecting, kConnected, kDisconnecting }; 分别代表:已经断开、初始状态、已连接、正在断开 ### TcpConnection.h ### class TcpConnection : boost::noncopyable, public boost::enable_shared_from_this<TcpConnection> { public: TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); ~TcpConnection(); EventLoop* getLoop() const { return loop_; } const string& name() const { return name_; } const InetAddress& localAddress() const { return localAddr_; } const InetAddress& peerAddress() const { return peerAddr_; } bool connected() const { return state_ == kConnected; } bool disconnected() const { return state_ == kDisconnected; } // return true if success. bool getTcpInfo(struct tcp_info*) const; string getTcpInfoString() const; // void send(string&& message); //下面三个send()函数给连接发送数据 void send(const void* message, int len); void send(const StringPiece& message); void send(Buffer* message); // this one will swap data void shutdown(); //关闭该链接写端 void forceClose(); //强制关闭该连接 void forceCloseWithDelay(double seconds); void setTcpNoDelay(bool on); void startRead(); void stopRead(); bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop void setContext(const boost::any& context) { context_ = context; } const boost::any& getContext() const { return context_; } boost::any* getMutableContext() { return &context_; } //以下接口为设置连接对应的各种回调: void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; } void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; } /// Advanced interface Buffer* inputBuffer() { return &inputBuffer_; } Buffer* outputBuffer() { return &outputBuffer_; } /// Internal use only. void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; } void connectEstablished(); // should be called only once void connectDestroyed(); // should be called only once private: enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting }; void handleRead(Timestamp receiveTime); //处理读事件 void handleWrite(); //处理写事件 void handleClose(); //处理关闭事件 void handleError(); //处理错误事件 void sendInLoop(const StringPiece& message); void sendInLoop(const void* message, size_t len); void shutdownInLoop(); void forceCloseInLoop(); void setState(StateE s) { state_ = s; } const char* stateToString() const; void startReadInLoop(); void stopReadInLoop(); EventLoop* loop_; const string name_; StateE state_; // FIXME: use atomic variable bool reading_; // we don't expose those classes to client. boost::scoped_ptr<Socket> socket_; //连接对应的套接字 boost::scoped_ptr<Channel> channel_; //对应的事件分发器channel const InetAddress localAddr_; const InetAddress peerAddr_; //关注三个半事件 (这几个回调函数通过handle**那四个事件处理函数调用) ConnectionCallback connectionCallback_; //新连接建立回调函数 MessageCallback messageCallback_; //消息到达回调函数 WriteCompleteCallback writeCompleteCallback_; //写完毕回调函数 HighWaterMarkCallback highWaterMarkCallback_; CloseCallback closeCallback_; //连接关闭回调函数 size_t highWaterMark_; //输入输出缓冲区 Buffer inputBuffer_; Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer. boost::any context_; }; 先理解上面的 `loop_`, `socket_`, `channel_`好了,不明白请翻阅前几篇文章。 ### TcpConnection::TcpConnection() ### TcpConnection::TcpConnection(EventLoop* loop, //构造函数 const string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr) : loop_(CHECK_NOTNULL(loop)), name_(nameArg), state_(kConnecting), //初始状态为kConnection reading_(true), socket_(new Socket(sockfd)),//RAII管理已连接套接字 channel_(new Channel(loop, sockfd)), //使用Channel管理套接字上的读写 localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64*1024*1024) { //设置事件分发器的各事件回调 (将TcpConnection类的四个事件处理函数设置为事件分发器对应的回调函数) channel_->setReadCallback( boost::bind(&TcpConnection::handleRead, this, _1)); channel_->setWriteCallback( boost::bind(&TcpConnection::handleWrite, this)); channel_->setCloseCallback( boost::bind(&TcpConnection::handleClose, this)); channel_->setErrorCallback( boost::bind(&TcpConnection::handleError, this)); LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this << " fd=" << sockfd; socket_->setKeepAlive(true); } 构造函数在初始化列表中对socket、channel等进行了初始化,在函数体中设置了回调函数。 ### TcpConnection::~TcpConnection() ### TcpConnection::~TcpConnection() { LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this << " fd=" << channel_->fd() << " state=" << stateToString(); assert(state_ == kDisconnected); } -------------------- # TcpConnection::handleRead() # void TcpConnection::handleRead(Timestamp receiveTime) //读事件处理,调用设置的messageCallback_函数 { loop_->assertInLoopThread(); int savedErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) { //调用回调函数,使用shared_from_this()得到自身的shared_ptr,延长了该对象的生命期,保证了它的生命期长过messageCallback_函数,messageCallback_能安全的使用它。 messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } } 前面提到了,在已连接套接字可读时,调用`TcpConnection::handleRead`,进而调用用户设置的回调函数`messageCallback_` -------------------- ### ### ### TcpConnection::handleWrite() ### void TcpConnection::handleWrite() //写事件处理,调用设置的writeCompleteCallback函数 { loop_->assertInLoopThread(); if (channel_->isWriting()) { ssize_t n = sockets::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes()); if (n > 0) { outputBuffer_.retrieve(n); if (outputBuffer_.readableBytes() == 0) { channel_->disableWriting(); if (writeCompleteCallback_) { loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this())); } if (state_ == kDisconnecting) { shutdownInLoop(); } } } else { LOG_SYSERR << "TcpConnection::handleWrite"; // if (state_ == kDisconnecting) // { // shutdownInLoop(); // } } } else { LOG_TRACE << "Connection fd = " << channel_->fd() << " is down, no more writing"; } } -------------------- ### ### ### TcpConnection::handleClose() ### void TcpConnection::handleClose() //连接关闭处理函数,调用设置的connectionCallback_和closeCallback_回调 { loop_->assertInLoopThread(); LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString(); assert(state_ == kConnected || state_ == kDisconnecting); // we don't close fd, leave it to dtor, so we can find leaks easily. setState(kDisconnected); //设置状态为kDisconnected,表示已断开 channel_->disableAll(); //移除注册的事件,使用epoll时是EPOLL_CTL_DEL TcpConnectionPtr guardThis(shared_from_this()); //延长本对象的生命周期,引用技术为2 //调用用户回调函数 connectionCallback_(guardThis); //参数为shared_ptr,保证了 connectionCallback_能安全的使用本对象 // 调用TcpServer::removeConnection closeCallback_(guardThis); } 连接断开时,会调用TcpConnection::handleClose;接着调用用户回调connectionCallback\_;最后调用closeCallback\_,即TcpServer::removeConnection(TcpServer创建TcpConnection时设置的) -------------------- ## TcpServer::removeConnection() ## void TcpServer::removeConnection(const TcpConnectionPtr& conn) { loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn)); } void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) { loop_->assertInLoopThread(); LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_ << "] - connection " << conn->name(); // 根据conn的name,从map容器中删除,此时引用计数会减1。erase之前引用计数为2(由前面的shared_from_this()保证),所以执行完erase,引用计数变为1 size_t n = connections_.erase(conn->name()); assert(n == 1); // 然后调用conn->connectDestroyed EventLoop* ioLoop = conn->getLoop(); ioLoop->queueInLoop( boost::bind(&TcpConnection::connectDestroyed, conn)); // bind延长了conn的生命期,connectDestroyed完成后,TcpConnection被析构。 // FIXME wake up ? } TcpServer先将该conn从map容器中删除,因为erase之前使用了shared\_from\_this,所以erase之前引用计数为2,那么erase之后引用计数将变为1。 如果没用shared\_from\_this,仅仅传递了一个裸指针过来,erase之后引用计数变为0,那么该TcpConnection会被析构!这意味着TcpConnection的Channel也会被析构,可是你现在正在使用该Channel啊(结合上图看),怎么能在使用某个对象的时候把它析构呢,这是严重的错误。所以muduo使用shared\_ptr管理TcpConnection,避免了上述问题。 最后queueInLoop就是将TcpConnection::connectDestroyed函数移动到EventLoop中执行,执行位置就是在Channel->handleEvent之后,此时可以安全的析构TcpConnection。(这么做的原因见前面) 注意上面最后的boost::bind,它让TcpConnection的生命期长到调用connectDestroyed的时刻。在connectDestroyed执行完之后,TcpConnection才被析构。 ## TcpConnection::connectDestroyed ## void TcpConnection::connectDestroyed() { loop_->assertInLoopThread(); if (state_ == kConnected) { setState(kDisconnected); channel_->disableAll(); connectionCallback_(shared_from_this()); } // 将EventLoop.Poller中的该channel从容器中删除 loop_->removeChannel(get_pointer(channel_)); } `TcpConnection::connectDestroyed`是该对象析构前调用的最后一个成员函数,它会通知用户连接已经断开。
还没有评论,来说两句吧...