2012-03-06 12 views
7

Divulgación completa, soy un estudiante y esta es una tarea. He estado trabajando en ello durante más de una semana casi sin parar (además del tiempo pasado anterior) y no puedo entender lo que estoy haciendo mal. Mi servidor sigue esperando epoll_wait solo después de hacer unos pocos recvs ("pocos" porque anticipo varios GB de datos y obtengo solo unas pocas docenas de MB). No creo que haya nada de malo en cómo funciona mi cliente, porque funciona bien con mis servidores selectos y de subprocesos múltiples. Por favor, eche un vistazo rápido y avíseme si hay algo que salta a usted como la causa de mi problema.¿Falta algo o simplemente no entiendo epoll?

La idea básica del cliente/servidor es bombardear el servidor con conexiones (10k +) y transferir una cantidad determinada de datos varias veces. Este servidor epoll está teniendo problemas con 2000, cuando mi servidor multiproceso manejó apenas por debajo del objetivo 10k.

No estoy pidiendo que hagas mi trabajo por mi (estoy a punto) Solo necesito ayuda para entender qué estoy haciendo mal aquí. Gracias de antemano por cualquier ayuda que pueda ofrecer :)

1 #include "common.h" 
    2 #include <sys/epoll.h> 
    3 
    4 uint16_t ready[MAX_CONNS]; 
    5 uint16_t next; 
    6 pthread_mutex_t mutex; 
    7 
    8 void *worker_thread(void *param) { 
    9  int my_sock, pos; 
10  struct conn_params *conn_ps = (struct conn_params *)param; 
11 
12  while (1) { 
13   pthread_mutex_lock(&mutex); 
14 
15   while (1) { 
16    if (next == MAX_CONNS) { 
17     printf("balls\n"); 
18     next = 4; 
19    } 
20 
21    if (ready[next] != 0) { 
22     pos = next; 
23     my_sock = ready[pos]; 
24     next++; 
25     break; 
26    } 
27   } 
28 
29   pthread_mutex_unlock(&mutex); 
30   /* handle recv/send */ 
31   if (echo_recv(&conn_ps[my_sock], MULTIPLE) == 0) { /* closed conn */ 
32    shutdown(my_sock, SHUT_RDWR); 
33    close(my_sock); 
34    serv_stats.active_connections--; 
35   } 
36   ready[pos] = 0; 
37 /*  print_conn_stats(&conn_ps[my_sock]);*/ 
38  } 
39 } 
40 
41 void *add_client_thread(void *param) { 
42  struct epoll_accept_thread *eat = (struct epoll_accept_thread *)param; 
43  struct sockaddr client; 
44  struct epoll_event event; 
45  socklen_t client_len; 
46  int new_sock, ret; 
47  char hostbuf[NI_MAXHOST], servbuf[NI_MAXSERV]; 
48 
49  bzero(&client, sizeof(struct sockaddr)); 
50  event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; 
51 
52  while ((new_sock = accept(eat->listen_sock, &client, &client_len)) != -1) { 
53   set_nonblock(new_sock); 
54   event.data.fd = new_sock; 
55   if (epoll_ctl(eat->fd_epoll, EPOLL_CTL_ADD, new_sock, &event) == -1) { 
56    perror("epoll_ctl"); 
57    printf("%u\n", new_sock); 
58    continue; 
59   } 
60 
61   bzero(&(eat->conn_ps[new_sock]), sizeof(struct conn_params)); 
62   eat->conn_ps[new_sock].sock = new_sock; 
63   if ((ret = getnameinfo(&client, client_len, hostbuf, NI_MAXHOST, servbuf, NI_MAXSERV, NI_NUMERICHOST)) != 0) { 
64    gai_strerror(ret); 
65   } 
66 
67   update_server_stats(); 
68   printf("added client\n"); 
69  } 
70 
71  if (errno != EAGAIN) { 
72   perror("Couldn't accept connection"); 
73  } 
74 
75  pthread_exit(NULL); 
76 } 
77 
78 int main(int argc, char **argv) { 
79  char opt, *port = NULL; 
80  struct addrinfo hints, *results, *p; 
81  int listen_sock = new_tcp_sock(), nfds, i, ret; 
82  int fd_epoll, next_avail = 4; 
83  struct conn_params conn_ps[MAX_CONNS]; 
84  struct epoll_event evs[MAX_CONNS]; 
85  struct epoll_event event; 
86  struct epoll_accept_thread eat; 
87  pthread_t thread; 
88 
89  while ((opt = getopt(argc, argv, ":l:")) != -1) { 
90   switch (opt) { 
91    case 'l': /* port to listen on */ 
92     port = optarg; 
93     break; 
94    case '?': /* unknown option */ 
95     fprintf(stderr, "The option -%c is not supported.\n", opt); 
96     exit(1); 
97    case ':': /* required arg not supplied for option */ 
98     fprintf(stderr, "The option -%c requires an argument.\n", opt); 
99     exit(1); 
100   } 
101  } /* command line arg processing done */ 
102 
103  if (port == NULL) { 
104   fprintf(stderr, "You must provide the port to listen on (-l).\n"); 
105   exit(1); 
106  } 
107 
108  signal(SIGINT, handle_interrupt); 
109 
110  bzero(&hints, sizeof(struct addrinfo)); 
111  hints.ai_family = AF_INET; 
112  hints.ai_socktype = SOCK_STREAM; 
113  hints.ai_flags = AI_PASSIVE; 
114 
115  set_nonblock(listen_sock); 
116  set_reuseaddr(listen_sock); 
117 
118  if ((ret = getaddrinfo(NULL, port, &hints, &results) != 0)) { 
119   gai_strerror(ret); 
120   exit(1); 
121  } 
122 
123  for (p = results; p != NULL; p = p->ai_next) { /* attempt to connect to the host */ 
124   if (bind(listen_sock, p->ai_addr, p->ai_addrlen) == -1) { 
125    perror("Bind failed"); 
126   } else { 
127    break; 
128   } 
129  } 
130 
131  if (p == NULL) { /* we were unable to connect to anything */ 
132   fprintf(stderr, "Unable to bind to the specified port. Exiting...\n"); 
133   exit(1); 
134  } 
135 
136  freeaddrinfo(results); 
137 
138  if (listen(listen_sock, 5) == -1) { 
139   perror("Listen failed"); 
140   exit(1); 
141  } 
142 
143  /* everything is set up. method-specific code goes below */ 
144 
145  start_server_stats(); 
146  next = 4; 
147 
148  if ((fd_epoll = epoll_create(MAX_CONNS)) == -1) { 
149   perror("epoll_create"); 
150   exit(1); 
151  } 
152 
153  event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; 
154  event.data.fd = listen_sock; 
155  if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, listen_sock, &event) == -1) { 
156   perror("epoll_ctl"); 
157   exit(1); 
158  } 
159 
160  signal(SIGPIPE, SIG_IGN); 
161  bzero(ready, MAX_CONNS * sizeof(uint16_t)); 
162  pthread_mutex_init(&mutex, NULL); 
163 
164  for (i = 0; i < 5; i++) { /* five workers should be enough */ 
165   pthread_create(&thread, NULL, worker_thread, (void *)&conn_ps); 
166  } 
167 
168  while (1) { 
169   if ((nfds = epoll_wait(fd_epoll, evs, MAX_CONNS, -1)) > 0 && errno == EINTR) { 
170    continue; 
171   } 
172   for (i = 0; i < nfds; i++) { /* loop through all FDs */ 
173    if (evs[i].events & (EPOLLERR | EPOLLHUP)) { /* if there's an error or a hangup */ 
174     /*fprintf(stderr, "Error! Danger, Will Robinson! Danger!");*/ 
175     close(evs[i].data.fd); 
176     continue; 
177    } else if (evs[i].data.fd == listen_sock) { /* we have a new connection coming in */ 
178     eat.listen_sock = listen_sock; 
179     eat.fd_epoll = fd_epoll; 
180     eat.conn_ps = conn_ps; 
181     pthread_create(&thread, NULL, add_client_thread, (void *)&eat); 
182    } else { /* inbound data */ 
183     while (ready[next_avail] != 0) { 
184      next_avail++; 
185 
186      if (next_avail == MAX_CONNS) { 
187       next_avail = 4; 
188      } 
189     } 
190     ready[next_avail] = evs[i].data.fd; 
191    } /* end inbound data */ 
192   } /* end iterating through FDs */ 
193  } /* end epoll_wait loop */ 
194 
195  perror("epoll_wait"); 
196 
197  return 0; 
198 } 

