2011-10-13 9 views
20

Aquí está mi aplicación:impulso asio async_write: cómo no entrelazado llamadas async_write?

  • cliente A envía un mensaje al Cliente B
  • proceso de servidor el mensaje por async_read la cantidad correcta de los datos y esperará a que los nuevos datos del cliente A (a fin de no bloquear el cliente a)
  • después servidor procesará la información (probablemente hacer una consulta MySQL ) y luego enviar el mensaje al cliente B con async_write.

El problema es que, si el cliente A envía el mensaje muy rápido, async_writes intercalará antes de que el controlador de async_write anterior se llama.

¿Hay una manera sencilla de evitar este problema?

EDIT 1: Si un cliente C envía un mensaje al Cliente B justo después el cliente A, debería aparecer el mismo problema ...

EDIT 2: Esto funcionaría? porque parece bloquear, no sé dónde ...

namespace structure {                
    class User {                  
    public:                   
    User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) : 
     m_socket(io_service, context), m_strand(io_service), is_writing(false) {}  

    ssl_socket& getSocket() {              
     return m_socket;                
    }                    

    boost::asio::strand getStrand() {            
     return m_strand;                
    }                    

    void push(std::string str) {             
     m_strand.post(boost::bind(&structure::User::strand_push, this, str));   
    }                    

    void strand_push(std::string str) {            

     std::cout << "pushing: " << boost::this_thread::get_id() << std::endl;  
     m_queue.push(str);               
     if (!is_writing) {               
     write();                 
     std::cout << "going to write" << std::endl;         
     }                    
     std::cout << "Already writing" << std::endl;         
    }                    

    void write() {                 
     std::cout << "writing" << std::endl;           
     is_writing = true;               
     std::string str = m_queue.front();           
     boost::asio::async_write(m_socket,           
           boost::asio::buffer(str.c_str(), str.size()),  
           boost::bind(&structure::User::sent, this)   
           );             
    }                    

    void sent() {                 
     std::cout << "sent" << std::endl;            
     m_queue.pop();                
     if (!m_queue.empty()) {              
     write();                 
     return;                  
     }                    
     else                   
     is_writing = false;               
     std::cout << "done sent" << std::endl;          
    }           

    private:          
    ssl_socket   m_socket;    
    boost::asio::strand m_strand;    
    std::queue<std::string>  m_queue;  
    bool      is_writing;  
    };           
}            

#endif 
+0

Tenga en cuenta que asíncrono de escritura es mucho menos valioso que asíncrono leer. La mayoría de las escrituras son prácticamente instantáneas, ya que el SO almacenará los datos de forma local. Por otro lado, las lecturas pueden bloquear la espera del lado remoto y no hay nada que se pueda hacer localmente. La escritura síncrona es, por lo tanto, una forma viable de implementar la secuencia. Esto también resuelve el problema de la propiedad de los datos: el código anterior es incorrecto ya que 'str' se destruye cuando' write() 'regresa, lo que puede suceder antes de que 'boost :: asio_async_write()' acceda al búfer. – MSalters

Respuesta

37

¿Existe una forma sencilla de evitar este problema?

Sí, mantenga una cola saliente para cada cliente. Inspeccionar el tamaño de la cola en el controlador async_write finalización, si no es cero, comenzar otra operación async_write. Este es un ejemplo

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 

#include <deque> 
#include <iostream> 
#include <string> 

class Connection 
{ 
public: 
    Connection(
      boost::asio::io_service& io_service 
      ) : 
     _io_service(io_service), 
     _strand(_io_service), 
     _socket(_io_service), 
     _outbox() 
    { 

    } 

    void write( 
      const std::string& message 
      ) 
    { 
     _strand.post(
       boost::bind(
        &Connection::writeImpl, 
        this, 
        message 
        ) 
       ); 
    } 

private: 
    void writeImpl(
      const std::string& message 
      ) 
    { 
     _outbox.push_back(message); 
     if (_outbox.size() > 1) { 
      // outstanding async_write 
      return; 
     } 

     this->write(); 
    } 

