2011-01-12 10 views
25

tengo un oyente:Multi-roscar con .Net HttpListener

listener = new HttpListener(); 
listener.Prefixes.Add(@"http://+:8077/"); 
listener.Start(); 
listenerThread = new Thread(HandleRequests); 
listenerThread.Start(); 

Y estoy solicitudes de control:

private void HandleRequests() 
{ 
    while (listener.IsListening) 
    { 
     var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener); 
     context.AsyncWaitHandle.WaitOne(); 
    } 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 
} 

me gustaría escribir void Stop() de tal manera, que:

  1. Se bloqueará hasta que finalicen todas las solicitudes actualizadas (es decir, esperará a que todos los hilos "hagan algunas cosas").
  2. Mientras espera las solicitudes ya iniciadas, no permitirá más solicitudes (es decir, volver al comienzo de ListenerCallback).
  3. Después de eso llamará a listener.Stop() (listener.IsListening se convirtió en falso).

¿Cómo podría ser escribir?

EDIT: ¿Qué opinas de esta solución? ¿Es seguro?

public void Stop() 
{ 
    lock (this) 
    { 
     isStopping = true; 
    } 
    resetEvent.WaitOne(); //initially set to true 
    listener.Stop(); 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    lock (this) 
    { 
     if (isStopping) 
      return; 

     resetEvent.Reset(); 
     numberOfRequests++; 
    } 

    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 

    lock (this) 
    { 
     if (--numberOfRequests == 0) 
      resetEvent.Set(); 
    } 
} 

Respuesta

2

he consultado mi código en EDITAR parte de mi pregunta y he decidido aceptarlo con algunas modificaciones:

public void Stop() 
{ 
    lock (locker) 
    { 
     isStopping = true; 
    } 
    resetEvent.WaitOne(); //initially set to true 
    listener.Stop(); 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    lock (locker) //locking on this is a bad idea, but I forget about it before 
    { 
     if (isStopping) 
      return; 

     resetEvent.Reset(); 
     numberOfRequests++; 
    } 

    try 
    { 
     var listener = ar.AsyncState as HttpListener; 

     var context = listener.EndGetContext(ar); 

     //do some stuff 
    } 
    finally //to make sure that bellow code will be executed 
    { 
     lock (locker) 
     { 
      if (--numberOfRequests == 0) 
       resetEvent.Set(); 
     } 
    } 
} 
0

Simplemente llamando a listener.Stop() debería hacer el truco. Esto no terminará ninguna conexión que ya se haya establecido, pero evitará cualquier conexión nueva.

+1

Esto no es verdad. Si llama a 'listener.Stop()' durante la ejecución de 'ListenerCallback', obtendrá una excepción, por ejemplo. cuando se llama a 'EndGetContext' o incluso más tarde, cuando se utiliza el flujo de salida. Puedo ver las excepciones, por supuesto, pero preferiría no hacerlo. – prostynick

+0

En mi código uso una bandera y ya no me refiero al oyente después de haber llamado "stop", pero al cerrar el oyente no se cierran las conexiones ya aceptadas, solo el oyente. –

+0

No sé a qué te refieres con "Usar una bandera". El problema es que en 'ListenerCallback' estoy usando listener y si otro hilo lo cierra, mientras lo estoy usando, terminaré con excepciones, que mencioné. – prostynick

4

Bueno, hay varias maneras de resolver esto ... Este es un ejemplo simple que usa un semáforo para seguir el trabajo en curso, y una señal que se genera cuando todos los trabajadores terminan. Esto debería darte una idea básica para trabajar.

La solución a continuación no es ideal, idealmente deberíamos adquirir el semáforo antes de llamar a BeginGetContext. Eso hace que el cierre sea más difícil, así que he optado por utilizar este enfoque más simplificado. Si estuviera haciendo esto para "real", probablemente escribiría mi propia gestión de hilos en lugar de confiar en ThreadPool. Esto permitiría un cierre más confiable.