Y aquí está la función echo_recv, como supongo que alguien va a querer ver que así:

14 int echo_recv(struct conn_params *conn_p, int single) { 
15  char client_buf[CLIENT_BUF_SIZE], buffer[BUF_SIZE]; 
16  int nread, nwrite, nsent = 0, i; 
17 
18  while ((nread = recv(conn_p->sock, client_buf, CLIENT_BUF_SIZE, 0)) > 0) { 
19   /* create buffer of MULTIPLIER(int) times what was received */ 
20   for (i = 0; i < MULTIPLIER && nread*i < BUF_SIZE; i++) { 
21    memcpy(buffer+(nread*i), client_buf, nread); 
22   } 
23 
24   /* send the created buffer */ 
25   while ((nwrite = send(conn_p->sock, buffer+nsent, (nread*MULTIPLIER)-nsent, 0)) > 0) { 
26    nsent += nwrite; 
27   } 
28 
29   conn_p->total_recvd += nread; /* update our stats for this conn */ 
30   conn_p->total_sent += nsent; /* update our status for this conn */ 
31   serv_stats.total_recvd += nread; 
32   serv_stats.total_sent += nsent; 
33   nsent = 0; 
34 
35   if (single) { 
36    return 1; 
37   } 
38  } 
39 
40  if (nread == -1 && (errno & EAGAIN)) { 
41   return 1; 
42  } 
43 
44  if (nread == -1) { 
45   perror("wtf?"); 
46  } 
47 
48  shutdown(conn_p->sock, SHUT_RDWR); 
49  close(conn_p->sock); 
50 
51  return 0; /* recv failed */ 
52 } 
+2

