creé una sola vez para mí hace un par de años cuando llegué a una máquina de 8 núcleos, pero no estaba terriblemente feliz con él. Nunca conseguí que fuera tan simple de usar como esperaba, y las tareas que requerían mucha memoria no se escalaron bien.
Si usted no recibe ningún reales respuestas que puedo compartir más, pero el núcleo de la misma es:
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
while (inputIterator.hasNext()) {
TMapInput m = inputIterator.next();
Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
futureSet.add(f);
Thread.sleep(10);
}
while (!futureSet.isEmpty()) {
Thread.sleep(5);
for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
Future<TMapOutput> f = fit.next();
if (f.isDone()) {
fit.remove();
TMapOutput x = f.get();
m_reducer.reduce(x);
}
}
}
return m_reducer.getResult();
}
}
EDIT: Basado en un comentario, a continuación es una versión sin sleep
. El truco es usar CompletionService
que esencialmente proporciona una cola de bloqueo de Future
s completa.
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Collection<TMapInput> input) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
CompletionService<TMapOutput> futurePool =
new ExecutorCompletionService<TMapOutput>(pool);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
for (TMapInput m : input) {
futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
}
pool.shutdown();
int n = futureSet.size();
for (int i = 0; i < n; i++) {
m_reducer.reduce(futurePool.take().get());
}
return m_reducer.getResult();
}
También habrá dado cuenta de que esto es un mapa a reducir el algoritmo muy destilada, incluyendo un solo reducir trabajador que realiza tanto la operación de reducir y fusionar.
me encontré con este video que ANUNCIA sobre nueva característica en Java 8. parece que habrá API mapreduce en la nueva versión. http://www.youtube.com/watch?v=47_Em-zc7_Q – gigadot
Tengo curiosidad por saber cuál es su solución actual para este problema. Solo estoy buscando formas rápidas y fáciles de hacer Lists.transform (función) en paralelo en una sola máquina. – Snekse
LeoTask funciona. Es una ejecución paralela de tareas y un marco de agregación de resultados en una máquina multinúcleo. https://github.com/mleoking/leotask –