8

Estoy trabajando en un marco de trabajo simple que es muy similar al descrito en id Tech 5 Challenges. En el nivel más básico, tengo un conjunto de listas de trabajos, y quiero programar estas listas en varios subprocesos de CPU (utilizando un grupo de subprocesos estándar para el despacho real). Sin embargo, me pregunto cómo esta señal/esperar cosas dentro de una lista de espera se puede implementar de manera eficiente. Según lo entiendo, el token de espera bloquea la ejecución de la lista si el token de señal no se ha ejecutado. Esto significa implícitamente que todo antes de una señal tiene que terminar antes de que la señal se pueda elevar. Así que vamos a decir que tenemos una lista como esta:Implementación de una lista de trabajos con sincronización interna

J1, J2, S, J3, W, J4 

continuación, el envío puede ir así:

#1: J1, J2, J3 
<wait for J1, J2, run other lists if possible> 
#2: J4 

Sin embargo, esto no es tan fácil como parece, ya que dado un conjunto de listas , Tendría que mover algunos de ellos entre ready y waiting y también tengo un código especial para reunir todos los trabajos antes de una señal y etiquetar algo en ellos, para que puedan activar la señal si y solo si todos han terminado (es decir, por ejemplo que ya no es posible agregar trabajos a la lista mientras que se ejecuta, como las siguientes señales acceden a los trabajos previamente insertados.)

¿Existe alguna forma "estándar" de implementar esto de manera eficiente? También me pregunto cómo programar mejor la ejecución de la lista de trabajos, en este momento, cada núcleo toma una lista de trabajos y programa todos los trabajos, lo que da una escala bastante buena (para 32k puestos de trabajo a 0.7 ms, obtengo el 101%, lo que supongo se debe en parte al hecho de que la versión de rosca única se está programando en diferentes núcleos algunas veces).

+0

¿Cuántas listas de trabajos tiene en promedio? – fabrizioM

+0

10-40 listas de trabajos, con un total de ~ 300-500 trabajos (algunas listas pueden contener más de 50 trabajos) – Anteru

+0

¿Los pares de señal/sincronización también pueden anidar? –

Respuesta

4

Este es un algoritmo de planificación relativamente directo. Un par de problemas parece complicado al principio, pero realmente no lo son (señal/espera y ubicación del caché). Explicaré las técnicas, luego daré un código que escribí para ilustrar los conceptos, luego daré algunas notas finales sobre la afinación.

Algoritmos de usar

Manejo de la señal/esperar de manera eficiente es que parece difícil al principio, pero en realidad resulta ser extremadamente fácil. Dado que los pares de señal/espera no pueden anidarse ni superponerse, en realidad solo puede haber dos satisfechos y uno en espera en cualquier momento dado. Simplemente manteniendo un puntero "CurrentSignal" a la señal insatisfecha más reciente es todo lo necesario para hacer la contabilidad.

Asegurarse de que los núcleos no saltan entre listas demasiado y que una lista determinada no se comparte entre demasiados núcleos también es relativamente fácil: cada núcleo sigue tomando trabajos de la misma lista hasta que bloquea, luego cambia a otra lista. Para evitar que todos los núcleos se agrupen en una sola lista, se guarda un WorkerCount para cada lista que indica cuántos núcleos lo están usando, y las listas están organizadas para que los núcleos seleccionen listas con menos trabajadores primero.

El bloqueo se puede mantener simple al bloquear solo el programador o la lista en la que está trabajando en cualquier momento, nunca ambos.

Expresó cierta preocupación acerca de agregar trabajos a una lista una vez que la lista ya comenzó a ejecutarse. Resulta que admitir esto es casi trivial: todo lo que necesita es una llamada de la lista al planificador cuando se agrega un trabajo a una lista que está completa actualmente, por lo que el planificador puede programar el nuevo trabajo.

Las estructuras de datos

Aquí están las estructuras de datos básicos que necesita:

class Scheduler 
{ 
    LinkedList<JobList>[] Ready; // Indexed by number of cores working on list 
    LinkedList<JobList> Blocked; 
    int ReadyCount; 
    bool Exit; 

    public: 
    void AddList(JobList* joblist); 
    void DoWork(); 

    internal: 
    void UpdateQueues(JobList* joblist); 

    void NotifyBlockedCores(); 
    void WaitForNotifyBlockedCores(); 
} 

class JobList 
{ 
    Scheduler Scheduler; 
    LinkedList<JobList> CurrentQueue; 

    LinkedList<Job> Jobs;   // All jobs in the job list 
    LinkedList<SignalPoint> Signals; // All signal/wait pairs in the job list, 
             plus a dummy 

    Job* NextJob;     // The next job to schedule, if any 
    int NextJobIndex;    // The index of NextJob 

    SignalPoint* CurrentSignal;  // First signal not fully satisfied 

    int WorkerCount;     // # of cores executing in this list 

    public: 
    void AddJob(Job* job); 
    void AddSignal(); 
    void AddWait(); 

    internal: 
    void Ready { get; } 
    void GetNextReadyJob(Job& job, int& jobIndex); 
    void MarkJobCompleted(Job job, int jobIndex); 
} 
class SignalPoint 
{ 
    int SignalJobIndex = int.MaxValue; 
    int WaitJobIndex = int.MaxValue; 
    int IncompleteCount = 0; 
} 

Tenga en cuenta que los puntos de señal de una lista de tareas dado son más convenientemente almacenados por separado de la lista real de puestos de trabajo .

aplicación Programador

El planificador mantiene un registro de las listas de tareas, los asigna a los núcleos, y ejecuta los trabajos de las listas de trabajos.

AddList agrega un trabajo al planificador. Debe colocarse en la cola Listo o Bloqueado, dependiendo de si tiene algún trabajo pendiente (es decir, si ya se le han agregado trabajos), así que simplemente llame a UpdateQueues.

void Scheduler.AddList(JobList* joblist) 
{ 
    joblist.Scheduler = this; 
    UpdateQueues(joblist); 
} 

UpdateQueues centraliza la lógica de actualización de la cola. Note el algoritmo para seleccionar una nueva cola, así como la notificación al ralentí núcleos cuando el trabajo esté disponible:

void Scheduler.UpdateQueues(JobList* joblist) 
{ 
    lock(this) 
    { 
    // Remove from prior queue, if any 
    if(joblist.CurrentQueue!=null) 
    { 
     if(joblist.CurrentQueue!=Blocked) ReadyCount--; 
     joblist.CurrentQueue.Remove(joblist); 
    } 

    // Select new queue 
    joblist.CurrentQueue = joblist.Ready ? Ready[joblist.WorkerCount] : Blocked; 

    // Add to new queue 
    joblist.CurrentQueue.Add(joblist); 
    if(joblist.CurrentQueue!=Blocked) 
     if(++ReadyCount==1) NotifyBlockedCores(); 
    } 
} 

DoWork es una obra planificador normal, excepto: 1. Selecciona la lista de tareas con la menor cantidad de trabajadores, 2. Se trabaja trabajos de un empleado dado hasta que no pueda más, y 3. Almacena el índice de trabajo así como el trabajo para que el colaborador pueda actualizar fácilmente el estado de finalización (detalle de implementación).

void Scheduler.DoWork() 
{ 
    while(!Exit) 
    { 
    // Get a job list to work on 
    JobList *list = null; 
    lock(this) 
    { 
     for(int i=0; i<Ready.Length; i++) 
     if(!Ready[i].Empty) 
     { 
      list = Ready[i].First; 
      break; 
     } 
     if(list==null) // No work to do 
     { 
     WaitForNotifyBlockedCores(); 
     continue; 
     } 
     list.WorkerCount++; 
     UpdateQueues(list); 
    } 

    // Execute jobs in the list as long as possible 
    while(true) 
    { 
     int jobIndex; 
     Job job; 
     if(!GetNextReadyJob(&job, &jobIndex)) break; 

     job.Execute(); 

     list.MarkJobCompleted(job, jobIndex); 
    } 

    // Release the job list 
    lock(this) 
    { 
     list.WorkerCount--; 
     UpdateQueues(list); 
    } 
    } 
} 

