2012-05-21 9 views
6

me encuentro con frecuencia escribir código C++ de la forma:forma idiomática para paralelizar la función a través de líneas de archivos en C++

while (getline(strm, line)) { 
    cout << computationally_intensive_function(line) << endl; 
} 

me gustaría paralelizar este código. La mejor solución que he encontrado hasta el momento es la construcción de vector de cadenas para contener un gran número (10.000 hasta 100.000) de las líneas, y luego paralelizar más de este vector con

#pragma omp parallel for 

A continuación, vaciar el vector y repetir mientras las líneas permanecen. Sin embargo, este método requiere mucha memoria y los otros núcleos están inactivos mientras que el proceso principal está almacenando en búfer cadenas. ¿Hay una mejor manera? Algo como Python's multiprocessing.Pool.map o Hadoop? (Me gustaría evitar el uso de la API C++ de Hadoop sin embargo, debido a Hadoop es bastante pesado y no se puede instalar en todas partes mi código estaría funcionando.)

+2

Si tiene Acceda a un compilador de C++ 11, puede usar 'std :: async' o implementar su propio grupo de subprocesos con' std :: packaged_task'. – xDD

+0

Si cada línea es independiente y su archivo está disponible, ¿por qué no cortarlo con anticipación y alimentarlo a varias instancias de este proceso? –

+0

larga historia, necesita potencialmente más de un núcleo de las cadenas de almacenamiento en búfer. es necesario que pueda equilibrar la carga entre los núcleos, de modo que la función de cálculo intensivo se ejecute tanto como sea posible, mientras que los subprocesos que se almacenan en el búfer también se ejecutan a toda velocidad. esta no es una respuesta trivial –

Respuesta

5

Existe que no es bien característica conocida de OpenMP 3.0 tareas, que es bastante desafortunado ya que fueron creados específicamente para cubrir casos como este. Si su compilador es compatible con esa versión estándar, definitivamente debe ir a las tareas de OpenMP. Pero hay que tener en cuenta que la escritura a stdout (o std::cout) desde varios subprocesos en general, se mezcla su producción mal y que lo más probable es que desee sincronizar en él:

#pragma omp parallel 
{ 
    #pragma omp master 
    while (getline(strm, line)) 
    #pragma omp task 
    { 
     result_type result = computationally_intensive_function(line); 
     #pragma omp critical 
     { 
      cout << result << endl; 
      cout.flush(); 
     } 
    } 
    #pragma omp taskwait 
} 

lo dejo a usted para decidir qué variables deben ser shared y lo que debería ser private.

+0

¡Exactamente lo que necesitaba! ¡¡Gracias!! – gilesc

1

Debe superponer sus cálculos con las líneas de lectura del archivo. Una buena forma de hacerlo sería utilizar el algoritmo de canalización de Threading Building Blocks. Lo que hace es especificar tres filtros (en función de lo que se muestra en el ejemplo de pseudocódigo), dos en serie uno y uno paralelo. Los filtros seriales son de entrada y salida. El primero lee datos de un archivo línea por línea y pasa cada línea a un segundo filtro, que es paralelo y ejecuta su función de computación/procesamiento en un modo de subprocesos múltiples. La última etapa/filtro también es serial y tiene salida. Estoy copiar y pegar un ejemplo de TBB Tutorial, que parece estar haciendo exactamente lo que quiere lograr:

// Holds a slice of text. 
/** Instances *must* be allocated/freed using methods herein, because the 
C++ declaration 
represents only the header of a much larger object in memory. */ 
class TextSlice { 
    // Pointer to one past last character in sequence 
    char* logical_end; 
    // Pointer to one past last available byte in sequence. 
    char* physical_end; 
public: 
    // Allocate a TextSlice object that can hold up to max_size characters. 
    static TextSlice* allocate(size_t max_size) { 
     // +1 leaves room for a terminating null character. 
     TextSlice* t = (TextSlice*)tbb::tbb_allocator<char>().allocate(sizeof(TextSlice)+max_size+1); 
     t->logical_end = t->begin(); 
     t->physical_end = t->begin()+max_size; 
     return t; 
    } 
    // Free this TextSlice object 
    void free() { 
     tbb::tbb_allocator<char>().deallocate((char*)this, 
     sizeof(TextSlice)+(physical_end-begin())+1); 
    } 
    // Pointer to beginning of sequence 
    char* begin() {return (char*)(this+1);} 
    // Pointer to one past last character in sequence 
    char* end() {return logical_end;} 
    // Length of sequence 
    size_t size() const {return logical_end-(char*)(this+1);} 
    // Maximum number of characters that can be appended to sequence 
    size_t avail() const {return physical_end-logical_end;} 
    // Append sequence [first,last) to this sequence. 
    void append(char* first, char* last) { 
     memcpy(logical_end, first, last-first); 
     logical_end += last-first; 
    } 
    // Set end() to given value. 
    void set_end(char* p) {logical_end=p;} 
}; 

Y la función de conseguir esto para ejecutar es:

void RunPipeline(int ntoken, FILE* input_file, FILE* output_file) { 
    tbb::parallel_pipeline(
    ntoken, 
    tbb::make_filter<void,TextSlice*>(
    tbb::filter::serial_in_order, MyInputFunc(input_file)) 
    & 
    tbb::make_filter<TextSlice*,TextSlice*>(
    tbb::filter::parallel, MyTransformFunc()) 
    & 
    tbb::make_filter<TextSlice*,void>(
    tbb::filter::serial_in_order, MyOutputFunc(output_file))); 
} 
Cuestiones relacionadas