2011-10-27 12 views
5

Estoy usando un ejemplo de cola de bloqueo que obtuve de este sitio web, pensando que era bastante agradable. Esta cola de bloqueo usa boost :: mutex. Se está lanzando en algún momento una excepción:boost lanzamiento de exclusión mutua (impar?) Excepción

terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >' 

lo que(): descriptor de archivo incorrecto

es el código de bloqueo de cola aquí:

#include <boost/thread/mutex.hpp> 
#include <boost/thread/thread.hpp> 
#include <boost/thread/condition_variable.hpp> 
#include <exception> 
#include <list> 
#include <stdio.h> 

struct BlockingQueueTerminate 
    : std::exception 
{}; 

namespace tools { 
    template<class T> 
    class BlockingQueue 
    { 
    private: 
    boost::mutex mtx_; 
    boost::condition_variable cnd_; 
    std::list<T> q_; 
    unsigned blocked_; 
    bool stop_; 

    public: 
    BlockingQueue() 
     : blocked_() 
     , stop_() 
    {} 

    ~BlockingQueue() 
    { 
     this->stop(true); 
    } 

    void stop(bool wait) 
    { 
     // tell threads blocked on BlockingQueue::pull() to leave 
     boost::mutex::scoped_lock lock(mtx_); 
     stop_ = true; 
     cnd_.notify_all(); 

     if(wait) // wait till all threads blocked on the queue leave BlockingQueue::pull() 
    while(blocked_) 
     cnd_.wait(lock); 
    } 

    void put(T t) 
    { 
     boost::mutex::scoped_lock lock(mtx_); // The exception is thrown here ! 
     q_.push_back(t); 
     cnd_.notify_one(); 
    } 

    T pull() 
    { 
     boost::mutex::scoped_lock lock(mtx_); 
     ++blocked_; 
     while(!stop_ && q_.empty()) 
    cnd_.wait(lock); 
     --blocked_; 

     if(stop_) { 
    cnd_.notify_all(); // tell stop() this thread has left 
    throw BlockingQueueTerminate(); 
     } 

     T front = q_.front(); 
     q_.pop_front(); 
     return front; 
    } 
    }; 
} 

Cualquiera puede detectar lo que va mal aquí? porque lo intenté todo el día, averigüé en vano. Creo que necesito un ojo externo para verlo. Busque el comentario '// ¡La excepción se arroja aquí!' para ver dónde exactamente ocurre el problema.

EDIT 1:

El contexto: Estoy usando esta cola de bloqueo con el fin de crear un contenedor asíncrono MySQL.

Aquí es mi MySQL.hh

#ifndef MYSQL_HH_ 
# define MYSQL_HH_ 
# include <boost/asio.hpp> 
# include <boost/thread.hpp> 
# include <boost/function.hpp> 
# include <mysql++/mysql++.h> 
# include <queue> 
# include "async_executor.hh" 
# include "BlockingQueue.hh" 

class t_mysql_event { 
public: 
    t_mysql_event(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb) : 
    m_query(query), m_store_cb(cb), m_store_bool(true) {} 

    t_mysql_event(std::string query, boost::function<void()> cb) : 
    m_query(query), m_exec_cb(cb), m_store_bool(false) {} 

    bool is_store_query() { 
    return m_store_bool; 
    } 

    std::string toString() { 
    return m_query; 
    } 

    std::string      m_query; 
    boost::function<void(mysqlpp::StoreQueryResult)> m_store_cb; 
    boost::function<void()>    m_exec_cb; 

private: 
    bool       m_store_bool; 
}; 

namespace pools { 
    class MySQL { 
    public: 
    ~MySQL() {} 

    static MySQL* create_instance(boost::asio::io_service& io); 

    static MySQL* get_instance(); 

    void exec(std::string query, boost::function<void()> cb); 
    void store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb); 

    private: 
    MySQL(boost::asio::io_service& io) : executor(io, 100), parent_io(io), m_strand(io) 
    { 
     for (int i=0; i < 100; ++i) { 
    boost::thread(boost::bind(&MySQL::retreive, this)); 
     } 
    } 

    void async_exec(std::string query, boost::function<void()> cb, mysqlpp::Connection& conn); 
    void async_store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb, mysqlpp::Connection& conn); 

    void retreive(); 

    private: 
    tools::async_executor   executor; 
    boost::asio::io_service&  parent_io; 
    boost::asio::strand   m_strand; 
    tools::BlockingQueue<t_mysql_event*> m_events; 
    std::queue<mysqlpp::Connection*> m_stack; 
    }; 
} 