Dos puntos: el primero es que 'errno' no es un campo de bits, por lo que' errno & EAGAIN' no es correcto, use 'errno == EAGAIN'. El segundo es que estás indexando matrices con el descriptor de socket, y pueden ser cualquier número que encaje en un 'int', ¿estás seguro de que son menores que el tamaño de las matrices? –

+0

@Joachim Pileborg: solo curiosidad, ¿puedes nombrar un sistema donde los fd-s no están asignados a los números más bajos disponibles? –

+0

@KarolyHorvath No, pero tan pronto como sé que no hay garantía de que deba ser así, o que los números deben ser consecutivos. Además, nunca obtendrá un socket con "valores" de 0 a 2 a menos que cierre el estándar in/out/err. –

Respuesta

2

Aquí están algunas ideas:

  1. Debería ver realmente cómo se accede a la matriz compartida ready. En su hilo de trabajo, adquiere un mutex para leerlo, sin embargo, hay ocasiones en que modifica este fuera del del candado, adicionalmente, no adquiere este candado en su bucle de sondeo (hilo principal), simplemente escribe en la matriz - esto es completamente incorrecto.
  2. Usted no guarda los identificadores de hilo para todos los subprocesos de trabajo, cómo se propone para matarlos (o esperar a que se completen - normalmente que había necesidad de llamar pthread_join)
  3. Se crea un subproceso independiente para acepte la conexión, pero nuevamente modifica la estructura epoll_accept_thread compartida en este hilo, y no hay ningún bloqueo a su alrededor.

