2010-12-13 19 views
12

Después de jugar con agentes F #, traté de hacer un mapa para reducir su uso.Mapa Reducir con agentes F #

La estructura básica que utilizo es:

  • mapa supervisor de la cual pone en cola todo el trabajo que hacer en su estado y recibe solicitud de trabajo de los trabajadores mapa
  • reduzcan el supervisor hace lo mismo como mapa supervisor reduce el trabajo
  • un montón de mapas y reduce a los trabajadores que mapean y reducen, si uno falla su trabajo lo envía de vuelta al supervisor respectivo para ser reprocesado.

Las preguntas que se preguntan acerca es:

  • ¿Tiene esto algún sentido en comparación con un mapa más tradicional (pero muy agradable) reducir al igual que (http://tomasp.net/blog/fsharp- parallel-aggregate.aspx) que usa PSeq?
  • la forma en que implementé el mapa y reduzco a los trabajadores parece feo ¿hay alguna manera mejor?
  • parece que puedo crear 1000 000 trabajadores del mapa y 1000 0000 reducir los trabajadores jajaja, ¿cómo debo elegir estos números, cuantos más mejor?

Muchas gracias,

type Agent<'T> = MailboxProcessor<'T> 

//This is the response the supervisor 
//gives to the worker request for work 
type 'work SupervisorResponse = 
| Work of 'work //a piece of work 
| NoWork//no work left to do 

//This is the message to the supervisor 
type 'work WorkMsg = 
| ToDo of 'work //piles up work in the Supervisor queue 
| WorkReq of AsyncReplyChannel<SupervisorResponse<'work>> //' 

//The supervisor agent can be interacted with 
type AgentOperation = 
| Stop //stop the agent 
| Status //yield the current status of supervisor 

type 'work SupervisorMsg = 
| WorkRel of 'work WorkMsg 
| Operation of AgentOperation 

//Supervises Map and Reduce workers 
module AgentSupervisor= 
    let getNew (name:string) = 
     new Agent<SupervisorMsg<'work>>(fun inbox -> //' 
      let rec loop state = async { 
       let! msg = inbox.Receive() 
       match msg with 
       | WorkRel(m) -> 
        match m with 
        | ToDo(work) -> 
         let newState = work:state 
         return! loop newState 
        | WorkReq(replyChannel) -> 
         match state with 
         | [] -> 
          replyChannel.Reply(NoWork) 
          return! loop [] 
         | [item] -> 
          replyChannel.Reply(Work(item)) 
          return! loop [] 
         | (item::remaining) -> 
          replyChannel.Reply(Work(item)) 
          return! loop remaining 
       | Operation(op) -> 
        match op with 
        | Status -> 
         Console.WriteLine(name+" current Work Queue "+ 
              string (state.Length)) 
         return! loop state 
        | Stop -> 
         Console.WriteLine("Stoppped SuperVisor Agent "+name) 
         return() 
      } 
      loop []) 
    let stop (agent:Agent<SupervisorMsg<'work>>) = agent.Post(Operation(Stop)) 
    let status (agent:Agent<SupervisorMsg<'work>>) =agent.Post(Operation(Status)) 

//Code for the workers 
type 'success WorkOutcome = 
| Success of 'success 
| Fail 

type WorkerMsg = 
| Start 
| Stop 
| Continue 

module AgentWorker = 
    type WorkerSupervisors<'reduce,'work> = 
     { Map:Agent<SupervisorMsg<'work>> ; Reduce:Agent<SupervisorMsg<'reduce>> } 

    let stop (agent:Agent<WorkerMsg>) = agent.Post(Stop) 
    let start (agent:Agent<WorkerMsg>) = agent.Start() 
             agent.Post(Start) 

    let getNewMapWorker(map, supervisors:WorkerSupervisors<'reduce,'work> ) = 
     new Agent<WorkerMsg>(fun inbox -> 
      let rec loop() = async { 
       let! msg = inbox.Receive() 
       match msg with 
       | Start -> inbox.Post(Continue) 
          return! loop() 
       | Continue -> 
        let! supervisorOrder = 
        supervisors.Map.PostAndAsyncReply(
         fun replyChannel -> 
          WorkRel(WorkReq(replyChannel))) 
        match supervisorOrder with 
        | Work(work) -> 
         let! res = map work 
         match res with 
         | Success(toReduce) -> 
          supervisors.Reduce 
           .Post(WorkRel(ToDo(toReduce))) 
         | Fail -> 
          Console.WriteLine("Map Fail") 
          supervisors.Map 
           .Post(WorkRel(ToDo(work))) 
          inbox.Post(Continue) 
        | NoWork -> 
          inbox.Post(Continue) 
          return! loop() 
       | Stop -> 
        Console.WriteLine("Map worker stopped") 
        return() 
       } 
      loop() ) 


    let getNewReduceWorker(reduce,reduceSupervisor:Agent<SupervisorMsg<'work>>)=//' 
     new Agent<WorkerMsg>(fun inbox -> 
      let rec loop() = async { 
       let! msg = inbox.Receive() 
       match msg with 
       | Start -> inbox.Post(Continue) 
          return! loop() 
       | Continue -> 
        let! supervisorOrder = 
         reduceSupervisor.PostAndAsyncReply(fun replyChannel -> 
          WorkRel(WorkReq(replyChannel))) 
        match supervisorOrder with 
        | Work(work) -> 
         let! res = reduce work 
         match res with 
         | Success(toReduce) -> inbox.Post(Continue) 
         | Fail -> 
          Console.WriteLine("ReduceFail") 
          reduceSupervisor.Post(WorkRel(ToDo(work))) 
          inbox.Post(Continue) 
        | NoWork -> inbox.Post(Continue) 
        return! loop() 
       |Stop ->Console.WriteLine("Reduce worker stopped"); return() 
       } 
      loop()) 

open AgentWorker 

type MapReduce<'work,'reduce>(numberMap:int , 
           numberReduce: int, 
           toProcess:'work list, 
           map:'work->Async<'reduce WorkOutcome>, 
           reduce:'reduce-> Async<unit WorkOutcome>) = 

    let mapSupervisor= AgentSupervisor.getNew("MapSupervisor") 
    let reduceSupervisor = AgentSupervisor.getNew("ReduceSupervisor") 

    let workerSupervisors = {Map = mapSupervisor ; Reduce = reduceSupervisor } 

    let mapWorkers = 
     [for i in 1..numberMap -> 
      AgentWorker.getNewMapWorker(map,workerSupervisors) ] 
    let reduceWorkers = 
     [for i in 1..numberReduce -> 
      AgentWorker.getNewReduceWorker(reduce,workerSupervisors.Reduce) ] 

    member this.Start() = 
     //Post work to do 
     toProcess 
     |>List.iter(fun elem -> mapSupervisor.Post(WorkRel(ToDo(elem)))) 
     //Start supervisors 
     mapSupervisor.Start() 
     reduceSupervisor.Start() 
     //start workers 
     List.iter(fun mapper -> mapper |>start) mapWorkers 
     List.iter(fun reducer ->reducer|>start) reduceWorkers 

    member this.Status() = (mapSupervisor|>AgentSupervisor.status) 
          (reduceSupervisor|>AgentSupervisor.status) 
    member this.Stop() = 
     List.map2(fun mapper reducer -> 
      mapper |>stop; reducer|>stop) mapWorkers reduceWorkers 

//Run some tests 
let map = function (n:int64) -> async{ return Success(n) } 

let reduce = function (toto: int64) -> async{ return Success() } 

let mp = MapReduce<int64,int64>(1,1,[for i in 1L..1000000L->i],map,reduce) 

mp.Start() 
mp.Status() 
mp.Stop() 
+5

FYI, no leo una pregunta que contenga más de 120 líneas de código (con un formato incorrecto). – Brian

+0

@Brian, lo siento por el desorden, he intentado reformatear las cosas un poco, pero todavía tengo el problema de color que hace que todo sea terriblemente feo. Tenga en cuenta que realmente no espero que alguien lea las 120 líneas de mi código, solo lo puse en caso de que pueda aclarar mi pregunta. Gracias – jlezard

+2

Hice algunas ediciones para limpiarlo más. En particular, usa menos espacios en blanco horizontales y verticales (no es necesario desplazarse a la derecha, no líneas en blanco múltiples en una fila). Además, tenga en cuenta el uso de // 'como una forma de evitar SO mal colorear líneas múltiples – Brian

Respuesta

6

me gusta usar MailboxProcessor para reducir la parte del algoritmo, y el bloque asíncrono que se invoca con Async.Parallel de la parte del mapa. Hace las cosas más explícitas, ofreciéndole un control más preciso sobre el manejo de excepciones, los tiempos de espera y la cancelación.

El siguiente código fue diseñado con la ayuda de Brian, y con la ayuda de su excelente bloque F # destacando el complemento "F # Depth Colorizer" para VS2010.

Este código está destinado a extraer los canales RSS del servidor meteorológico de yahoo en un patrón map-reduce. Demuestra cómo podemos controlar el flujo de ejecución desde el exterior del algoritmo real.

fetchWeather es la parte del mapa, y mailboxLoop es la parte reducida del algoritmo.

#r "System.Xml.Linq.dll" 

#r "FSharp.PowerPack.dll" 

open System 
open System.Diagnostics 
open System.IO 
open System.Linq 
open System.Net 
open System.Xml.Linq 

open Microsoft.FSharp.Control.WebExtensions 

type Weather (city, region, temperature) = class 
    member x.City = city 
    member x.Region = region 
    member x.Temperature : int = temperature 

    override this.ToString() = 
     sprintf "%s, %s: %d F" this.City this.Region this.Temperature 
end 

type MessageForActor = 
    | ProcessWeather of Weather 
    | ProcessError of int 
    | GetResults of (Weather * Weather * Weather list) AsyncReplyChannel 

let parseRss woeid (rssStream : Stream) = 
    let xn str = XName.Get str 
    let yweather elementName = XName.Get(elementName, "http://xml.weather.yahoo.com/ns/rss/1.0") 

    let channel = (XDocument.Load rssStream).Descendants(xn "channel").First() 
    let location = channel.Element(yweather "location") 
    let condition = channel.Element(xn "item").Element(yweather "condition") 

    // If the RSS server returns error, condition XML element won't be available. 
    if not(condition = null) then 
     let temperature = Int32.Parse(condition.Attribute(xn "temp").Value) 
     ProcessWeather(new Weather(
        location.Attribute(xn "city").Value, 
        location.Attribute(xn "region").Value, 
        temperature)) 
    else 
     ProcessError(woeid) 

let fetchWeather (actor : MessageForActor MailboxProcessor) woeid = 
    async { 
     let rssAddress = sprintf "http://weather.yahooapis.com/forecastrss?w=%d&u=f" woeid 
     let webRequest = WebRequest.Create rssAddress 
     use! response = webRequest.AsyncGetResponse() 
     use responseStream = response.GetResponseStream() 
     let weather = parseRss woeid responseStream 
     //do! Async.Sleep 1000 // enable this line to see amplified timing that proves concurrent flow 
     actor.Post(weather) 
    } 

let mailboxLoop initialCount = 
    let chooseCityByTemperature op (x : Weather) (y : Weather) = 
     if op x.Temperature y.Temperature then x else y 

    let sortWeatherByCityAndState (weatherList : Weather list) = 
     weatherList 
     |> List.sortWith (fun x y -> x.City.CompareTo(y.City)) 
     |> List.sortWith (fun x y -> x.Region.CompareTo(y.Region)) 

    MailboxProcessor.Start(fun inbox -> 
     let rec loop minAcc maxAcc weatherList remaining = 
     async { 
      let! message = inbox.Receive() 
      let remaining = remaining - 1 

      match message with 
      | ProcessWeather weather -> 
       let colderCity = chooseCityByTemperature (<) minAcc weather 
       let warmerCity = chooseCityByTemperature (>) maxAcc weather 
       return! loop colderCity warmerCity (weather :: weatherList) remaining 
      | ProcessError woeid -> 
       let errorWeather = new Weather(sprintf "Error with woeid=%d" woeid, "ZZ", 99999) 
       return! loop minAcc maxAcc (errorWeather :: weatherList) remaining 
      | GetResults replyChannel -> 
       replyChannel.Reply(minAcc, maxAcc, sortWeatherByCityAndState weatherList) 
     } 

     let minValueInitial = new Weather("", "", Int32.MaxValue) 
     let maxValueInitial = new Weather("", "", Int32.MinValue) 
     loop minValueInitial maxValueInitial [] initialCount 
    ) 

let RunSynchronouslyWithExceptionAndTimeoutHandlers computation = 
    let timeout = 30000 
    try 
     Async.RunSynchronously(Async.Catch(computation), timeout) 
     |> function Choice1Of2 answer    -> answer |> ignore 
       | Choice2Of2 (except : Exception) -> printfn "%s" except.Message; printfn "%s" except.StackTrace; exit -4 
    with 
    | :? System.TimeoutException -> printfn "Timed out waiting for results for %d seconds!" <| timeout/1000; exit -5 

let main = 
    // Should have script name, sync/async select, and at least one woeid 
    if fsi.CommandLineArgs.Length < 3 then 
     printfn "Expecting at least two arguments!" 
     printfn "There were %d arguments" (fsi.CommandLineArgs.Length - 1) 
     exit -1 

    let woeids = 
     try 
     fsi.CommandLineArgs 
     |> Seq.skip 2 // skip the script name and sync/async select 
     |> Seq.map Int32.Parse 
     |> Seq.toList 
     with 
     | except -> printfn "One of supplied arguments was not an integer: %s" except.Message; exit -2 

    let actor = mailboxLoop woeids.Length 

    let processWeatherItemsConcurrently woeids = 
     woeids 
     |> Seq.map (fetchWeather actor) 
     |> Async.Parallel 
     |> RunSynchronouslyWithExceptionAndTimeoutHandlers 

    let processOneWeatherItem woeid = 
     woeid 
     |> fetchWeather actor 
     |> RunSynchronouslyWithExceptionAndTimeoutHandlers 

    let stopWatch = new Stopwatch() 
    stopWatch.Start() 
    match fsi.CommandLineArgs.[1].ToUpper() with 
    | "C" -> printfn "Concurrent execution: "; processWeatherItemsConcurrently woeids 
    | "S" -> printfn "Synchronous execution: "; woeids |> Seq.iter processOneWeatherItem 
    | _ -> printfn "Unexpected run options!"; exit -3 

    let (min, max, weatherList) = actor.PostAndReply GetResults 
    stopWatch.Stop() 
    assert (weatherList.Length = woeids.Length) 

    printfn "{" 
    weatherList |> List.iter (printfn " %O") 
    printfn "}" 
    printfn "Coldest place: %O" min 
    printfn "Hottest place: %O" max 
    printfn "Completed in %d millisec" stopWatch.ElapsedMilliseconds 

main 
Cuestiones relacionadas