De todos modos aquí está el ejemplo completo:

class TestHttp 
{ 
    static void Main() 
    { 
     using (HttpServer srvr = new HttpServer(5)) 
     { 
      srvr.Start(8085); 
      Console.WriteLine("Press [Enter] to quit."); 
      Console.ReadLine(); 
     } 
    } 
} 


class HttpServer : IDisposable 
{ 
    private readonly int _maxThreads; 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly ManualResetEvent _stop, _idle; 
    private readonly Semaphore _busy; 

    public HttpServer(int maxThreads) 
    { 
     _maxThreads = maxThreads; 
     _stop = new ManualResetEvent(false); 
     _idle = new ManualResetEvent(false); 
     _busy = new Semaphore(maxThreads, maxThreads); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     _idle.Reset(); 

     //aquire and release the semaphore to see if anyone is running, wait for idle if they are. 
     _busy.WaitOne(); 
     if(_maxThreads != 1 + _busy.Release()) 
      _idle.WaitOne(); 

     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ListenerCallback, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ListenerCallback(IAsyncResult ar) 
    { 
     _busy.WaitOne(); 
     try 
     { 
      HttpListenerContext context; 
      try 
      { context = _listener.EndGetContext(ar); } 
      catch (HttpListenerException) 
      { return; } 

      if (_stop.WaitOne(0, false)) 
       return; 

      Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl); 
      context.Response.SendChunked = true; 
      using (TextWriter tw = new StreamWriter(context.Response.OutputStream)) 
      { 
       tw.WriteLine("<html><body><h1>Hello World</h1>"); 
       for (int i = 0; i < 5; i++) 
       { 
        tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now); 
        tw.Flush(); 
        Thread.Sleep(1000); 
       } 
       tw.WriteLine("</body></html>"); 
      } 
     } 
     finally 
     { 
      if (_maxThreads == 1 + _busy.Release()) 
       _idle.Set(); 
     } 
    } 
} 
56

Para completar, aquí es lo que se vería como si usted maneja sus propios subprocesos de trabajo:

class HttpServer : IDisposable 
{ 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly Thread[] _workers; 
    private readonly ManualResetEvent _stop, _ready; 
    private Queue<HttpListenerContext> _queue; 