Solucionaré todos los problemas de sincronización primero, y eso puede revelar otros problemas.

+0

Si se supone que el proceso se ejecutará para siempre, puede que no sea necesario eliminar los subprocesos de trabajo. He escrito bastantes programas como ese, donde los hilos nunca salen, a menos que esté en pánico absoluto porque algo salió terriblemente mal. Sin embargo, tuve que matar todo el proceso, no solo un hilo. –

+0

@ X-Istence, ese puede ser el caso, pero si quiere asegurarse de que espera a que todos los hilos completen por completo sus tareas antes de apagarse, únase es el camino a seguir. – Nim

+0

En este momento, simplemente dejo que el proceso se ejecute para siempre (o lo elimine en caso de que necesite volver a probarlo), así que no creo que sea importante hacer un seguimiento de los ID de los hilos. En cuanto a cómo se accede a 'listo' en el hilo principal, estoy buscando un punto que no está ocupado, y este es el único hilo que está llenando esas ranuras, así que pensé que no era necesario un mutex . Sin embargo, se corrigió el mutex en los hilos de trabajo. Además, elimino las asignaciones adicionales en main al 'epoll_accept_thread'. Esos cambios no han arrojado ningún resultado diferente :( –

1

Yo quería publicar esto en un comentario anterior pero tiene mucho más tiempo de lo que permitiría:

Intenta implementar un servidor basado sencilla epoll que es totalmente asíncronos (pasos)

  1. Configure su aceptar toma ...
  2. Añadir a epoll.
  3. entrar bucle de espera:
    1. Compruebe si caso está en aceptar la toma de corriente, o la toma normal de
    2. Si acepta zócalo, aceptar la conexión, añadir a epoll, volver a 3
    3. Si el evento de toma normal para la lectura , lea X ​​bytes, guarde para escribir búfer y habilite write event en epoll para socket, vuelva a 3
    4. Si el evento en el socket normal para escritura, escriba bytes del búfer en la red, deshabilite el evento de escritura si el búfer de escritura está vacío, volver a 3.
    5. Si se produce un error quitar el enchufe de epoll
  4. No hay cuarto paso ... el programa de bucle debe siempre.

Esto debería eliminar cualquier complejidad que haya agregado al enhebrar que podría causar problemas. Esto mueve epoll de vuelta al mismo tipo de dominio como select(), excepto que generalmente es mucho más rápido. La idea de utilizar una biblioteca de eventos es saber cuándo se puede leer/escribir en lugar de establecer un socket para no bloquear e intentar leer desde él/escribir en él.

También parece que nunca comprueba el valor de retorno de write() que puede haber fallado debido a la recepción de un SIGPIPE (sé que ignoró la señal, pero igual obtendrá un EAGAIN/EINTR errno).

La otra cosa que veo es que estás haciendo un bucle ocupado dentro de tu hilo que está esperando a que los sockets estén listos. Cuando utiliza select() o epoll en este caso, es para que se le notifique que hay algo nuevo, por lo que no tiene que hacer un bucle ocupado ...

No estoy seguro de lo que está intentando lograr, pero su código es extremadamente ineficiente.

Lo que podría hacer, después de implementar sólo un ejemplo asíncrona sencilla mediante los pasos anteriores se puesta en marcha de múltiples subprocesos de trabajo que todos escuchar (usando epoll) para read eventos en la toma listener/accept y que tienen cada uno de los hilos de control Varios conexiones (aún usando lo que publiqué arriba).

+0

Sí, sé que mi código es terriblemente ineficiente :( Honestamente, solo quería que FUNCIONARA antes de comenzar a refactorizar. He estado golpeando mi cabeza en el escritorio por un tiempo con este. –