    void write() 
    { 
     const std::string& message = _outbox[0]; 
     boost::asio::async_write(
       _socket, 
       boost::asio::buffer(message.c_str(), message.size()), 
       _strand.wrap(
        boost::bind(
         &Connection::writeHandler, 
         this, 
         boost::asio::placeholders::error, 
         boost::asio::placeholders::bytes_transferred 
         ) 
        ) 
       ); 
    } 

    void writeHandler(
      const boost::system::error_code& error, 
      const size_t bytesTransferred 
      ) 
    { 
     _outbox.pop_front(); 

     if (error) { 
      std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl; 
      return; 
     } 

     if (!_outbox.empty()) { 
      // more messages to send 
      this->write(); 
     } 
    } 


private: 
    typedef std::deque<std::string> Outbox; 

private: 
    boost::asio::io_service& _io_service; 
    boost::asio::io_service::strand _strand; 
    boost::asio::ip::tcp::socket _socket; 
    Outbox _outbox; 
}; 

int 
main() 
{ 
    boost::asio::io_service io_service; 
    Connection foo(io_service); 
} 

algunos puntos clave

  • la boost::asio::io_service::strand protege el acceso a Connection::_outbox
  • un controlador se envía desde Connection::write() ya que es pública

no era obvio para mí si usaba prácticas similares en el ejemplo de su pregunta, ya que todos los métodos son públicos.

+0

He intentado esta solución, la cosa es que tengo un único io_servicio con múltiples hilos ejecutando run(), incluso con el uso de strand.post para insertar datos en la cola, parece segfault porque se llama desde 2 hilos diferentes ... alguna idea por que? – TheSquad

+0

@TheSquad que me parece una pregunta aparte. Probablemente haya implementado su lógica de forma incorrecta, puede ser fácil de hacer con hebras y múltiples hilos. El uso de una cola es una solución apropiada para su pregunta original. –

+0

¿Qué utilizarías para saber cuándo debes extraer los datos de la cola? – TheSquad

4

Sólo tratando de mejorar la gran respuesta de Sam. Los puntos de mejora son:

  • async_write se esfuerza para enviar cada byte de la memoria intermedia (s) antes de completar, lo que significa que debe suministrar todos los datos de entrada que tiene a la operación de escritura, de lo contrario, la sobrecarga de encuadre puede aumentar debido a que los paquetes TCP son más pequeños de lo que podrían haber sido.

  • asio::streambuf, mientras que es muy cómodo de usar, no es copia cero.El siguiente ejemplo muestra un copia cero enfoque: mantenga los fragmentos de datos de entrada donde están y use una sobrecarga de dispersión/recopilación de async_write que tome una secuencia de búferes de entrada (que son solo indicadores de los datos de entrada reales) .

código fuente completo:

#include <boost/asio.hpp> 
#include <iostream> 
#include <memory> 
#include <mutex> 
#include <string> 
#include <thread> 
#include <unordered_set> 
#include <vector> 

using namespace std::chrono_literals; 
using boost::asio::ip::tcp; 

class Server 
{ 
    class Connection : public std::enable_shared_from_this<Connection> 
    { 
    friend class Server; 
    void ProcessCommand(const std::string& cmd) { 
     if (cmd == "stop") { 
     server_.Stop(); 
     return; 
     } 
     if (cmd == "") { 
     Close(); 
     return; 
     } 
     std::thread t([this, self = shared_from_this(), cmd] { 
     for (int i = 0; i < 30; ++i) { 
      Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n"); 
     } 
     server_.io_service_.post([this, self] { 
      DoReadCmd(); 
     }); 
     }); 
     t.detach(); 
    } 