aplicación lista de tareas

La lista de tareas realiza un seguimiento de cómo la señal/espera se intercalan con los puestos de trabajo y realiza un seguimiento de qué señal/espere pares ya han completado todo antes de su punto de señal.

El constructor crea un punto de señal ficticio para agregar trabajos a. Este punto de señal se convierte en un punto de señal real (y se agrega un nuevo dummy) cada vez que se agrega una nueva "señal".

JobList.JobList() 
{ 
    // Always have a dummy signal point at the end 
    Signals.Add(CurrentSignal = new SignalPoint()); 
} 

AddJob agrega un trabajo a la lista. Está marcado como incompleto en SignalPoint. Cuando el trabajo se ejecuta realmente, el IncompleteCount del mismo SignalPoint se reduce. También es necesario decirle al programador que las cosas podrían haber cambiado, ya que el nuevo trabajo podría ser inmediatamente ejecutable. Tenga en cuenta que se llama al programador después de liberar el bloqueo en "this" para evitar el interbloqueo.

void JobList.AddJob(Job job) 
{ 
    lock(this) 
    { 
    Jobs.Add(job); 
    Signals.Last.IncompleteCount++; 
    if(NextJob == null) 
     NextJob = job; 
    } 
    if(Scheduler!=null) 
    Scheduler.UpdateQueues(this); 
} 

AddSignal y AddWait agregan señales y esperas a la lista de trabajos. Tenga en cuenta que AddSignal realmente crea un nuevo SignalPoint, y AddWait simplemente completa la información del punto de espera en el SignalPoint creado anteriormente.

void JobList.AddSignal() 
{ 
    lock(this) 
    { 
    Signals.Last.SignalJobIndex = Jobs.Count; // Reify dummy signal point 
    Signals.Add(new SignalPoint());   // Create new dummy signal point 
    } 
} 


void JobList.AddWait() 
{ 
    lock(this) 
    { 
    Signals.Last.Previous.WaitJobIndex = Jobs.Count; 
    } 
} 

La propiedad Ready determina si la lista está lista para núcleos adicionales asignados. Puede haber dos o tres núcleos trabajando en la lista sin que la lista esté "lista" si el siguiente trabajo está esperando una señal antes de que pueda comenzar.

bool JobList.Ready 
{ 
    get 
    { 
    lock(this) 
    { 
     return NextJob!=null && 
     (CurrentSignal==Signals.Last || 
     NextJobIndex < CurrentSignal.WaitJobIndex); 
    } 
    } 
} 

GetNextReadyJob es muy simple: si estamos listos, simplemente devuelva el siguiente trabajo de la lista.

void JobList.GetNextReadyJob(Job& job, int& jobIndex) 
{ 
    lock(this) 
    { 
    if(!Ready) return false; 
    jobIndex = list.NextJobIndex++; 
    job = list.NextJob; list.NextJob = job.Next; 
    return true; 

    } 
} 

MarkJobCompleted es probablemente el más interesante de todos. Debido a la estructura de las señales y las esperas, el trabajo actual es antes de CurrentSignal o está entre CurrentSignal y CurrentSignal.Next (si es posterior a la última señal real, se contará como entre CurrentSignal y el dummy SignalPoint al final) Necesitamos reducir el conteo de trabajos incompletos. También es posible que necesitemos pasar a la siguiente señal si este conteo llega a cero. Por supuesto, nunca pasamos el SignalPoint ficticio al final.