    public HttpServer(int maxThreads) 
    { 
     _workers = new Thread[maxThreads]; 
     _queue = new Queue<HttpListenerContext>(); 
     _stop = new ManualResetEvent(false); 
     _ready = new ManualResetEvent(false); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 

     for (int i = 0; i < _workers.Length; i++) 
     { 
      _workers[i] = new Thread(Worker); 
      _workers[i].Start(); 
     } 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     foreach (Thread worker in _workers) 
      worker.Join(); 
     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ContextReady, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ContextReady(IAsyncResult ar) 
    { 
     try 
     { 
      lock (_queue) 
      { 
       _queue.Enqueue(_listener.EndGetContext(ar)); 
       _ready.Set(); 
      } 
     } 
     catch { return; } 
    } 

    private void Worker() 
    { 
     WaitHandle[] wait = new[] { _ready, _stop }; 
     while (0 == WaitHandle.WaitAny(wait)) 
     { 
      HttpListenerContext context; 
      lock (_queue) 
      { 
       if (_queue.Count > 0) 
        context = _queue.Dequeue(); 
       else 
       { 
        _ready.Reset(); 
        continue; 
       } 
      } 

      try { ProcessRequest(context); } 
      catch (Exception e) { Console.Error.WriteLine(e); } 
     } 
    } 

    public event Action<HttpListenerContext> ProcessRequest; 
} 
+0

Esto es asombroso: sirve como un gran candidato para probar el rendimiento de HttpListener. – Jonno

+0

¡Muchas gracias por ese pedazo de código! Hay dos pequeños problemas: 1. ProcessRequest podría ser nulo 2. HttpListenerContext no es seguro para los hilos a menos que sea estático –

+0

@MartinMeeser gracias por el comentario. para 1. en lugar de envolverlo en try catch block, podríamos usar este 'ProcessRequest? .Invoke (context);'. Para 2. Sin embargo, si estática no es una opción, ¿qué recomiendas? – JohnTube

0

This utiliza la cola tipada BlockingCollection para atender solicitudes. Es utilizable como es. Debe derivar una clase de esta y anular la respuesta.

using System; 
using System.Collections.Concurrent; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class HttpServer : IDisposable 
    { 
     private HttpListener httpListener; 
     private Thread listenerLoop; 
     private Thread[] requestProcessors; 
     private BlockingCollection<HttpListenerContext> messages; 

     public HttpServer(int threadCount) 
     { 
      requestProcessors = new Thread[threadCount]; 
      messages = new BlockingCollection<HttpListenerContext>(); 
      httpListener = new HttpListener(); 
     } 

     public virtual int Port { get; set; } = 80; 

     public virtual string[] Prefixes 
     { 
      get { return new string[] {string.Format(@"http://+:{0}/", Port)}; } 
     } 

     public void Start(int port) 
     { 
      listenerLoop = new Thread(HandleRequests); 

      foreach(string prefix in Prefixes) httpListener.Prefixes.Add(prefix); 

      listenerLoop.Start(); 

      for (int i = 0; i < requestProcessors.Length; i++) 
      { 
       requestProcessors[i] = StartProcessor(i, messages); 
      } 
     } 

     public void Dispose() { Stop(); } 

     public void Stop() 
     { 
      messages.CompleteAdding(); 

      foreach (Thread worker in requestProcessors) worker.Join(); 

      httpListener.Stop(); 
      listenerLoop.Join(); 
     } 

     private void HandleRequests() 
     { 
      httpListener.Start(); 
      try 
      { 
       while (httpListener.IsListening) 
       { 
        Console.WriteLine("The Linstener Is Listening!"); 
        HttpListenerContext context = httpListener.GetContext(); 

        messages.Add(context); 
        Console.WriteLine("The Linstener has added a message!"); 
       } 
      } 
      catch(Exception e) 
      { 
       Console.WriteLine (e.Message); 
      } 
     } 

     private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Thread thread = new Thread(() => Processor(number, messages)); 
      thread.Start(); 
      return thread; 
     } 

     private void Processor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Console.WriteLine ("Processor {0} started.", number); 
      try 
      { 
       for (;;) 
       { 
        Console.WriteLine ("Processor {0} awoken.", number); 
        HttpListenerContext context = messages.Take(); 
        Console.WriteLine ("Processor {0} dequeued message.", number); 
        Response (context); 
       } 
      } catch { } 

      Console.WriteLine ("Processor {0} terminated.", number); 
     } 

     public virtual void Response(HttpListenerContext context) 
     { 
      SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>")); 
     } 

     public static void SendReply(HttpListenerContext context, StringBuilder responseString) 
     { 
      byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString()); 
      context.Response.ContentLength64 = buffer.Length; 
      System.IO.Stream output = context.Response.OutputStream; 
      output.Write(buffer, 0, buffer.Length); 
      output.Close(); 
     } 
    } 
} 

Esta es una muestra de cómo usarlo. No es necesario usar eventos o bloques de bloqueo. El BlockingCollection resuelve todos estos problemas.

using System; 
using System.Collections.Concurrent; 
using System.IO; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class Server 
    { 
    public static void Main (string[] args) 
    { 
     HttpServer Service = new QuizzServer (8); 
     Service.Start (80); 
     for (bool coninute = true; coninute ;) 
     { 
      string input = Console.ReadLine().ToLower(); 
      switch (input) 
      { 
       case "stop": 
        Console.WriteLine ("Stop command accepted."); 
        Service.Stop(); 
        coninute = false; 
        break; 
       default: 
        Console.WriteLine ("Unknown Command: '{0}'.",input); 
        break; 
      } 
     } 
    } 
    } 
} 
Cuestiones relacionadas