简单的Java Map / Reduce框架

任何人都可以指向我一个简单的,开源的Map / Reduce框架/ API的Java? 似乎没有太多证据表明这种事情存在,但其他人可能会有所不同。

当然,我能find的最好的就是Hadoop MapReduce,但是没有通过“简单”的标准。 我不需要运行分布式作业的能力,只需要让我使用标准的Java5风格的并发在单个JVM上在多核机器上运行map / reduce-style作业。

写自己并不难,但我宁愿不必这样做。

我认为值得一提的是,这些问题是Java 8的历史。例如:

int heaviestBlueBlock = blocks.filter(b -> b.getColor() == BLUE) .map(Block::getWeight) .reduce(0, Integer::max); 

换句话说: 单节点MapReduce在Java 8中可用

有关更多详细信息,请参阅Brian Goetz关于项目lambda的介绍

你看看阿卡吗? 虽然akka实际上是一个基于分布式Actor模型的并发框架,但是只需很less的代码就可以实现很多事情。 把工作分成几部分就很容易,它可以自动充分利用多核机器,并且可以使用多台机器来处理工作。 不像使用线程,我感觉更自然。

我有一个Java 地图使用AKK 减less的例子 。 这不是最简单的地图减less的例子,因为它利用了期货; 但它应该给你一个粗略的想法。 我的地图缩小示例演示了几个主要的东西:

  • 如何划分工作。
  • 如何分配工作:阿卡有一个非常简单的消息系统是一个工作分区,可以configuration其时间表。 一旦我学会了如何使用它,我无法停下来。 它非常简单而灵活。 我立即使用了全部四个CPU内核。 这对于实现服务非常有用。
  • 如何知道工作完成的时间以及结果是否已经准备好处理:除非您已经熟悉期货,否则这实际上是可能是最难理解和难以理解的部分。 您不需要使用期货,因为还有其他的select。 我只是用了它们,因为我想让人们更短的时间。

如果您有任何问题,StackOverflow实际上有一个很棒的akka​​ QA部分。

我使用以下结构

 int procs = Runtime.getRuntime().availableProcessors(); ExecutorService es = Executors.newFixedThreadPool(procs); List<Future<TaskResult>> results = new ArrayList(); for(int i=0;i<tasks;i++) results.add(es.submit(new Task(i))); for(Future<TaskResult> future:results) reduce(future); 

我意识到这可能是一个小事,但你可能想看看JDK7的JSR166y ForkJoin类。

有一个在JDK6下运行的后端库,没有任何问题,所以你不必等到下一个千年才能顺利完成。 它位于原始执行程序和hadoop之间,在当前的JVM中提供了一个用于缩小作业的框架。

几年前,当我有一台8核心机器时,我为自己创造了一次性,但我并不满意。 我从来没有像我所希望的那样简单地使用它,而内存密集型的任务并没有很好地扩展。

如果你没有得到任何真正的答案,我可以分享更多,但它的核心是:

 public class LocalMapReduce<TMapInput, TMapOutput, TOutput> { private int m_threads; private Mapper<TMapInput, TMapOutput> m_mapper; private Reducer<TMapOutput, TOutput> m_reducer; ... public TOutput mapReduce(Iterator<TMapInput> inputIterator) { ExecutorService pool = Executors.newFixedThreadPool(m_threads); Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>(); while (inputIterator.hasNext()) { TMapInput m = inputIterator.next(); Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m)); futureSet.add(f); Thread.sleep(10); } while (!futureSet.isEmpty()) { Thread.sleep(5); for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) { Future<TMapOutput> f = fit.next(); if (f.isDone()) { fit.remove(); TMapOutput x = f.get(); m_reducer.reduce(x); } } } return m_reducer.getResult(); } } 

编辑:根据评论,下面是一个没有sleep的版本。 诀窍是使用CompletionService ,它本质上提供了已完成Future的阻塞队列。

  public class LocalMapReduce<TMapInput, TMapOutput, TOutput> { private int m_threads; private Mapper<TMapInput, TMapOutput> m_mapper; private Reducer<TMapOutput, TOutput> m_reducer; ... public TOutput mapReduce(Collection<TMapInput> input) { ExecutorService pool = Executors.newFixedThreadPool(m_threads); CompletionService<TMapOutput> futurePool = new ExecutorCompletionService<TMapOutput>(pool); Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>(); for (TMapInput m : input) { futureSet.add(futurePool.submit(m_mapper.makeWorker(m))); } pool.shutdown(); int n = futureSet.size(); for (int i = 0; i < n; i++) { m_reducer.reduce(futurePool.take().get()); } return m_reducer.getResult(); } 

我还会注意到这是一个非常简单的map-reducealgorithm,包括一个reduce工作器,它同时执行reduce和merge操作。

我喜欢在Java中使用Skandium进行并行处理。 该框架为具有共享内存的多核机器实现了一定的并行模式(即主从,Map / Reduce,Pipe,Fork和Divide&Conquer)。 这种技术被称为“algorithm骨架”。 模式可以嵌套。

详细的有骨骼和肌肉。 肌肉做实际的工作(分裂,合并,执行和条件)。 除了“While”,“For”和“If”之外,骨架代表并行性的模式,嵌套模式时可能会有用。

例子可以在框架内find。 我需要一点点来了解如何使用肌肉和骨骼,但在克服这个障碍之后,我真的很喜欢这个框架。 🙂

你有没有看过GridGain ?

你可能想看看Functionals 4 Java的项目网站: http : //f4j.rethab.ch/它在8之前引入了filter,map和reduce到java版本。

MapReduce API被引入到Hazelcast的v3.2中(参见文档中的MapReduce API部分 )。 虽然Hazelcast打算在分布式系统中使用,但它在单个节点设置中运行得非常好,而且相当轻量级。

您可以尝试LeoTask:并行任务运行和结果汇总框架

它是免费的,开放源代码: https : //github.com/mleoking/leotask

这里是一个简短的介绍,显示其API: https : //github.com/mleoking/leotask/blob/master/leotask/introduction.pdf?raw=true

这是一个使用所有可用的CPU核心在单台计算机上工作的轻量级框架。

它具有以下特点:

  • 自动和并行参数空间探索
  • 灵活和基于configuration的结果聚合
  • 编程模型只关注关键逻辑
  • 可靠的自动中断恢复

和公用事业:

  • dynamic和可复制的networking结构。
  • 与Gnuplot集成
  • 根据通用networking模型生成networking
  • DelimitedReader:一个复杂的阅读器,像数据库一样探索CSV(逗号分隔值)文件
  • 基于Mersenne Twisteralgorithm的快速随机数生成器
  • 来自ImageJ项目的集成CurveFitter