J'ai créé un one-off pour moi il y a quelques années lorsque j'ai eu un 8-core de la machine, mais je n'étais pas très heureux avec elle. Je n'ai jamais réussi à être aussi simple à utiliser que je l'avais espéré, et la mémoire des tâches à forte intensité n'était pas bien.
Si vous n'obtenez pas de réelles réponses, je peux partager plus, mais l'essentiel est:
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
while (inputIterator.hasNext()) {
TMapInput m = inputIterator.next();
Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
futureSet.add(f);
Thread.sleep(10);
}
while (!futureSet.isEmpty()) {
Thread.sleep(5);
for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
Future<TMapOutput> f = fit.next();
if (f.isDone()) {
fit.remove();
TMapOutput x = f.get();
m_reducer.reduce(x);
}
}
}
return m_reducer.getResult();
}
}
EDIT: Basé sur un commentaire, ci-dessous est une version sans sleep
. L'astuce est d'utiliser CompletionService
qui fournit essentiellement un blocage de la file d'attente de la fin de l' Future
s.
public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Collection<TMapInput> input) {
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
CompletionService<TMapOutput> futurePool =
new ExecutorCompletionService<TMapOutput>(pool);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
for (TMapInput m : input) {
futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
}
pool.shutdown();
int n = futureSet.size();
for (int i = 0; i < n; i++) {
m_reducer.reduce(futurePool.take().get());
}
return m_reducer.getResult();
}
Je vais aussi noter que ce est une très distillée réduire la carte de l'algorithme, y compris un seul de réduire travailleur qui occupe à la fois de la réduction et de l'opération de fusion.