2011-02-09 4 views
6

Tengo bastante experiencia en el uso de los métodos básicos de comunicación y grupo MPI2, y hago un poco de trabajo de simulación embarazosamente paralelo usando MPI. Hasta ahora, he estructurado mi código para tener un nodo de despacho y un grupo de nodos de trabajadores. El nodo de envío tiene una lista de archivos de parámetros que se ejecutarán con el simulador. Semilla cada nodo de trabajador con un archivo de parámetros. Los nodos de trabajadores ejecutan su simulación y luego solicitan otro archivo de parámetros, que proporciona el nodo de despacho. Una vez que se han ejecutado todos los archivos de parámetros, el nodo de envío apaga cada nodo de trabajo, antes de apagarse.Crear un contador que permanezca sincronizado a través de procesos MPI

Los archivos de parámetros se suelen llamar "Par_N.txt", donde N es el número entero de identificación (por ejemplo, N = 1-1000). Así que estaba pensando, si pudiera crear un contador, y podría tener este contador sincronizado en todos mis nodos, podría eliminar la necesidad de tener un nodo de envío y hacer que el sistema sea un poco más simple. Tan simple como esto suena en teoría, en la práctica sospecho que es un poco más difícil, ya que necesitaría asegurar que el contador esté bloqueado mientras se cambia, etc. Y pensé que podría haber una forma incorporada para que MPI maneja esto. ¿Alguna idea? ¿Estoy más pensando en esto?

+0

¿Podría explicarnos qué beneficios espera obtener al eliminar al despachador? – NPE

+0

@ aix- Claro. En algunas de nuestras ejecuciones más grandes, he notado que el nodo de envío se satura con la comunicación (por ejemplo, una ejecución con np = 10k nodos). Para superar esto, he comenzado a permitir múltiples nodos de despacho, donde cada nodo de despacho toma un subgrupo. Sin embargo, esto conduce a un código más complejo (es decir, más difícil de mantener). Por lo tanto, se trata principalmente de tratar de simplificar las cosas (si es algo que podría hacerse simplemente). – MarkD

+0

Además, en ejecuciones más pequeñas (digamos 5-10 nodos) que se realizan con más frecuencia, sería bueno no entregar un nodo completo para que sea un nodo de despacho. Nuestro sys-admin está muy en contra de sobrecargar los núcleos, y ha configurado el programador de tareas para que no permita trabajos en los que el número de procesos sea> la cantidad de núcleos solicitados. – MarkD

Respuesta

10

La implementación de un contador compartido no es trivial, pero una vez que lo haces y lo tienes en una biblioteca en algún lugar, puedes hacer un lote con él.

