2010-02-04 8 views
8

Estoy trabajando en un servidor TCP multiproceso. En el hilo principal, escucho en un socket y creo un nuevo hilo para nuevas conexiones entrantes. Quiero guardar todas las conexiones entrantes en un hash para poder acceder a ellas desde otro hilo.¿Cómo guardo los sockets en un hash y los repito desde otro thread?

Desde el hilo del monitor, no puedo leer las conexiones recién agregadas. Parece que se crea un nuevo hash de clientes al crear el subproceso del monitor.

¿Cómo guardo la lista de todos los sockets y los bucleo de la secuencia de mi monitor?

código actual:

#!/usr/bin/perl 
use strict; 
use IO::Socket; 
use threads; 
use Thread::Queue; 

# init 
my $clients = {}; 
my $queue = Thread::Queue->new; 

# thread that monitors 
threads->create("monitor"); 

# create the listen socket 
my $listenSocket = IO::Socket::INET->new(LocalPort => 12345, 
             Listen => 10, 
             Proto => 'tcp', 
             Reuse => 1); 

# make sure we are bound to the port 
die "Cant't create a listening socket: [email protected]" unless $listenSocket; 

print "Server ready. Waiting for connections on 34567 ... \n"; 

# wait for connections at the accept call 
while (my $connection = $listenSocket->accept) { 
    # set client socket to non blocking 
    my $nonblocking = 1; 
    ioctl($connection, 0x8004667e, \\$nonblocking); 

    # autoflush 
    $connection->autoflush(1); 

    # debug 
    print "Accepted new connection\n"; 

    # add to list 
    $clients->{time()} = $connection; 

    # start new thread and listen on the socket 
    threads->create("readData", $connection); 
} 

sub readData { 
    # socket parameter 
    my ($client) = @_; 

    # read client 
    while (<$client>) { 
     # remove newline 
     chomp $_; 

    # add to queue 
     $queue->enqueue($_); 
    } 

    close $client; 
} 

sub monitor { 
    # endless loop 
    while (1) { 

     # loop while there is something in the queue 
     while ($queue->pending) { 

      # get data from a queue 
      my $data = $queue->dequeue; 

      # loop all sockets 
      while (my ($key, $value) = each(%$clients)) { 

       # send to socket 
       print $value "$data\n"; 

      } 
     } 

     # wait 0,25 seconds 
     select(undef, undef, undef, 0.25); 
    } 
} 

close $listenSocket; 
+0

Sugerencia, posiblemente útil para usted, tal vez no: ¿Alguna vez ha visto un módulo llamado 'IO :: Multiplex'? – fennec

Respuesta

8

Necesita compartir $clients a través de sharethreads::shared:

my $clients = &share({}); 

La sintaxis antigua es debido a un problema documentado con prototipos de Perl. Si tiene at least Perl 5.8.9, utilice el más agradable

my $clients = shared_clone({}); 

en su lugar.

También desea proteger $clients con un candado, p.,

my $clients_lock : shared; 
{ 
    lock $clients_lock; 
    $clients->{time()} = fileno $connection; 
} 

Por último, debido a IO::Socket::INET casos son typeglobs Perl, no se puede compartirlos, así que en vez de agregar sus descriptores de socket (de fileno) a $clients y luego fdopen la toma cuando es necesario con

open my $fh, ">&=", $sockdesc or warn ... 

El programa siguiente repite los datos entrantes a los otros zócalos conectados:

#!/usr/bin/perl 

use strict; 
use IO::Socket; 
use threads; 
use threads::shared; 
use Thread::Queue; 

# init 
my $clients = &share({}); 
my $clients_lock : shared; 

my $queue = Thread::Queue->new; 

# thread that monitors 
threads->create("monitor"); 

# create the listen socket 
my $port = 12345; 
my $listenSocket = IO::Socket::INET->new(
    LocalPort => $port, 
    Listen  => 10, 
    Proto  => 'tcp', 
    Reuse  => 1 
); 

# make sure we are bound to the port 
die "Can't create a listening socket: [email protected]" unless $listenSocket; 

print "Server ready. Waiting for connections on $port ... \n"; 

# wait for connections at the accept call 
while (my $connection = $listenSocket->accept) { 
    # set client socket to non blocking 
    my $nonblocking = 1; 
    ioctl($connection, 0x8004667e, \\$nonblocking); 

    # autoflush 
    $connection->autoflush(1); 

    # debug 
    print "Accepted new connection\n"; 

    # add to list 
    { 
    lock $clients_lock; 
    $clients->{time()} = fileno $connection; 
    } 

    # start new thread and listen on the socket 
    threads->create("readData", $connection); 
} 

sub readData { 
    # socket parameter 
    my ($client) = @_; 

    # read client 
    while (<$client>) { 
    chomp; 
    $queue->enqueue($_); 
    } 

    close $client; 
} 

sub monitor { 
    # endless loop 
    while (1) { 
    # loop while there is something in the queue 
    while ($queue->pending) { 
     # get data from a queue 
     my $data = $queue->dequeue; 

     # loop all sockets 
     { 
     lock $clients_lock; 
     while (my ($key, $value) = each(%$clients)) { 
      # send to socket 
      if (open my $fh, ">&=", $value) { 
      print $fh "$data\n"; 
      } 
      else { 
      warn "$0: fdopen $value: $!"; 
      } 
     } 
     } 
    } 

    # wait 0,25 seconds 
    select(undef, undef, undef, 0.25); 
    } 
} 

close $listenSocket; 
+1

+1 Acababa de descifrar lo mismo. –

+0

¡Gracias! Funciona como un encanto ahora :) – Dieterve

+0

@Dieterve ¡De nada! –

1

No tienen demasiada experiencia con hilos en Perl, pero creo que lo que desea es compartir su lista de clientes:

 
    use threads::shared ; 
    my $clients : shared = {}; 


Actualización:

Perl se queja de:

my $hash : shared = {}; 

pero parece estar bien con:

my $hash = {}; 
share($hash); 

Además, este código:

my $hash = { key1 => "value1" }; 
share($hash); 

parece borrar la tabla hash, pero

my $hash = {}; 
share($hash); 
$hash->{key1} = "value1"; 

funciona como esperaba.

+0

Esto no funciona, la segunda línea da un error "Valor inválido para el escalar compartido en la línea test.pl 9." – Dieterve

Cuestiones relacionadas