From 8fe6cbd107607e995f3d55584e5fa829e68a6f1f Mon Sep 17 00:00:00 2001 From: an-tao Date: Mon, 8 Jan 2024 14:37:30 +0800 Subject: [PATCH 1/2] Make some changes for performance --- trantor/net/inner/BufferNode.h | 4 +- trantor/net/inner/MemBufferNode.cc | 17 +++- trantor/net/inner/TcpConnectionImpl.cc | 106 +++++++++++++++++++------ trantor/net/inner/TcpConnectionImpl.h | 1 + 4 files changed, 98 insertions(+), 30 deletions(-) diff --git a/trantor/net/inner/BufferNode.h b/trantor/net/inner/BufferNode.h index bbce40e8..fd266d54 100644 --- a/trantor/net/inner/BufferNode.h +++ b/trantor/net/inner/BufferNode.h @@ -65,8 +65,8 @@ class BufferNode : public NonCopyable { isDone_ = true; } - static BufferNodePtr newMemBufferNode(); - + static BufferNodePtr newMemBufferNode(trantor::MsgBuffer buffer); + static BufferNodePtr newMemBufferNode(const char *data, size_t len); static BufferNodePtr newStreamBufferNode(StreamCallback &&cb); #ifdef _WIN32 static BufferNodePtr newFileBufferNode(const wchar_t *fileName, diff --git a/trantor/net/inner/MemBufferNode.cc b/trantor/net/inner/MemBufferNode.cc index c6cd2479..d98513c7 100644 --- a/trantor/net/inner/MemBufferNode.cc +++ b/trantor/net/inner/MemBufferNode.cc @@ -4,7 +4,14 @@ namespace trantor class MemBufferNode : public BufferNode { public: - MemBufferNode() = default; + explicit MemBufferNode(trantor::MsgBuffer &&buffer) + : buffer_(std::move(buffer)) + { + } + MemBufferNode(const char *data, size_t len) : buffer_(len) + { + buffer_.append(data, len); + } void getData(const char *&data, size_t &len) override { @@ -29,8 +36,12 @@ class MemBufferNode : public BufferNode private: trantor::MsgBuffer buffer_; }; -BufferNodePtr BufferNode::newMemBufferNode() +BufferNodePtr BufferNode::newMemBufferNode(trantor::MsgBuffer buffer) +{ + return std::make_shared(std::move(buffer)); +} +BufferNodePtr BufferNode::newMemBufferNode(const char *data, size_t len) { - return std::make_shared(); + return std::make_shared(data, len); } } // namespace trantor diff --git a/trantor/net/inner/TcpConnectionImpl.cc b/trantor/net/inner/TcpConnectionImpl.cc index 8fddf1a5..9ad34bc0 100644 --- a/trantor/net/inner/TcpConnectionImpl.cc +++ b/trantor/net/inner/TcpConnectionImpl.cc @@ -378,6 +378,56 @@ void TcpConnectionImpl::forceClose() } }); } + +void TcpConnectionImpl::sendInLoop(trantor::MsgBuffer &&buffer) +{ + loop_->assertInLoopThread(); + if (status_ != ConnStatus::Connected) + { + LOG_WARN << "Connection is not connected,give up sending"; + return; + } + if (!ioChannelPtr_->isWriting() && writeBufferList_.empty()) + { + // send directly + auto sendLen = writeInLoop(buffer.peek(), buffer.readableBytes()); + if (sendLen < 0) + { + LOG_TRACE << "write error"; + return; + } + buffer.retrieve(sendLen); + } + if (buffer.readableBytes() > 0 && status_ == ConnStatus::Connected) + { + if (writeBufferList_.empty() || writeBufferList_.back()->isFile() || + writeBufferList_.back()->isStream()) + { + writeBufferList_.push_back( + BufferNode::newMemBufferNode(std::move(buffer))); + } + else + { + writeBufferList_.back()->append(buffer.peek(), + buffer.readableBytes()); + } + if (highWaterMarkCallback_ && + writeBufferList_.back()->remainingBytes() > + static_cast(highWaterMarkLen_)) + { + highWaterMarkCallback_(shared_from_this(), + writeBufferList_.back()->remainingBytes()); + } + if (highWaterMarkCallback_ && tlsProviderPtr_ && + tlsProviderPtr_->getBufferedData().readableBytes() > + highWaterMarkLen_) + { + highWaterMarkCallback_( + shared_from_this(), + tlsProviderPtr_->getBufferedData().readableBytes()); + } + } +} #ifndef _WIN32 void TcpConnectionImpl::sendInLoop(const void *buffer, size_t length) #else @@ -407,11 +457,15 @@ void TcpConnectionImpl::sendInLoop(const char *buffer, size_t length) if (writeBufferList_.empty() || writeBufferList_.back()->isFile() || writeBufferList_.back()->isStream()) { - writeBufferList_.push_back(BufferNode::newMemBufferNode()); + writeBufferList_.push_back(BufferNode::newMemBufferNode( + static_cast(buffer) + sendLen, length)); + } + else + { + writeBufferList_.back()->append(static_cast(buffer) + + sendLen, + length); } - writeBufferList_.back()->append(static_cast(buffer) + - sendLen, - length); if (highWaterMarkCallback_ && writeBufferList_.back()->remainingBytes() > static_cast(highWaterMarkLen_)) @@ -448,12 +502,12 @@ void TcpConnectionImpl::send(const std::shared_ptr &msgPtr) { if (loop_->isInLoopThread()) { - sendInLoop(msgPtr->peek(), msgPtr->readableBytes()); + sendInLoop(std::move(*msgPtr)); } else { loop_->queueInLoop([thisPtr = shared_from_this(), msgPtr]() { - thisPtr->sendInLoop(msgPtr->peek(), msgPtr->readableBytes()); + thisPtr->sendInLoop(std::move(*msgPtr)); }); } } @@ -465,11 +519,12 @@ void TcpConnectionImpl::send(const char *msg, size_t len) } else { - auto buffer = std::make_shared(msg, len); - loop_->queueInLoop( - [thisPtr = shared_from_this(), buffer = std::move(buffer)]() { - thisPtr->sendInLoop(buffer->data(), buffer->length()); - }); + trantor::MsgBuffer buffer(len); + buffer.append(msg, len); + loop_->queueInLoop([thisPtr = shared_from_this(), + buffer = std::move(buffer)]() mutable { + thisPtr->sendInLoop(std::move(buffer)); + }); } } void TcpConnectionImpl::send(const void *msg, size_t len) @@ -484,12 +539,12 @@ void TcpConnectionImpl::send(const void *msg, size_t len) } else { - auto buffer = - std::make_shared(static_cast(msg), len); - loop_->queueInLoop( - [thisPtr = shared_from_this(), buffer = std::move(buffer)]() { - thisPtr->sendInLoop(buffer->data(), buffer->length()); - }); + trantor::MsgBuffer buffer(len); + buffer.append(static_cast(msg), len); + loop_->queueInLoop([thisPtr = shared_from_this(), + buffer = std::move(buffer)]() mutable { + thisPtr->sendInLoop(std::move(buffer)); + }); } } void TcpConnectionImpl::send(const std::string &msg) @@ -528,9 +583,10 @@ void TcpConnectionImpl::send(const MsgBuffer &buffer) } else { - loop_->queueInLoop([thisPtr = shared_from_this(), buffer]() { - thisPtr->sendInLoop(buffer.peek(), buffer.readableBytes()); - }); + loop_->queueInLoop( + [thisPtr = shared_from_this(), buf = buffer]() mutable { + thisPtr->sendInLoop(std::move(buf)); + }); } } @@ -538,14 +594,14 @@ void TcpConnectionImpl::send(MsgBuffer &&buffer) { if (loop_->isInLoopThread()) { - sendInLoop(buffer.peek(), buffer.readableBytes()); + sendInLoop(std::move(buffer)); } else { - loop_->queueInLoop( - [thisPtr = shared_from_this(), buffer = std::move(buffer)]() { - thisPtr->sendInLoop(buffer.peek(), buffer.readableBytes()); - }); + loop_->queueInLoop([thisPtr = shared_from_this(), + buffer = std::move(buffer)]() mutable { + thisPtr->sendInLoop(std::move(buffer)); + }); } } void TcpConnectionImpl::sendFile(const char *fileName, diff --git a/trantor/net/inner/TcpConnectionImpl.h b/trantor/net/inner/TcpConnectionImpl.h index f19729a0..0afe8d0f 100644 --- a/trantor/net/inner/TcpConnectionImpl.h +++ b/trantor/net/inner/TcpConnectionImpl.h @@ -245,6 +245,7 @@ class TcpConnectionImpl : public TcpConnection, ssize_t writeInLoop(const void *buffer, size_t length); #else void sendInLoop(const char *buffer, size_t length); + void sendInLoop(trantor::MsgBuffer &&buffer); // -1: error, 0: EAGAIN, >0: bytes sent ssize_t writeRaw(const char *buffer, size_t length); // -1: error, 0: EAGAIN, >0: bytes sent From f0af75e97bce1bbf83deba7ed1b168afeca21ec2 Mon Sep 17 00:00:00 2001 From: an-tao Date: Mon, 8 Jan 2024 14:46:25 +0800 Subject: [PATCH 2/2] sendInLoop --- trantor/net/inner/TcpConnectionImpl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trantor/net/inner/TcpConnectionImpl.h b/trantor/net/inner/TcpConnectionImpl.h index 0afe8d0f..ddaaa99d 100644 --- a/trantor/net/inner/TcpConnectionImpl.h +++ b/trantor/net/inner/TcpConnectionImpl.h @@ -245,12 +245,12 @@ class TcpConnectionImpl : public TcpConnection, ssize_t writeInLoop(const void *buffer, size_t length); #else void sendInLoop(const char *buffer, size_t length); - void sendInLoop(trantor::MsgBuffer &&buffer); // -1: error, 0: EAGAIN, >0: bytes sent ssize_t writeRaw(const char *buffer, size_t length); // -1: error, 0: EAGAIN, >0: bytes sent ssize_t writeInLoop(const char *buffer, size_t length); #endif + void sendInLoop(trantor::MsgBuffer &&buffer); size_t highWaterMarkLen_{0}; std::string name_;