Tenga en cuenta que este código no tiene una llamada a Scheduler.UpdateQueue porque sabemos que el planificador llamará GetNextReadyJob en solo un segundo y si devuelve falso llamará a UpdateQueue de todos modos.

void JobList.MarkJobCompleted(Job job, int jobIndex) 
{ 
    lock(this) 
    { 
    if(jobIndex >= CurrentSignal.SignalJobIndex) 
     CurrentSignal.Next.IncompleteCount--; 
    else 
    { 
     CurrentSignal.IncompleteCount--; 
     if(CurrentSignal.IncompleteCount==0) 
     if(CurrentSignal.WaitJobIndex < int.MaxValue) 
      CurrentSignal = CurrentSignal.Next; 
    } 
    } 
} 

sintonización basado en longitud de la lista, las estimaciones de la longitud de trabajo, etc

El código anterior no presta ninguna atención a la duración de las listas de trabajos son, por lo que si hay un centenar de pequeñas listas de trabajos y uno muy grande es posible que cada núcleo tome una pequeña lista de trabajos por separado y luego todos se congregan en el enorme, lo que conduce a la ineficiencia. Esto se puede resolver haciendo Ready [] una serie de colas de prioridad priorizadas en (joblist.Jobs.Count - joblist.NextJobIndex), pero con la prioridad solo actualizada en situaciones normales de UpdateQueue para mayor eficiencia.

Esto podría volverse aún más sofisticado al crear una heurística que tenga en cuenta el número y el espaciado de las combinaciones señal/espera para determinar la prioridad. Esta heurística sería mejor ajustada usando una distribución de duraciones de trabajo y uso de recursos.

Si se conocen las duraciones de trabajo individuales, o si hay buenas estimaciones disponibles para ellas, la heurística podría usar la duración restante estimada en lugar de solo la longitud de la lista.

Notas finales

Esta es una solución bastante estándar para el problema que se presente. Puede utilizar los algoritmos que he dado y que van a trabajar, incluyendo el bloqueo, pero no será capaz de compilar el código que he escrito anteriormente por varias razones:

  1. Es una mezcla loca de C++ y C# sintaxis. Originalmente comencé a escribir en C# y luego cambié un montón de sintaxis al estilo C++ ya que pensé que era más probable lo que usarías para un proyecto así. Pero me fui en bastantes C -ismos. Afortunadamente no hay LINQ ;-).

  2. Los detalles de LinkedList tienen algunas agitaciones manuales. Supongo que la lista puede hacer Primero, Último, Agregar y Eliminar y que los elementos en la lista pueden hacer Anterior y Siguiente. Pero no usé la API real para ninguna clase de lista ligada real que yo sepa.

  3. No lo compilé ni lo compruebo. Te garantizo que hay un error o dos allí en alguna parte.

Conclusión: Debería tratar el código anterior como seudocódigo aunque parezca el verdadero McCoy.

¡Disfrútalo!

+0

Hola, buena publicación. ¿Tiene alguna sugerencia para leer material sobre este tema? Gracias. –

+0

@Mark: Lo siento, sé que leí varios buenos artículos relacionados con esto hace 25-30 años, pero no pude darle una referencia ahora. Cosas como esta simplemente se quedan en tu cerebro y no desaparecen, si sabes a qué me refiero. –

1

Si usted tiene acceso a una work stealing framework en su entorno (por ejemplo, Cilk si se encuentra en C, o el fork/join framework de Doug Lea en Java), se puede obtener fácilmente una solución simple y limpia (en comparación con los de bajo nivel intentos ad-hoc, que probablemente tendrá que hacer si no puede usar algo como eso), lo que le brinda un equilibrio de carga automático y una buena ubicación de datos.

