2010-12-03 29 views
10

Estoy experimentando con ZeroMQ y tratando de obtener algo funcionando. Lo primero que pensé fue configurar un REP/REQ usando el transporte interno para ver si podía enviar mensajes entre dos hilos. La mayoría del código a continuación se toma de los ejemplos clzmq, pero parece que no funciona.Usando ZeroMQ con C# con inproc transporte

Tanto el servidor como el cliente están vinculados al transporte, pero cuando el cliente intenta hacer un Send, bloquea y se queda allí. No tengo experiencia con ZeroMQ, así que no estoy seguro de dónde buscar primero, cualquier ayuda sería muy apreciada. Aquí está el infractor (ofensivo) código:

using System; 
using System.Diagnostics; 
using System.Threading; 
using NUnit.Framework; 
using ZMQ; 

namespace PostBox 
{ 
    [TestFixture] 
    public class Class1 
    { 

     private const string Address = "inproc://test"; 
     private const uint MessageSize = 10; 
     private const int RoundtripCount = 100; 

     [Test] 
     public void Should() 
     { 
      var clientThread = new Thread(StartClient); 
      clientThread.Start(); 

      var serverThread = new Thread(StartServer); 
      serverThread.Start(); 

      clientThread.Join(); 
      serverThread.Join(); 

      Console.WriteLine("Done with life"); 
     } 

     private void StartServer() 
     { 


      // Initialise 0MQ infrastructure 
      using (var ctx = new Context(1)) 
      { 
       using (var skt = ctx.Socket(SocketType.REP)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Server has bound"); 

        // Bounce the messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         var msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 
         skt.Send(msg); 
        } 
        Thread.Sleep(1000); 
       } 
      } 

      Console.WriteLine("Done with server"); 
     } 

     private void StartClient() 
     { 
      Thread.Sleep(2000); 

      // Initialise 0MQ infrastructure 
      using (var ctx = new Context(1)) 
      { 
       using (var skt = ctx.Socket(SocketType.REQ)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Client has bound"); 

        // Create a message to send. 
        var msg = new byte[MessageSize]; 

        // Start measuring the time. 
        var watch = new Stopwatch(); 
        watch.Start(); 

        // Start sending messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         skt.Send(msg); 
         msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 

         Console.Write("."); 
        } 

        // Stop measuring the time. 
        watch.Stop(); 
        var elapsedTime = watch.ElapsedTicks; 

        // Print out the test parameters. 
        Console.WriteLine("message size: " + MessageSize + " [B]"); 
        Console.WriteLine("roundtrip count: " + RoundtripCount); 

        // Compute and print out the latency. 
        var latency = (double)(elapsedTime)/RoundtripCount/2 * 
         1000000/Stopwatch.Frequency; 
        Console.WriteLine("Your average latency is {0} [us]", 
         latency.ToString("f2")); 
       } 
      } 

      Console.WriteLine("Done with client"); 
     } 

    } 
} 

Editar:

Tengo este trabajo con la ayuda de la respuesta de abajo, pero también me necesario para cambiar un Bind a un Connect, lo cual tiene sentido cuando lo piensas, ya que tenemos un servidor vinculante para un transporte local y un cliente que se conecta a un transporte remoto. Aquí está el código actualizado:

using System; 
using System.Diagnostics; 
using System.Threading; 
using NUnit.Framework; 
using ZMQ; 

namespace PostBox 
{ 
    [TestFixture] 
    public class Class1 
    { 

     private const string Address = "inproc://test"; 
     private const uint MessageSize = 10; 
     private const int RoundtripCount = 100; 

     private static Context ctx; 

     [Test] 
     public void Should() 
     { 
      using (ctx = new Context(1)) 
      { 
       var clientThread = new Thread(StartClient); 
       clientThread.Start(); 

       var serverThread = new Thread(StartServer); 
       serverThread.Start(); 

       clientThread.Join(); 
       serverThread.Join(); 

       Console.WriteLine("Done with life"); 
      } 
     } 

     private void StartServer() 
     { 
      try 
      { 
       using (var skt = ctx.Socket(SocketType.REP)) 
       { 
        skt.Bind(Address); 

        Console.WriteLine("Server has bound"); 

        // Bounce the messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         var msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 
         skt.Send(msg); 
        } 
        Thread.Sleep(1000); 
       } 

       Console.WriteLine("Done with server"); 
      } 
      catch (System.Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 

     private void StartClient() 
     { 
      Thread.Sleep(2000); 

      try 
      { 
       // Initialise 0MQ infrastructure 
       using (var skt = ctx.Socket(SocketType.REQ)) 
       { 
        skt.Connect(Address); 

        Console.WriteLine("Client has bound"); 

        // Create a message to send. 
        var msg = new byte[MessageSize]; 

        // Start measuring the time. 
        var watch = new Stopwatch(); 
        watch.Start(); 

        // Start sending messages. 
        for (var i = 0; i < RoundtripCount; i++) 
        { 
         skt.Send(msg); 
         msg = skt.Recv(); 
         Debug.Assert(msg.Length == MessageSize); 

         Console.Write("."); 
        } 

        // Stop measuring the time. 
        watch.Stop(); 
        var elapsedTime = watch.ElapsedTicks; 

        // Print out the test parameters. 
        Console.WriteLine("message size: " + MessageSize + " [B]"); 
        Console.WriteLine("roundtrip count: " + RoundtripCount); 

        // Compute and print out the latency. 
        var latency = (double)(elapsedTime)/RoundtripCount/2 * 
            1000000/Stopwatch.Frequency; 
        Console.WriteLine("Your average latency is {0} [us]", 
             latency.ToString("f2")); 
       } 

       Console.WriteLine("Done with client"); 
      } 
      catch (System.Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 

    } 
} 

Respuesta

14

Creo que ambos subprocesos necesitan usar el mismo Contexto. La guía Zeromq recomienda no utilizar más de un contexto en un proceso. Crea un contexto, comparte ese contexto entre ambos subprocesos. Esto debería funcionar.

De http://zguide.zeromq.org/chapter:all

debe crear un objeto 'contexto' para su proceso, y pasar a que todas las discusiones. El contexto recoge el estado de ØMQ. Para crear una conexión en el proceso: transporte, tanto el servidor como el cliente deben compartir el mismo objeto de contexto.

+0

Esto fue muy útil, ¡gracias! – jonnii

2

Solo un extremo puede enlazar el otro debe Conectarse, puede tener varias conexiones.