Después de jugar con agentes F #, traté de hacer un mapa para reducir su uso.Mapa Reducir con agentes F #
La estructura básica que utilizo es:
- mapa supervisor de la cual pone en cola todo el trabajo que hacer en su estado y recibe solicitud de trabajo de los trabajadores mapa
- reduzcan el supervisor hace lo mismo como mapa supervisor reduce el trabajo
- un montón de mapas y reduce a los trabajadores que mapean y reducen, si uno falla su trabajo lo envía de vuelta al supervisor respectivo para ser reprocesado.
Las preguntas que se preguntan acerca es:
- ¿Tiene esto algún sentido en comparación con un mapa más tradicional (pero muy agradable) reducir al igual que ( parallel-aggregate.aspx) que usa PSeq?
- la forma en que implementé el mapa y reduzco a los trabajadores parece feo ¿hay alguna manera mejor?
- parece que puedo crear 1000 000 trabajadores del mapa y 1000 0000 reducir los trabajadores jajaja, ¿cómo debo elegir estos números, cuantos más mejor?
Muchas gracias,
type Agent<'T> = MailboxProcessor<'T>
//This is the response the supervisor
//gives to the worker request for work
type 'work SupervisorResponse =
| Work of 'work //a piece of work
| NoWork//no work left to do
//This is the message to the supervisor
type 'work WorkMsg =
| ToDo of 'work //piles up work in the Supervisor queue
| WorkReq of AsyncReplyChannel<SupervisorResponse<'work>> //'
//The supervisor agent can be interacted with
type AgentOperation =
| Stop //stop the agent
| Status //yield the current status of supervisor
type 'work SupervisorMsg =
| WorkRel of 'work WorkMsg
| Operation of AgentOperation
//Supervises Map and Reduce workers
module AgentSupervisor=
let getNew (name:string) =
new Agent<SupervisorMsg<'work>>(fun inbox -> //'
let rec loop state = async {
let! msg = inbox.Receive()
match msg with
| WorkRel(m) ->
match m with
| ToDo(work) ->
let newState = work:state
return! loop newState
| WorkReq(replyChannel) ->
match state with
| [] ->
return! loop []
| [item] ->
return! loop []
| (item::remaining) ->
return! loop remaining
| Operation(op) ->
match op with
| Status ->
Console.WriteLine(name+" current Work Queue "+
string (state.Length))
return! loop state
| Stop ->
Console.WriteLine("Stoppped SuperVisor Agent "+name)
loop [])
let stop (agent:Agent<SupervisorMsg<'work>>) = agent.Post(Operation(Stop))
let status (agent:Agent<SupervisorMsg<'work>>) =agent.Post(Operation(Status))
//Code for the workers
type 'success WorkOutcome =
| Success of 'success
| Fail
type WorkerMsg =
| Start
| Stop
| Continue
module AgentWorker =
type WorkerSupervisors<'reduce,'work> =
{ Map:Agent<SupervisorMsg<'work>> ; Reduce:Agent<SupervisorMsg<'reduce>> }
let stop (agent:Agent<WorkerMsg>) = agent.Post(Stop)
let start (agent:Agent<WorkerMsg>) = agent.Start()
let getNewMapWorker(map, supervisors:WorkerSupervisors<'reduce,'work> ) =
new Agent<WorkerMsg>(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Start -> inbox.Post(Continue)
return! loop()
| Continue ->
let! supervisorOrder =
fun replyChannel ->
match supervisorOrder with
| Work(work) ->
let! res = map work
match res with
| Success(toReduce) ->
| Fail ->
Console.WriteLine("Map Fail")
| NoWork ->
return! loop()
| Stop ->
Console.WriteLine("Map worker stopped")
loop() )
let getNewReduceWorker(reduce,reduceSupervisor:Agent<SupervisorMsg<'work>>)=//'
new Agent<WorkerMsg>(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Start -> inbox.Post(Continue)
return! loop()
| Continue ->
let! supervisorOrder =
reduceSupervisor.PostAndAsyncReply(fun replyChannel ->
match supervisorOrder with
| Work(work) ->
let! res = reduce work
match res with
| Success(toReduce) -> inbox.Post(Continue)
| Fail ->
| NoWork -> inbox.Post(Continue)
return! loop()
|Stop ->Console.WriteLine("Reduce worker stopped"); return()
open AgentWorker
type MapReduce<'work,'reduce>(numberMap:int ,
numberReduce: int,
toProcess:'work list,
map:'work->Async<'reduce WorkOutcome>,
reduce:'reduce-> Async<unit WorkOutcome>) =
let mapSupervisor= AgentSupervisor.getNew("MapSupervisor")
let reduceSupervisor = AgentSupervisor.getNew("ReduceSupervisor")
let workerSupervisors = {Map = mapSupervisor ; Reduce = reduceSupervisor }
let mapWorkers =
[for i in 1..numberMap ->
AgentWorker.getNewMapWorker(map,workerSupervisors) ]
let reduceWorkers =
[for i in 1..numberReduce ->
AgentWorker.getNewReduceWorker(reduce,workerSupervisors.Reduce) ]
member this.Start() =
//Post work to do
|>List.iter(fun elem -> mapSupervisor.Post(WorkRel(ToDo(elem))))
//Start supervisors
//start workers
List.iter(fun mapper -> mapper |>start) mapWorkers
List.iter(fun reducer ->reducer|>start) reduceWorkers
member this.Status() = (mapSupervisor|>AgentSupervisor.status)
member this.Stop() =
List.map2(fun mapper reducer ->
mapper |>stop; reducer|>stop) mapWorkers reduceWorkers
//Run some tests
let map = function (n:int64) -> async{ return Success(n) }
let reduce = function (toto: int64) -> async{ return Success() }
let mp = MapReduce<int64,int64>(1,1,[for i in 1L..1000000L->i],map,reduce)
FYI, no leo una pregunta que contenga más de 120 líneas de código (con un formato incorrecto). – Brian
@Brian, lo siento por el desorden, he intentado reformatear las cosas un poco, pero todavía tengo el problema de color que hace que todo sea terriblemente feo. Tenga en cuenta que realmente no espero que alguien lea las 120 líneas de mi código, solo lo puse en caso de que pueda aclarar mi pregunta. Gracias – jlezard
Hice algunas ediciones para limpiarlo más. En particular, usa menos espacios en blanco horizontales y verticales (no es necesario desplazarse a la derecha, no líneas en blanco múltiples en una fila). Además, tenga en cuenta el uso de // 'como una forma de evitar SO mal colorear líneas múltiples – Brian