#endif //MYSQL_HH_ 

Aquí está la MySQL.cc:

#include "MySQL.hh" 

static pools::MySQL* _instance = 0; 

namespace pools { 


    MySQL* MySQL::create_instance(boost::asio::io_service& io) { 
    if (!_instance) 
     _instance = new MySQL(io); 
    return _instance; 
    } 

    MySQL* MySQL::get_instance() { 
    if (!_instance) { 
     exit(1); 
    } 
    return _instance; 
    } 

    void MySQL::exec(std::string query, boost::function<void()> cb) { 
    m_events.put(new t_mysql_event(query, cb)); 
    } 

    void MySQL::store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb) { 
    m_events.put(new t_mysql_event(query, cb)); 
    } 

    void MySQL::retreive() { 
    mysqlpp::Connection conn("***", "***", "***", "***"); 
    for(;;) { 
     t_mysql_event *event = m_events.pull(); 
     if (event->is_store_query()) 
    async_store(event->m_query, event->m_store_cb, conn); 
     else 
    async_exec(event->m_query, event->m_exec_cb, conn); 
     delete event; 
    } 
    } 

    void MySQL::async_exec(std::string query, boost::function<void()> cb, mysqlpp::Connection& conn) { 
    mysqlpp::Query db_q = conn.query(query.c_str()); 
    db_q.exec(); 
    parent_io.post(cb); 
    } 

    void MySQL::async_store(std::string query, boost::function<void(mysqlpp::StoreQueryResult)> cb, mysqlpp::Connection& conn) { 
    mysqlpp::Query db_q = conn.query(query.c_str()); 
    mysqlpp::StoreQueryResult res = db_q.store(); 
    parent_io.post(boost::bind(cb, res)); 
    } 
} 

Después:

class MyClass { 
public: 
    MyClass() : _mysql(pools::MySQL::get_instance()) {} 

    startQueries(); 
private: 
    void Query1() { 
     std::stringstream query(""); 
     query << "INSERT INTO Table1 ***"; 
     _mysql->exec(query.str(), 
        boost::bind(&MyClass::Query2, this, _1)); 
    } 
    void Query2() { 
     std::stringstream query(""); 
     query << "INSERT INTO Table2 ***"; 
     _mysql->exec(query.str(), 
        boost::bind(&MyClass::Query3, this, _1)); 
    } 
    void Query3() { 
     std::stringstream query(""); 
     query << "INSERT INTO Table3 ***"; 
     _mysql->exec(query.str(), 
        boost::bind(&MyClass::done, this, _1)); 
    } 
    void done() {} 
    pools::MySQL *_mysql; 
}; 

Con la esperanza de que dará respuesta a alguna solicitud de más información ..

Cosa graciosa:

Si reemplazo cada _mysql por pools :: MySQL :: get_instance() I no parece bloquearse. Pero sospecho que hay un error mucho más importante que a continuación ...

+0

¿Estás seguro de que es este código y no el que no publicaste? –

+0

Sí, estoy bastante seguro de que la excepción es arrojar desde este punto exactamente. He usado gdb y un montón de std :: cout para estar seguro antes de preguntar. – TheSquad

+0

Eso es raro, porque no creo que los constructores puedan lanzar una excepción: http://www.boost.org/doc/libs/1_37_0/doc/html/boost/interprocess/scoped_lock.html#id2914282-bb –

Respuesta

0

esta excepción se produce si la cola ya está destruido, sino que intenta llamar a su método put. Compruebe esto colocando un punto de interrupción (o una declaración de impresión) en el destructor de cola.

+0

No se destruye ya que es un objeto como atributo en una clase singleton. Es cierto que no sabías eso. Agregué mucha más información en mi publicación. – TheSquad

+0

@Andy T: ¿Realmente arrojaría una excepción? Supongo que preferiría segfault. – Atmocreations

+0

@Atmocreations Si el código :: mutex de boost :: detecta un estado no válido, puede lanzar una excepción. La dirección del objeto aún puede ser direcciones válidas mapeadas al proceso incluso después de que el objeto ha sido eliminado. Si este es el caso, no ocurrirá seg-fault. – selalerer

Cuestiones relacionadas