提升asio async_write:如何不交错async_write调用?

这是我的实现:

  • 客户端A向客户端B发送消息
  • 服务器通过async_read处理消息来async_read正确的数据量,并等待来自客户端A的新数据(按顺序不阻塞客户端A)
  • 之后,服务器将处理这些信息(可能做一个mysql查询),然后用async_write发送消息给客户端B.

问题是,如果客户端A发送消息的速度非常快,那么在调用前一个async_write处理程序之前, async_writes会交错。

有一个简单的方法来避免这个问题?

编辑1:如果一个客户端C发送一个消息到客户端B刚刚客户端后,同样的问题应该出现…

编辑2:这将工作? 因为它似乎阻止,我不知道在哪里…

  namespace structure { class User { public: User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) : m_socket(io_service, context), m_strand(io_service), is_writing(false) {} ssl_socket& getSocket() { return m_socket; } boost::asio::strand getStrand() { return m_strand; } void push(std::string str) { m_strand.post(boost::bind(&structure::User::strand_push, this, str)); } void strand_push(std::string str) { std::cout << "pushing: " << boost::this_thread::get_id() << std::endl; m_queue.push(str); if (!is_writing) { write(); std::cout << "going to write" << std::endl; } std::cout << "Already writing" << std::endl; } void write() { std::cout << "writing" << std::endl; is_writing = true; std::string str = m_queue.front(); boost::asio::async_write(m_socket, boost::asio::buffer(str.c_str(), str.size()), boost::bind(&structure::User::sent, this) ); } void sent() { std::cout << "sent" << std::endl; m_queue.pop(); if (!m_queue.empty()) { write(); return; } else is_writing = false; std::cout << "done sent" << std::endl; } private: ssl_socket m_socket; boost::asio::strand m_strand; std::queue<std::string> m_queue; bool is_writing; }; } #endif 

有一个简单的方法来避免这个问题?

是的,为每个客户维护一个传出队列。 检查async_write完成处理程序中的队列大小,如果非零,则启动另一个async_write操作。 这里是一个例子

 #include <boost/asio.hpp> #include <boost/bind.hpp> #include <deque> #include <iostream> #include <string> class Connection { public: Connection( boost::asio::io_service& io_service ) : _io_service( io_service ), _strand( _io_service ), _socket( _io_service ), _outbox() { } void write( const std::string& message ) { _strand.post( boost::bind( &Connection::writeImpl, this, message ) ); } private: void writeImpl( const std::string& message ) { _outbox.push_back( message ); if ( _outbox.size() > 1 ) { // outstanding async_write return; } this->write(); } void write() { const std::string& message = _outbox[0]; boost::asio::async_write( _socket, boost::asio::buffer( message.c_str(), message.size() ), _strand.wrap( boost::bind( &Connection::writeHandler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) ) ); } void writeHandler( const boost::system::error_code& error, const size_t bytesTransferred ) { _outbox.pop_front(); if ( error ) { std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl; return; } if ( !_outbox.empty() ) { // more messages to send this->write(); } } private: typedef std::deque<std::string> Outbox; private: boost::asio::io_service& _io_service; boost::asio::io_service::strand _strand; boost::asio::ip::tcp::socket _socket; Outbox _outbox; }; int main() { boost::asio::io_service io_service; Connection foo( io_service ); } 

一些关键点

  • boost::asio::io_service::strand保护对Connection::_outbox访问
  • 一个处理程序从Connection::write()派发,因为它是公共的

如果你在问题的例子中使用了类似的做法,那么对所有的方法都是公开的,这并不明显。