2010-11-30 13 views
7

(Cumplir con un ejemplo común con asíncrono ha podido recuperar de muchas páginas web)¿Cómo lograr la asincronía en lugar de paralelismo en F #

¿Cómo podría escindir múltiples (cientos) de las solicitudes de páginas web de forma asíncrona, y luego esperar a que todos solicitudes para completar antes de ir al siguiente paso? Async.AsParallel procesa algunas solicitudes a la vez, controladas por el número de núcleos en la CPU. Tomar una página web no es una operación vinculada a la CPU. No estoy satisfecho con la aceleración de Async.AsParallel, estoy buscando alternativas.

Intenté conectar los puntos entre Async.StartAsTask y Task []. WaitAll. Instintivamente, escribí el siguiente código, pero no compila.

let processItemsConcurrently (items : int seq) = 
    let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item)) 
    Tasks.Task.WaitAll(tasks) 

¿Cómo te acercarías a esto?

Respuesta

7

Async.Parallel es casi definitivamente aquí. No estoy seguro de lo que no te gusta; la fuerza de F # asyncs radica más en la informática asincrónica que en las tareas vinculadas a la CPU paralelas a tareas (que está más adaptada a Task s y .NET 4.0 TPL). Aquí hay un ejemplo completo:

open System.Diagnostics 
open System.IO 
open System.Net 
open Microsoft.FSharp.Control.WebExtensions 

let sites = [| 
    "http://bing.com" 
    "http://google.com" 
    "http://cnn.com" 
    "http://stackoverflow.com" 
    "http://yahoo.com" 
    "http://msdn.com" 
    "http://microsoft.com" 
    "http://apple.com" 
    "http://nfl.com" 
    "http://amazon.com" 
    "http://ebay.com" 
    "http://expedia.com" 
    "http://twitter.com" 
    "http://reddit.com" 
    "http://hulu.com" 
    "http://youtube.com" 
    "http://wikipedia.org" 
    "http://live.com" 
    "http://msn.com" 
    "http://wordpress.com" 
    |] 

let print s = 
    // careful, don't create a synchronization bottleneck by printing 
    //printf "%s" s 
    () 

let printSummary info fullTimeMs = 
    Array.sortInPlaceBy (fun (i,_,_) -> i) info 
// for i, size, time in info do 
//  printfn "%2d %7d %5d" i size time 
    let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max 
    printfn "longest request took %dms" longest 
    let bytes = info |> Array.sumBy (fun (_,size,_) -> float size) 
    let seconds = float fullTimeMs/1000. 
    printfn "sucked down %7.2f KB/s" (bytes/1024.0/seconds) 

let FetchAllSync() = 
    let allsw = Stopwatch.StartNew() 
    let info = sites |> Array.mapi (fun i url -> 
     let sw = Stopwatch.StartNew() 
     print "S" 
     let req = WebRequest.Create(url) 
     use resp = req.GetResponse() 
     use stream = resp.GetResponseStream() 
     use reader = new StreamReader(stream, 
          System.Text.Encoding.UTF8, true, 4096) 
     print "-" 
     let contents = reader.ReadToEnd() 
     print "r" 
     i, contents.Length, sw.ElapsedMilliseconds) 
    let time = allsw.ElapsedMilliseconds 
    printSummary info time 
    time, info |> Array.sumBy (fun (_,size,_) -> size) 

let FetchAllAsync() = 
    let allsw = Stopwatch.StartNew() 
    let info = sites |> Array.mapi (fun i url -> async { 
     let sw = Stopwatch.StartNew() 
     print "S" 
     let req = WebRequest.Create(url) 
     use! resp = req.AsyncGetResponse() 
     use stream = resp.GetResponseStream() 
     use reader = new AsyncStreamReader(stream, // F# PowerPack 
          System.Text.Encoding.UTF8, true, 4096) 
     print "-" 
     let! contents = reader.ReadToEnd() // in F# PowerPack 
     print "r" 
     return i, contents.Length, sw.ElapsedMilliseconds }) 
        |> Async.Parallel 
        |> Async.RunSynchronously 
    let time = allsw.ElapsedMilliseconds 
    printSummary info time 
    time, info |> Array.sumBy (fun (_,size,_) -> size) 

// By default, I think .NET limits you to 2 open connections at once 
ServicePointManager.DefaultConnectionLimit <- sites.Length 

for i in 1..3 do // to warmup and show variance 
    let time1,r1 = FetchAllSync() 
    printfn "Sync took %dms, result was %d" time1 r1 
    let time2,r2 = FetchAllAsync() 
    printfn "Async took %dms, result was %d (speedup=%2.2f)" 
     time2 r2 (float time1/ float time2) 
    printfn "" 

En mi caja de 4 núcleos, esto siempre da una aceleración de casi 4x.

EDITAR

En respuesta a su comentario, He actualizado el código. Tienes razón en que he agregado más sitios y no estoy viendo la aceleración esperada (aún manteniéndome estable alrededor de 4x). He empezado a añadir un poco por encima de la salida de depuración, continuará investigando para ver si algo más está estrangulando las conexiones ...

EDITAR

Editted el código de nuevo. Bueno, encontré lo que podría ser el cuello de botella. Aquí está la implementación de AsyncReadToEnd en el PowerPack:

type System.IO.StreamReader with 
    member s.AsyncReadToEnd() = 
     FileExtensions.UnblockViaNewThread (fun() -> s.ReadToEnd()) 

En otras palabras, sólo se bloquea un hilo de subprocesos y lee de forma sincrónica. Argh !!! Déjame ver si puedo evitar eso.

EDITAR

Ok, la AsyncStreamReader en el PowerPack hace lo correcto, y estoy usando ahora.

Sin embargo, la cuestión clave parece ser varianza.

Cuando pulse, digamos, cnn.com, la mayoría de las veces el resultado volverá en 500ms. Pero de vez en cuando obtienes esa solicitud que requiere 4s, y esto, por supuesto, puede acabar con el aparente rendimiento asincrónico, ya que el tiempo total es el momento de la solicitud desafortunada.

Ejecutando el programa anterior, veo aceleraciones de aproximadamente 2,5x a 9x en mi caja de 2 núcleos en casa. Sin embargo, es muy variable. Todavía es posible que haya algún cuello de botella en el programa que me he perdido, pero creo que la varianza de la web puede dar cuenta de todo lo que estoy viendo en este momento.

+0

Uso un código bastante similar. Como una verificación de cordura, agregué un "do! Async, Sleep 1000" en un lugar estratégico. Tengo dos de las cuatro HT desactivadas, y obtengo una aceleración de 4 veces con 4 solicitudes de página Y con 20 solicitudes. Que te dice eso? Se está haciendo tarde aquí. Probaré tu truco DefaultConnectionLimit mañana. – GregC

+0

Agregué un código de muestra que evita las API de acceso web como una prueba de concepto. Como dijiste, Async.Paralelo parece ser el adecuado aquí. – GregC

+0

Para mañana, echaremos un vistazo a las API de acceso web que llegan a localhost. – GregC

2

Usando las extensiones reactivas para .NET combinadas con F #, puede escribir una solución muy elegante: mire la muestra en http://blog.paulbetts.org/index.php/2010/11/16/making-async-io-work-for-you-reactive-style/ (esto usa C#, pero usar F # también es fácil; la tecla usa los métodos Begin/End en lugar del método de sincronización, que incluso si puede compilarlo, bloqueará n hilos de ThreadPool innecesariamente, en lugar de que Threadpool solo seleccione las rutinas de finalización a medida que entren)

+0

estoy especialmente interesado en la creación de un punto de sincronización al final de todas las solicitudes asíncronas, por lo que se puede contabilizar los resultados. Tampoco estoy seguro acerca de su comentario sobre el bloqueo del grupo de subprocesos. – GregC

+0

Observable.SelectMany hará la agregación de todas las solicitudes asíncronas. Piénselo: si usa los métodos de sincronización de HttpWebRequest, ¿cómo podría no bloquear el grupo de subprocesos? Algún hilo en algún lado está esperando en HttpWebResponse, a menos que uses los métodos asincrónicos Begin/End. –

+0

¿Por qué los downmods? Estoy bien con ellos, si hay una explicación adjunta ... –

1

No soy un chico F #, pero desde una perspectiva pura de .NET, lo que estás buscando es TaskFactory :: FromAsync, donde la llamada asíncrona que estarías envolviendo en una tarea sería algo así como HttpRequest :: BeginGetResponse. También puede resumir el modelo de EAP que expone WebClient utilizando TaskCompletionSource. Más información sobre estos dos topics here en MSDN.

Afortunadamente, con este conocimiento puede encontrar el enfoque de F # nativo más cercano para lograr lo que está tratando de hacer.

+0

TaskFactory.ContinueWhenAll() (http://msdn.microsoft.com/en-us/library/dd321473.aspx) es una entidad familiar. Sin embargo, la promesa de F # es que oculta el estilo de continuación de paso, haciendo que el algoritmo real se destaque. – GregC

2

Mi apuesta es que la aceleración que estás experimentando no es lo suficientemente significativa para tu gusto porque estás usando un subtipo de WebRequest o una clase que depende de él (como WebClient).
Si ese es el caso, debe configurar el MaxConnection en ConnectionManagementElement (y le sugiero que solo lo configure si es necesario, de lo contrario se convertirá en una operación bastante lenta) a un valor alto, dependiendo de la cantidad de conexiones simultáneas que Quiero iniciar desde tu aplicación.

+1

Vea también 'ServicePointManager.DefaultConnectionLimit' en mi código. – Brian

1

Aquí hay algunos códigos que evitan las incógnitas, como la latencia de acceso web. Estoy obteniendo una utilización de la CPU inferior al 5% y una eficacia de alrededor del 60-80% para las rutas de sincronización y de código asíncrono.

open System.Diagnostics 

let numWorkers = 200 
let asyncDelay = 50 

let main = 
    let codeBlocks = [for i in 1..numWorkers -> 
         async { do! Async.Sleep asyncDelay } ] 

    while true do 
     printfn "Concurrent started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore 
     sw.Stop() 
     printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * 100)/sw.ElapsedMilliseconds) 

     printfn "Synchronous started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore 
     sw.Stop() 
     printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100)/sw.ElapsedMilliseconds) 

main 
Cuestiones relacionadas