Aquí hay una descripción de alto nivel de una solución: inicia un hilo por núcleo. A cada uno se le asigna una lista hasta que se agoten (muchas maneras de hacerlo; esa es la tarea de muy buenos mecanismos de colas concurrentes, y esa es una razón por la que le gustaría evitar las soluciones de hágalo usted mismo si es posible). Cada trabajador revisa las filas de las listas una por una: - Se mantienen dos colas, una para esos trabajos antes de un token signal, y uno o aquellos posteriores. - Cuando se encuentra un puesto de trabajo, es bifurcado, y se añade a la cola respectiva (en función de si vimos un signal ficha o no) - Cuando se encuentra una ficha wait, nos sumamos a todos los puestos de trabajo antes de la señal (que es la semántica que describes si entendí correctamente). Tenga en cuenta que en el código que utilizo helpJoin(), significa que el hilo ayudará realmente (por reventar las tareas bifurcadas y ejecución de ellos hasta la unión puede proceder)

"Tenedor" significa poner la tarea en una cola local de subprocesos, que luego será ejecutado por el propio subproceso, o puede ser robado por otro subproceso que busca algún trabajo por hacer.

A modo ilustrativo, aquí hay una simulación de ~ 80 líneas de trabajo de este escenario, utilizando el marco java antes mencionado. Crea tantos hilos como núcleos disponibles, y algunas listas, y comienza a ejecutarlos. Tenga en cuenta lo simple que es el método run() mientras todavía tiene los beneficios del balanceo de carga y que los subprocesos ejecutan principalmente tareas de su propia lista, a menos que se queden sin trabajo y comiencen a robar para obtener algo. Por supuesto, si no estás en Java o C, tendrías que encontrar un marco similar, pero el mismo conjunto de ideas básicas simplificaría de manera similar tu código, independientemente del idioma.

import java.util.*; 
import java.util.concurrent.*; 
import jsr166y.ForkJoinPool; 
import jsr166y.ForkJoinTask; 
import jsr166y.RecursiveTask; 

public class FJTest { 
    public static void main(String[] args) throws Exception { 
     Iterable<List<TaskType>> lists = createLists(10); 

     ForkJoinPool pool = new ForkJoinPool(); 

     for (final List<TaskType> list : lists) { 
      pool.submit(new Runnable() { 
       public void run() { 
        List<ForkJoinTask> beforeSignal = new ArrayList<ForkJoinTask>(); 
        List<ForkJoinTask> afterSignal = new ArrayList<ForkJoinTask>(); 
        boolean signaled = false; 
        for (TaskType task : list) { 
         switch (task) { 
          case JOB: 
           ForkJoinTask job = new Job(); 
           if (signaled == false) 
            beforeSignal.add(job); 
           else 
            afterSignal.add(job); 
           job.fork(); 
           break; 
          case SIGNAL: 
           signaled = true; 
           break; 
          case WAIT: 
           signaled = false; 
           for (ForkJoinTask t : beforeSignal) { 
            t.helpJoin(); 
           } 
           beforeSignal = afterSignal; 
           afterSignal = new ArrayList<ForkJoinTask>(); 
         } 
        } 
       } 
      }); 
     } 

     pool.shutdown(); 
     pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
    } 

    private static Iterable<List<TaskType>> createLists(int size) { 
     List<List<TaskType>> tasks = new ArrayList<List<TaskType>>(); 
     for (int i = 0; i < size; i++) { 
      tasks.add(createSomeList()); 
     } 
     return tasks; 
    } 

    private static List<TaskType> createSomeList() { 
     return Arrays.asList(
       TaskType.JOB, 
       TaskType.JOB, 
       TaskType.SIGNAL, 
       TaskType.JOB, 
       TaskType.WAIT, 
       TaskType.JOB); 
    } 

} 

enum TaskType { 
    JOB, SIGNAL, WAIT; 
} 
class Job extends RecursiveTask<Void> { 
    @Override 
    protected Void compute() { 
     long x = 1; 
     for (long i = 1; i < 200000001; i++) { 
      x = i * x; 
     } 
     System.out.println(x); //just to use x 
     return null; 
    } 
} 
Cuestiones relacionadas