49 votes

Cadre Java Map / Reduce simple

Quelqu'un peut me pointer à un simple, open-source Map/reduce cadre/API pour Java? Il ne semble pas beaucoup de preuves d'une telle chose existante, mais quelqu'un d'autre le savez peut-être différent.

Le mieux que je puisse trouver est, bien sûr, Hadoop MapReduce, mais qui ne parvient pas à la "simple". Je n'ai pas besoin de la capacité à exécuter les travaux distribués, juste quelque chose à me laisser exécuter map/reduce-style emplois sur une machine multi-coeur, en une seule JVM, en utilisant la norme Java5-style de simultanéité.

Ce n'est pas une chose difficile à écrire soi-même, mais je préfère ne pas avoir à le faire.

18voto

chaostheory Points 840

Avez-vous découvrez Akka? Tout akka est vraiment un distribué modèle de l'Acteur en fonction de simultanéité cadre, vous pouvez mettre beaucoup de choses simplement, avec peu de code. Il est tellement facile de diviser le travail en morceaux avec elle, et il prend automatiquement le plein avantage d'un multi-core de la machine, ainsi que d'être en mesure d'utiliser plusieurs machines de processus de travail. Contrairement à l'utilisation de threads, il se sent plus naturel pour moi.

J'ai un Java carte de réduire l'exemple à l'aide de akka. Ce n'est pas la meilleure carte de réduire l'exemple, car il rend l'utilisation de contrats à terme; mais il devrait vous donner une idée approximative de ce qui est impliqué. Il y a plusieurs choses importantes que ma carte de réduire montre l'exemple:

  • Comment répartir le travail.
  • Comment affecter le travail: akka a vraiment un simple système de messagerie, ainsi qu'un travail partioner, dont le programme, vous pouvez les configurer. Une fois que j'ai appris à l'utiliser, je ne pouvais pas l'arrêter. C'est tellement simple et flexible. J'ai été en utilisant tous les quatre de mes cœurs de PROCESSEUR en un rien de temps. C'est vraiment génial pour la mise en œuvre des services.
  • Comment savoir quand le travail est fait et le résultat est prêt à traiter: C'est en fait la partie qui peut être la plus difficile et déroutant, sauf si vous êtes déjà familier avec le Terme. Vous n'avez pas besoin d'utiliser des contrats à Terme, puisqu'il existe d'autres options. Je viens de les utiliser parce que je voulais quelque chose de plus court pour les personnes à grok.

Si vous avez des questions, StackOverflow a effectivement une impressionnantes akka QA section.

11voto

Lukas Eder Points 48046

Je pense qu'il est utile de mentionner que ces problèmes seront (espérons-le) historiques à partir de Java 8. Un exemple:

 int heaviestBlueBlock =
    blocks.filter(b -> b.getColor() == BLUE)
          .map(Block::getWeight)
          .reduce(0, Integer::max);
 

Pour plus de détails, voir la présentation de Brian Goetz sur le projet lambda

10voto

Peter Lawrey Points 229686

J'utilise la structure suivante

 int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);

List<Future<TaskResult>> results = new ArrayList();
for(int i=0;i<tasks;i++)
    results.add(es.submit(new Task(i)));
for(Future<TaskResult> future:results)
    reduce(future);
 

8voto

Gareth Davis Points 16190

Je me rends compte que cela se fera peut-être un peu après le fait, mais vous voudrez peut-être jeter un coup d'œil aux classes JSR166y ForkJoin de JDK7.

Il existe une bibliothèque rétroportée qui fonctionne sous JDK6 sans aucun problème, vous n'avez donc pas à attendre jusqu'au prochain millénaire pour l'essayer. Il se situe quelque part entre un exécuteur brut et hadoop, ce qui donne un cadre pour travailler sur la carte et réduire le travail au sein de la JVM actuelle.

6voto

xan Points 3597

J'ai créé un one-off pour moi il y a quelques années lorsque j'ai eu un 8-core de la machine, mais je n'étais pas très heureux avec elle. Je n'ai jamais réussi à être aussi simple à utiliser que je l'avais espéré, et la mémoire des tâches à forte intensité n'était pas bien.

Si vous n'obtenez pas de réelles réponses, je peux partager plus, mais l'essentiel est:

public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Iterator<TMapInput> inputIterator) {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        while (inputIterator.hasNext()) {
            TMapInput m = inputIterator.next();
            Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
            futureSet.add(f);
            Thread.sleep(10);
        }
        while (!futureSet.isEmpty()) {
            Thread.sleep(5);
            for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
                Future<TMapOutput> f = fit.next();
                if (f.isDone()) {
                    fit.remove();
                    TMapOutput x = f.get();
                    m_reducer.reduce(x);
                }
            }
        }
        return m_reducer.getResult();
    }
}

EDIT: Basé sur un commentaire, ci-dessous est une version sans sleep. L'astuce est d'utiliser CompletionService qui fournit essentiellement un blocage de la file d'attente de la fin de l' Futures.

 public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Collection<TMapInput> input) {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        CompletionService<TMapOutput> futurePool = 
                  new ExecutorCompletionService<TMapOutput>(pool);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        for (TMapInput m : input) {
            futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
        }
        pool.shutdown();
        int n = futureSet.size();
        for (int i = 0; i < n; i++) {
            m_reducer.reduce(futurePool.take().get());
        }
        return m_reducer.getResult();
    }

Je vais aussi noter que ce est une très distillée réduire la carte de l'algorithme, y compris un seul de réduire travailleur qui occupe à la fois de la réduction et de l'opération de fusion.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X