En el libro Using MPI-2, que debe tener a mano si va a implementar esto, uno de los ejemplos (el código es available online) es un contador compartido. El "no escalable" debería funcionar bien en varias docenas de procesos: el contador es una matriz de 0..size-1 de enteros, uno por rango, y luego la operación `obtener siguiente artículo de trabajo 'consiste en bloquear la ventana, leer la contribución de todos los demás al mostrador (en este caso, cuántos elementos han tomado), actualizar la suya (++), cerrar la ventana y calcular el total. Todo esto se hace con operaciones pasivas de un solo lado. (El de mayor escala simplemente usa un árbol en lugar de un conjunto de 1 día).

Así que el uso sería decir rango 0 host el contador, y todos siguen haciendo unidades de trabajo y actualizando el contador para obtener el siguiente hasta que no haya más trabajo; luego esperas en una barrera o algo y finalizas.

Una vez que tenga algo como esto - utilizando un valor compartido para obtener la siguiente unidad de trabajo disponible - trabajando, entonces puede generalizar a un enfoque más sofisticado. Así que, como sugirió suzterpatt, todos los que toman "su parte" de las unidades de trabajo al principio funcionan de maravilla, pero ¿qué hacer si algunos finalizan más rápido que otros? La respuesta habitual ahora es el robo de trabajo; todos guardan su lista de unidades de trabajo en una cola, y luego, cuando uno se queda sin trabajo, roba unidades de trabajo del otro lado de alguien más dequeue, hasta que no queda más trabajo.Esta es realmente la versión completamente distribuida del maestro trabajador, donde no hay más trabajo de partición maestro único. Una vez que haya funcionado un solo contador compartido, puede crear mutex a partir de ellos, y desde allí puede implementar el dequeue. Pero si el simple contador compartido funciona lo suficientemente bien, es posible que no necesites ir allí.

Actualización: Bien, aquí hay un hacky intento de hacer el contador compartido - mi versión del sencillo en el libro MPI-2: parece funcionar, pero no diría nada mucho más fuerte que eso (No he jugado con esto por mucho tiempo). Hay una implementación de contador simple (correspondiente a la versión sin escala en el libro MPI-2) con dos pruebas simples, una correspondiente aproximadamente a su caso de trabajo; cada elemento actualiza el contador para obtener un elemento de trabajo, luego hace el "trabajo" (duerme durante un período de tiempo aleatorio). Al final de cada prueba, se imprime la estructura de datos del contador, que es el número de incrementos que cada rango ha hecho.

#include <mpi.h> 
#include <stdlib.h> 
#include <stdio.h> 
#include <unistd.h> 

struct mpi_counter_t { 
    MPI_Win win; 
    int hostrank ; 
    int myval; 
    int *data; 
    int rank, size; 
}; 

struct mpi_counter_t *create_counter(int hostrank) { 
    struct mpi_counter_t *count; 

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t)); 
    count->hostrank = hostrank; 
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank)); 
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size)); 

    if (count->rank == hostrank) { 
     MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data)); 
     for (int i=0; i<count->size; i++) count->data[i] = 0; 
     MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int), 
         MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win)); 
    } else { 
     count->data = NULL; 
     MPI_Win_create(count->data, 0, 1, 
         MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win)); 
    } 
    count -> myval = 0; 

    return count; 
} 

int increment_counter(struct mpi_counter_t *count, int increment) { 
    int *vals = (int *)malloc(count->size * sizeof(int)); 
    int val; 

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win); 

    for (int i=0; i<count->size; i++) { 

     if (i == count->rank) { 
      MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM, 
          count->win); 
     } else { 
      MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win); 
     } 
    } 

    MPI_Win_unlock(0, count->win); 
    count->myval += increment; 

    vals[count->rank] = count->myval; 
    val = 0; 
    for (int i=0; i<count->size; i++) 
     val += vals[i]; 

    free(vals); 
    return val; 
} 

void delete_counter(struct mpi_counter_t **count) { 
    if ((*count)->rank == (*count)->hostrank) { 
     MPI_Free_mem((*count)->data); 
    } 
    MPI_Win_free(&((*count)->win)); 
    free((*count)); 
    *count = NULL; 

    return; 
} 

void print_counter(struct mpi_counter_t *count) { 
    if (count->rank == count->hostrank) { 
     for (int i=0; i<count->size; i++) { 
      printf("%2d ", count->data[i]); 
     } 
     puts(""); 
    } 
} 

int test1() { 
    struct mpi_counter_t *c; 
    int rank; 
    int result; 

    c = create_counter(0); 

    MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
    result = increment_counter(c, 1); 
    printf("%d got counter %d\n", rank, result); 

    MPI_Barrier(MPI_COMM_WORLD); 
    print_counter(c); 
    delete_counter(&c); 
} 


int test2() { 
    const int WORKITEMS=50; 

    struct mpi_counter_t *c; 
    int rank; 
    int result = 0; 

    c = create_counter(0); 

    MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
    srandom(rank); 

    while (result < WORKITEMS) { 
     result = increment_counter(c, 1); 
     if (result <= WORKITEMS) { 
      printf("%d working on item %d...\n", rank, result); 
      sleep(random() % 10); 
     } else { 
      printf("%d done\n", rank); 
     } 
    } 

    MPI_Barrier(MPI_COMM_WORLD); 
    print_counter(c); 
    delete_counter(&c); 
} 

int main(int argc, char **argv) { 

    MPI_Init(&argc, &argv); 

    test1(); 
    test2(); 

    MPI_Finalize(); 
} 
+1

MPI_Fetch_and_op en MPI-3 simplifica enormemente este código. – Jeff

3

No puedo pensar en ningún mecanismo incorporado para resolver ese problema, tendrías que implementarlo manualmente. A juzgar por sus comentarios, quiere descentralizar el programa, en cuyo caso cada proceso (o al menos grupos de procesos) debería mantener sus propios valores del contador y mantenerlo sincronizado. Esto probablemente podría hacerse con un uso inteligente de envíos/recepciones no bloqueantes, pero la semántica de estos no es trivial.

En su lugar, resolvería el problema de saturación simplemente emitiendo varios archivos a la vez a procesos de trabajo. Esto reduciría el tráfico de la red y le permitirá mantener su configuración simple de despachador único.

+0

@ suszterpatt- He estado pensando un poco sobre lo que mencionas, dado un ID de proceso, y el número total de procesos, puedo tomar fácilmente una "porción" del trabajo que cada proceso debe realizar. Mi preocupación aquí, sin embargo, es que las simulaciones tienen un tiempo de cálculo muy variable (más de 2 órdenes de magnitud dependiendo de las tasas de convergencia), y puedo ver una situación que surge, donde a un solo nodo se le asigna un gran número de procesos a largo plazo. y mi equilibrio de carga se convertiría en un problema. – MarkD

+0

@ MarkD: Teóricamente, eso es ciertamente posible. Sin embargo, parece que está procesando cantidades realmente enormes de datos, por lo que las posibilidades de que esto ocurra no sean tan grandes. Aún así, una solución alternativa podría ser permitir que el despachador "deshaga el parche" de los archivos que su nodo de trabajo aún no ha comenzado a procesar, y enviarlos a un trabajador actualmente inactivo. Todavía considero que este enfoque es más simple que implementar una variable compartida. – suszterpatt

0

Parece que está utilizando su nodo de despacho para realizar el equilibrio dinámico de carga (asignando trabajo a los procesadores cuando estén disponibles). Un contador compartido que no requiera que todos los procesadores detengan no lo hará. Le recomendaría que se quede con lo que tiene ahora o haga lo que sugiere suszterpatt, envíe lotes de archivos a la vez.

0

No está claro si es necesario revisar los archivos en estricto orden o no. De lo contrario, ¿por qué no hacer que cada nodo i maneje todos los archivos donde N % total_workers == i - es decir, la distribución cíclica del trabajo?

Cuestiones relacionadas