    void DoReadCmd() { 
     read_timer_.expires_from_now(server_.read_timeout_); 
     read_timer_.async_wait([this](boost::system::error_code ec) { 
     if (!ec) { 
      std::cout << "Read timeout\n"; 
      Shutdown(); 
     } 
     }); 
     boost::asio::async_read_until(socket_, buf_in_, '\n', [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read) { 
     read_timer_.cancel(); 
     if (!ec) { 
      const char* p = boost::asio::buffer_cast<const char*>(buf_in_.data()); 
      std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1)); 
      buf_in_.consume(bytes_read); 
      ProcessCommand(cmd); 
     } 
     else { 
      Close(); 
     } 
     }); 
    } 

    void DoWrite() { 
     active_buffer_ ^= 1; // switch buffers 
     for (const auto& data : buffers_[active_buffer_]) { 
     buffer_seq_.push_back(boost::asio::buffer(data)); 
     } 
     write_timer_.expires_from_now(server_.write_timeout_); 
     write_timer_.async_wait([this](boost::system::error_code ec) { 
     if (!ec) { 
      std::cout << "Write timeout\n"; 
      Shutdown(); 
     } 
     }); 
     boost::asio::async_write(socket_, buffer_seq_, [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred) { 
     write_timer_.cancel(); 
     std::lock_guard<std::mutex> lock(buffers_mtx_); 
     buffers_[active_buffer_].clear(); 
     buffer_seq_.clear(); 
     if (!ec) { 
      std::cout << "Wrote " << bytes_transferred << " bytes\n"; 
      if (!buffers_[active_buffer_^1].empty()) // have more work 
      DoWrite(); 
     } 
     else { 
      Close(); 
     } 
     }); 
    } 
    bool Writing() const { return !buffer_seq_.empty(); } 

    Server& server_; 
    boost::asio::streambuf buf_in_; 
    std::mutex buffers_mtx_; 
    std::vector<std::string> buffers_[2]; // a double buffer 
    std::vector<boost::asio::const_buffer> buffer_seq_; 
    int active_buffer_ = 0; 
    bool closing_ = false; 
    bool closed_ = false; 
    boost::asio::deadline_timer read_timer_, write_timer_; 
    tcp::socket socket_; 
    public: 
    Connection(Server& server) : server_(server), read_timer_(server.io_service_), write_timer_(server.io_service_), socket_(server.io_service_) { 
    } 

    void Start() { 
     socket_.set_option(tcp::no_delay(true)); 
     DoReadCmd(); 
    } 

    void Close() { 
     closing_ = true; 
     if (!Writing()) 
     Shutdown(); 
    } 

    void Shutdown() { 
     if (!closed_) { 
     closing_ = closed_ = true; 
     boost::system::error_code ec; 
     socket_.shutdown(tcp::socket::shutdown_both, ec); 
     socket_.close(); 
     server_.active_connections_.erase(shared_from_this()); 
     } 
    } 

    void Write(std::string&& data) { 
     std::lock_guard<std::mutex> lock(buffers_mtx_); 
     buffers_[active_buffer_^1].push_back(std::move(data)); // move input data to the inactive buffer 
     if (!Writing()) 
     DoWrite(); 
    } 

    }; 

    void DoAccept() { 
    if (acceptor_.is_open()) { 
     auto session = std::make_shared<Connection>(*this); 
     acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec) { 
     if (!ec) { 
      active_connections_.insert(session); 
      session->Start(); 
     } 
     DoAccept(); 
     }); 
    } 
    } 

    boost::asio::io_service io_service_; 
    tcp::acceptor acceptor_; 
    std::unordered_set<std::shared_ptr<Connection>> active_connections_; 
    const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30); 
    const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30); 

public: 
    Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false) { } 

    void Run() { 
    std::cout << "Listening on " << acceptor_.local_endpoint() << "\n"; 
    DoAccept(); 
    io_service_.run(); 
    } 

    void Stop() { 
    acceptor_.close(); 
    { 
     std::vector<std::shared_ptr<Connection>> sessionsToClose; 
     copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose)); 
     for (auto& s : sessionsToClose) 
     s->Shutdown(); 
    } 
    active_connections_.clear(); 
    io_service_.stop(); 
    } 

}; 

int main() { 
    try { 
    Server srv(8888); 
    srv.Run(); 
    } 
    catch (const std::exception& e) { 
    std::cerr << "Error: " << e.what() << "\n"; 
    } 
} 
Cuestiones relacionadas