2012-02-13 9 views
7

estoy tratando de lograr lo siguiente:Perl colas y con hilo

  1. tener un hilo que lee datos de un archivo muy grande decir sobre 10GB y empujarlos en la cola. (No deseo para la cola a ser muy grandes tampoco)

  2. Mientras que el hilo buildQueue está empujando a la cola de datos al mismo tiempo, tener hilos alrededor del 5 trabajadores los datos del proceso de-cola y.

he hecho un intento pero mis otros hilos son inalcanzables debido a un bucle continuo en mi buildQueue hilo.

Mi enfoque puede ser totalmente incorrecto. Gracias por cualquier ayuda, es muy apreciado.

Aquí está el código para buildQueue:

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open DICT_FILE, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<DICT_FILE>) { 
      if ($queue->pending() < 100) { 
       my $query = <DICT_FILE>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

Y como he esperado cuando este hilo es ejecutado nada más después se ejecutará porque este hilo no va a terminar.

my $builder = new Thread(&buildQueue); 

Dado que el hilo del constructor estará en funcionamiento durante mucho tiempo, nunca consigo crear subprocesos de trabajo.

Aquí está el código completo:

#!/usr/bin/perl -w 
use strict; 
use Thread; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
my @threads; 

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open dict_file, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<dict_file>) { 
      if ($queue->pending() < 100) { 
       my $query = <dict_file>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

sub processor { 
    my $query; 
    while (1) { 
     if ($query = $queue->dequeue) { 
      print "$query\n"; 
     } 
    } 
} 

my $builder = new Thread(&buildQueue); 
push @threads, new Thread(&processor) for 1..5; 
+0

Un par de preguntas: Usted menciona que el hilo de cola-constructor no va a terminar, pero es lo que hace nada en absoluto? ¿Alguna vez el tamaño de la cola cae por debajo de 100 o va por encima de 0? Además, [no estoy seguro de que estés creando tus hilos correctamente] (http://perldoc.perl.org/perlthrtut.html). ¿No debería ser 'my $ builder = threads-> create (\ & buildQueue);'? –

+0

El generador de colas se compila bien, pero dado que no se llegó a crear subprocesos de trabajo, no se puede eliminar nada de la cola, por lo que la cola está bloqueada en 100 mientras la cola de compilación continúa ejecutándose debido al bucle continuo. – Sinista

+0

Hmmm, necesitaré ver más código para establecer el contexto, especialmente donde crea los hilos. No está 'join'ing o' detach'ing the queue builder antes de crear los hilos de trabajo, ¿verdad? –

Respuesta

10

Tendrá que marcar cuando se desea que sus hilos para salir (ya sea a través joinor detach). El hecho de que tenga bucles infinitos sin declaraciones last para salir de ellos también es un problema.

Edit: ¡También olvidé una parte muy importante! Each worker thread will block, waiting for another item to process off of the queue until they get an undef in the queue. De ahí por qué encolamos específicamente undef una vez por cada hilo después de que el generador de colas haya finalizado.

Probar:

#!/usr/bin/perl -w 
use strict; 
use threads; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
our @threads; #Do you really need our instead of my? 

sub buildQueue 
{ 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 

    #Three-argument open, please! 
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!"); 
    while(my $query=<$dict_file>) 
    { 
     chomp($query); 
     while(1) 
     { #Wait to see if our queue has < 100 items... 
      if ($queue->pending() < 100) 
      { 
       $queue->enqueue($query); 
       print "Queue Size: " . $queue->pending . "\n"; 
       last; #This breaks out of the infinite loop 
      } 
     } 
    } 
    close($dict_file); 
    foreach(1..5) 
    { 
     $queue->enqueue(undef); 
    } 
} 

sub processor 
{ 
    my $query; 
    while ($query = $queue->dequeue) 
    { 
     print "Thread " . threads->tid . " got $query\n"; 
    } 
} 

my $builder=threads->create(\&buildQueue); 
push @threads,threads->create(\&process) for 1..5; 

#Waiting for our threads to finish. 
$builder->join; 
foreach(@threads) 
{ 
    $_->join; 
} 
+1

Parece que el problema fue el módulo de subprograma en desuso Cambié al módulo de subprocesos en su lugar y mi código funciona como debería. Gracias Jack Many por señalarme en la dirección correcta. – Sinista

0

Un enfoque diferente: También puede utilizar user_tasks en MCE 1.2+ y crear dos múltiples trabajadortasks, una tarea para la lectura (ya que es un archivo de gran tamaño, que también podría beneficiarse de la lectura paralela, preservando archivo leído buscar) y una tarea para procesar, etc.

El siguiente código todavía usa Thread::Queue para administrar su cola de almacenamiento en búfer.

El sub buildQueue tiene su control de tamaño de cola y envía los datos directamente al proceso de administrador '$ R_QUEUE ya que hemos utilizado hilos, por lo que tiene acceso al espacio de memoria del padre. Si desea utilizar las horquillas en su lugar, aún puede acceder a la cola a través de una función de devolución de llamada. Pero aquí elegí simplemente empujar a la cola.

El sub processQueue simplemente eliminará la cola de lo que esté en la cola hasta que no haya nada más pendiente.

El sub task_end en cada tarea se ejecuta una sola vez por el proceso de administrador al final de cada tarea, por lo que lo usamos para señalar una parada en nuestros procesos de trabajo.

Obviamente, hay mucha libertad en la forma en que desea trozo de sus datos a los trabajadores, para que pueda decidir sobre el tamaño del trozo o incluso cómo sorber sus datos en.

#!/usr/bin/env perl 
use strict; 
use warnings; 
use threads; 
use threads::shared; 
use Thread::Queue; 
use MCE; 

my $R_QUEUE = Thread::Queue->new; 
my $queue_workers = 8; 
my $process_workers = 8; 
my $chunk_size = 1; 

print "Enter a file name: "; 
my $input_file = <STDIN>; 
chomp($input_file); 

sub buildQueue { 
    my ($self, $chunk_ref, $chunk_id) = @_; 
    if ($R_QUEUE->pending() < 100) { 
     $R_QUEUE->enqueue($chunk_ref); 
     $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending ."\n"); 
    } 
} 

sub processQueue { 
    my $self = shift; 
    my $wid = $self->wid; 
    while (my $buff = $R_QUEUE->dequeue) { 
     $self->sendto('stdout', "Thread " . $wid . " got $$buff"); 
    } 
} 

my $mce = MCE->new(
    input_data => $input_file, # this could be a filepath or a file handle or even a scalar to treat like a file, check the documentation for more details. 
    chunk_size => $chunk_size, 
    use_slurpio => 1, 

    user_tasks => [ 
     { # queueing task 
      max_workers => $queue_workers, 
      user_func => \&buildQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory. 
      task_end => sub { $R_QUEUE->enqueue((undef) x $process_workers) } # signal stop to our process workers when they hit the end of the queue. Thanks > Jack Maney! 
     }, 
     { # process task 
      max_workers => $process_workers, 
      user_func => \&processQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory 
      task_end => sub { print "Finished processing!\n"; } 
     } 
    ] 
); 

$mce->run(); 

exit; 
3

El El módulo MCE para Perl adora los archivos grandes. Con MCE, uno puede dividir muchas líneas a la vez, sorber un gran trozo como una cadena escalar, o leer 1 línea a la vez. Chunking muchas líneas a la vez reduce la sobrecarga para IPC.

MCE 1.504 está disponible ahora. Proporciona MCE :: Queue con soporte para procesos secundarios que incluyen subprocesos. Además, la versión 1.5 viene con 5 modelos (MCE :: Flow, MCE :: Grep, MCE :: Loop, MCE :: Map y MCE :: Stream) que se encargan de crear instancias de la instancia de MCE así como sintonizando max_workers y chunk_size. Uno puede anular estas opciones por cierto.

A continuación, MCE :: Loop se utiliza para la demostración.

use MCE::Loop; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
    my ($mce, $chunk_ref, $chunk_id) = @_; 

    foreach my $line (@$chunk_ref) { 
     chomp $line; 
     ## add your code here to process $line 
    } 

} $dict_path; 

Si desea especificar la cantidad de trabajadores y/o el tamaño de porción, existen 2 formas de hacerlo.

use MCE::Loop max_workers => 5, chunk_size => 300000; 

O ...

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 300000 
}; 

Aunque se prefiere fragmentación de archivos de gran tamaño, se puede comparar el tiempo con la fragmentación de una línea a la vez. Uno puede omitir la primera línea dentro del bloque (comentada). Observe cómo no hay necesidad de un ciclo for interno. $ chunk_ref sigue siendo una referencia de matriz que contiene 1 línea. La entrada escalar $ _ contiene la línea cuando chunk_size es igual a 1, de lo contrario apunta a $ chunk_ref.

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 1 
}; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
# my ($mce, $chunk_ref, $chunk_id) = @_; 

    my $line = $_; 
    ## add your code here to process $line or $_ 

} $dict_path; 

Espero que esta demostración haya sido útil para las personas que desean procesar un archivo en paralelo.

:) Mario

Cuestiones relacionadas