2012-02-07 14 views
9

Estoy usando la biblioteca Netty (versión 4 de GitHub). Funciona muy bien en Scala, pero espero que mi biblioteca pueda usar el estilo de continuación de paso para la espera asíncrona.Uso de continuaciones de scala con oyentes netty/NIO

Tradicionalmente con Netty que haría algo como esto (un ejemplo asincrónica operación de conexión):

//client is a ClientBootstrap 
val future:ChannelFuture = client.connect(remoteAddr); 
future.addListener(new ChannelFutureListener { 
    def operationComplete (f:ChannelFuture) = { 
     //here goes the code that happens when the connection is made 
    } 
}) 

Si va a implementar una biblioteca (que soy), entonces usted tiene básicamente tres opciones simples para acceder al usuario de la biblioteca para hacer cosas después de realizada la conexión:

  1. Simplemente envíe la ChannelFuture de su método de conexión y permitir que el acuerdo del usuario con él - esto no proporciona mucha abstracción de Netty.
  2. Tome un ChannelFutureListener como parámetro de su método de conexión y agréguelo como detector al ChannelFuture.
  3. Tome un objeto función de devolución de llamada como parámetro del método de conexión y llamada que desde dentro de la ChannelFutureListener que se crea (esto haría para un estilo de devolución de llamada impulsada por algo así como Node.js)

Lo que soy intentar hacer es una cuarta opción; No lo incluí en el conteo anterior porque no es simple.

Quiero usar continuaciones Scala delimitado para hacer que el uso de la biblioteca sea algo así como una biblioteca de bloqueo, pero será sin bloqueo detrás de las escenas:

class MyLibraryClient { 
    def connect(remoteAddr:SocketAddress) = { 
     shift { retrn: (Unit => Unit) => { 
       val future:ChannelFuture = client.connect(remoteAddr); 
       future.addListener(new ChannelFutureListener { 
        def operationComplete(f:ChannelFuture) = { 
         retrn(); 
        } 
       }); 
      } 
     } 
    } 
} 

imaginar otras operaciones de lectura/escritura están aplicando en la misma moda El objetivo de este ser que el código del usuario puede verse más como esto:

reset { 
    val conn = new MyLibraryClient(); 
    conn.connect(new InetSocketAddress("127.0.0.1", 1337)); 
    println("This will happen after the connection is finished"); 
} 

En otras palabras, el programa se verá como un simple programa de estilo de bloqueo pero detrás de las escenas que no habrá ningún bloqueo o roscar .

El problema con el que me estoy encontrando es que no entiendo completamente cómo funciona el tipeo de continuaciones delimitadas. Cuando intento implementarlo de la manera anterior, el compilador se queja de que mi implementación operationComplete realmente devuelve Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit] en lugar de Unit. Entiendo que hay una especie de "gotcha" en Scala's CPS en que debe anotar un tipo de devolución del método shift con @suspendable, que se pasa por la pila de llamadas hasta el reset, pero no parece haber ninguna forma de conciliar eso con una biblioteca de Java preexistente que no tiene ningún concepto de continuaciones delimitadas.

Siento que realmente debe haber una forma de evitar esto: si Swarm puede serializar continuaciones y atascarlas en la red para computarlas en otro lugar, entonces debe ser posible simplemente invocar una continuación de una clase Java preexistente. Pero no puedo entender cómo se puede hacer. ¿Tendría que volver a escribir partes enteras de Netty en Scala para que esto ocurra?

+0

No sé Cómo corregir las cosas Scala pero sugiero contra de su idea. Déjame decirte por qué. Pero haciendo que el usuario "desconozca" la naturaleza asíncrona de su biblioteca, le dirá que está bien que "bloquee" las llamadas en el código de escucha. De hecho, él no sabría que incluso escribe su código en un oyente. Hacer una llamada de bloqueo en un oyente puede generar todo tipo de problemas. El problema que verá la mayoría de las veces es que "ralentiza" otras tareas y limita el rendimiento. –

+1

Tiene un buen punto, pero no estoy de acuerdo. Creo que el usuario de mi biblioteca, si es que hay alguno además de mí, probablemente tendrá que entender qué hace 'reset' para empezar, y así comprenderá que las llamadas no son de bloqueo. Esto es realmente solo una manera de A) obtener una comprensión más profunda de las continuaciones delimitadas, y B) experimentar escribiendo esencialmente el código de devolución de llamada de una manera más limpia. – Jeremy

Respuesta

4

Encontré esta explicación de Scala's continuations extremadamente útil cuando comencé. En particular, preste atención a las partes donde explica shift[A, B, C] y reset[B, C]. Agregar una dummy null como la última declaración de operationComplete debería ayudar.

Por cierto, necesita invocar retrn() dentro de otra reset si puede tener un shift anidado en su interior.

Editar: Aquí está un ejemplo de trabajo

import scala.util.continuations._ 
import java.util.concurrent.Executors 

object Test { 

    val execService = Executors.newFixedThreadPool(2) 

    def main(args: Array[String]): Unit = { 
    reset { 
     val conn = new MyLibraryClient(); 
     conn.connect("127.0.0.1"); 
     println("This will happen after the connection is finished"); 
    } 
    println("Outside reset"); 
    } 
} 

class ChannelFuture { 
    def addListener(listener: ChannelFutureListener): Unit = { 
    val future = this 
    Test.execService.submit(new Runnable { 
     def run(): Unit = { 
     listener.operationComplete(future) 
     } 
    }) 
    } 
} 

trait ChannelFutureListener { 
    def operationComplete(f: ChannelFuture): Unit 
} 

class MyLibraryClient { 
    def connect(remoteAddr: String): [email protected][Unit] = { 
    shift { 
     retrn: (Unit => Unit) => { 
     val future: ChannelFuture = new ChannelFuture() 
     future.addListener(new ChannelFutureListener { 
      def operationComplete(f: ChannelFuture): Unit = { 
      println("operationComplete starts") 
      retrn(); 
      null 
      } 
     }); 
     } 
    } 
    } 
} 

con una posible salida:

Outside reset 
operationComplete starts 
This will happen after the connection is finished 
+0

Esto de hecho hace feliz al compilador e incluso parece funcionar correctamente. Creo que la clave es que movió el 'shift' fuera del anónimo' ChannelFutureListener' y usó el cierre para llamar a la continuación desde 'operationComplete '. No estoy seguro de entender por qué esto funciona y el otro no, pero lo tomaré. ¡Gracias! – Jeremy

+0

Y esa es una muy buena lectura sobre las continuaciones de Scala. Deben eliminar los ejemplos sin valor de la página de scala-lang.org sobre las continuaciones y reemplazarlos por el artículo que vinculó. – Jeremy

+0

@Jeremy, sí, ese artículo es muy bueno :) – shams

Cuestiones relacionadas