“Parallel.For” for Java?
I would like to know whether Java has an equivalent of .NET's Parallel.For.
Could someone provide an example? Thanks!
I think the closest thing would be:
    ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);
    try {
        for (final Object o : list) {
            exec.submit(new Runnable() {
                @Override
                public void run() {
                    // do stuff with o.
                }
            });
        }
    } finally {
        exec.shutdown();
    }
Based on TheLQ's comment, you can set SOME_NUM_OF_THREADS to Runtime.getRuntime().availableProcessors();
Edit: Decided to add a basic "Parallel.For" implementation
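One caveat with the snippet above: shutdown() only stops the pool from accepting new tasks; it does not wait for the submitted tasks to finish. The following is a minimal sketch, assuming Java 8 lambdas, of sizing the pool with availableProcessors() and blocking until the loop is done. The class name PoolSizingSketch, the forEach helper, and the one-minute timeout are illustrative assumptions, not part of the answer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class PoolSizingSketch {

    // Hypothetical helper: submits one task per element, then waits for all of them.
    static <T> void forEach(List<T> items, Consumer<T> body) {
        // One worker thread per available core, as suggested in the comment above.
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
            for (T item : items) {
                exec.submit(() -> body.accept(item));
            }
        } finally {
            exec.shutdown(); // stop accepting new tasks; does not block
        }
        try {
            // Block until the submitted tasks are done (timeout value is an assumption).
            if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Small demo: collects the processed items in a thread-safe queue.
    static int demo() {
        ConcurrentLinkedQueue<Integer> seen = new ConcurrentLinkedQueue<>();
        forEach(Arrays.asList(1, 2, 3, 4, 5), seen::add);
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("processed " + demo() + " items");
    }
}
```

Note that exceptions thrown by the body are captured in the Future returned by submit() and are silently dropped in this sketch.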
    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Parallel {
        private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();

        // NamedThreadFactory is a custom ThreadFactory, not part of the JDK; it must be supplied separately.
        private static final ExecutorService forPool =
                Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For"));

        public static <T> void For(final Iterable<T> elements, final Operation<T> operation) {
            try {
                // invokeAll blocks for us until all submitted tasks in the call complete
                forPool.invokeAll(createCallables(elements, operation));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // Wraps each element in a Callable that applies the operation to it.
        public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements,
                                                                     final Operation<T> operation) {
            List<Callable<Void>> callables = new LinkedList<Callable<Void>>();
            for (final T elem : elements) {
                callables.add(new Callable<Void>() {
                    @Override
                    public Void call() {
                        operation.perform(elem);
                        return null;
                    }
                });
            }
            return callables;
        }

        public static interface Operation<T> {
            public void perform(T pParameter);
        }
    }
Example usage of Parallel.For
    // Collection of items to process in parallel
    Collection<Integer> elems = new LinkedList<Integer>();
    for (int i = 0; i < 40; ++i) {
        elems.add(i);
    }
    Parallel.For(elems,
        // The operation to perform with each item
        new Parallel.Operation<Integer>() {
            public void perform(Integer param) {
                System.out.println(param);
            }
        });
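I suppose this implementation is more like Parallel.ForEach.
Edit: I put this on GitHub, if anyone is interested: Parallel on GitHub
MLaw's solution is a very practical Parallel.ForEach. I added a few modifications to make a Parallel.For.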
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class Parallel {
        static final int iCPU = Runtime.getRuntime().availableProcessors();

        public static <T> void ForEach(Iterable<T> parameters, final LoopBody<T> loopBody) {
            ExecutorService executor = Executors.newFixedThreadPool(iCPU);
            List<Future<?>> futures = new LinkedList<Future<?>>();

            for (final T param : parameters) {
                Future<?> future = executor.submit(new Runnable() {
                    public void run() {
                        loopBody.run(param);
                    }
                });
                futures.add(future);
            }

            // Wait for every submitted task to finish.
            for (Future<?> f : futures) {
                try {
                    f.get();
                } catch (InterruptedException e) {
                } catch (ExecutionException e) {
                }
            }

            executor.shutdown();
        }

        public static void For(int start, int stop, final LoopBody<Integer> loopBody) {
            ExecutorService executor = Executors.newFixedThreadPool(iCPU);
            List<Future<?>> futures = new LinkedList<Future<?>>();

            for (int i = start; i < stop; i++) {
                final Integer k = i;
                Future<?> future = executor.submit(new Runnable() {
                    public void run() {
                        loopBody.run(k);
                    }
                });
                futures.add(future);
            }

            // Wait for every submitted task to finish.
            for (Future<?> f : futures) {
                try {
                    f.get();
                } catch (InterruptedException e) {
                } catch (ExecutionException e) {
                }
            }

            executor.shutdown();
        }
    }

    public interface LoopBody<T> {
        void run(T i);
    }

    public class ParallelTest {
        int k;

        public ParallelTest() {
            k = 0;
            Parallel.For(0, 10, new LoopBody<Integer>() {
                public void run(Integer i) {
                    // NOTE: k += i is not atomic; concurrent updates from several threads may be lost.
                    k += i;
                    System.out.println(i);
                }
            });
            System.out.println("Sum = " + k);
        }

        public static void main(String[] argv) {
            ParallelTest test = new ParallelTest();
        }
    }
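In the test above, k += i is executed from several pool threads without synchronization, so the printed sum is not guaranteed to be 45. A minimal sketch of one way to make the accumulation safe, assuming an AtomicInteger is acceptable for the use case; the class name AtomicSumSketch and the parallelSum helper are hypothetical and only mirror the structure of the For method above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicSumSketch {

    // Sums the integers in [start, stop) with one task per index.
    static int parallelSum(int start, int stop) {
        final AtomicInteger sum = new AtomicInteger();
        ExecutorService executor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = start; i < stop; i++) {
                final int k = i;
                // addAndGet is an atomic read-modify-write, unlike "k += i" on a plain field.
                futures.add(executor.submit(() -> { sum.addAndGet(k); }));
            }
            for (Future<?> f : futures) {
                try {
                    f.get(); // wait for the task and surface its failure, if any
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
        } finally {
            executor.shutdown();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println("Sum = " + parallelSum(0, 10));
    }
}
```

A shared atomic counter is contended by every task, so for hot loops a per-thread partial result that is merged at the end usually scales better.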
Building on mlaw's suggestion, this version adds a CountDownLatch. It also adds a chunk size to reduce the number of submit() calls.
When tested with an array of 4 million items on a Core i7 2630QM CPU, this version was 5 times faster than a sequential for() loop.
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Loop {
        public interface Each {
            void run(int i);
        }

        private static final int CPUs = Runtime.getRuntime().availableProcessors();

        public static void withIndex(int start, int stop, final Each body) {
            // An empty range would give chunksize == 0 and a division by zero below.
            if (stop <= start) return;

            int chunksize = (stop - start + CPUs - 1) / CPUs;           // ceil(range / CPUs)
            int loops = (stop - start + chunksize - 1) / chunksize;     // number of chunks
            ExecutorService executor = Executors.newFixedThreadPool(CPUs);
            final CountDownLatch latch = new CountDownLatch(loops);
            for (int i = start; i < stop;) {
                final int lo = i;
                i += chunksize;
                final int hi = (i < stop) ? i : stop;
                executor.submit(new Runnable() {
                    public void run() {
                        try {
                            for (int i = lo; i < hi; i++)
                                body.run(i);
                        } finally {
                            // Count down even if the body throws, so await() cannot hang.
                            latch.countDown();
                        }
                    }
                });
            }
            try {
                latch.await();
            } catch (InterruptedException e) {}
            executor.shutdown();
        }

        public static void main(String[] argv) {
            Loop.withIndex(0, 9, new Loop.Each() {
                public void run(int i) {
                    System.out.println(i * 10);
                }
            });
        }
    }
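The chunking arithmetic used above can be checked in isolation. This is a small sketch that only computes the half-open chunk bounds, using the same ceiling-division formula; the class name ChunkBoundsSketch and the chunks helper are illustrative assumptions, and the worker count is passed in explicitly so the result does not depend on the machine.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkBoundsSketch {

    // Splits [start, stop) into at most `workers` contiguous chunks {lo, hi}.
    static List<int[]> chunks(int start, int stop, int workers) {
        List<int[]> out = new ArrayList<>();
        if (stop <= start || workers <= 0) {
            return out; // nothing to split; also avoids dividing by a zero chunk size
        }
        // Ceiling division: every chunk except possibly the last has this size.
        int chunkSize = (stop - start + workers - 1) / workers;
        for (int lo = start; lo < stop; lo += chunkSize) {
            int hi = Math.min(lo + chunkSize, stop);
            out.add(new int[] {lo, hi});
        }
        return out;
    }

    public static void main(String[] args) {
        for (int[] c : chunks(0, 9, 4)) {
            System.out.println("[" + c[0] + ", " + c[1] + ")");
        }
    }
}
```

For the range 0 to 9 with 4 workers, the chunk size is 3 and only three chunks are produced, so the number of chunks can be smaller than the number of workers. This is why the latch above is sized with the computed loop count rather than with the CPU count.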
The fork/join framework in Java 7 is intended for concurrency support. However, I don't know of an exact equivalent of Parallel.For.
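For illustration, an index-based parallel loop can be built on the fork/join framework by recursively splitting the range. This is only a sketch under stated assumptions: the names ForkJoinForSketch, RangeAction and parallelFor are hypothetical, the THRESHOLD value is an arbitrary guess, and it uses Java 8 features (IntConsumer, lambdas, ForkJoinPool.commonPool()) for brevity.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.function.IntConsumer;

class ForkJoinForSketch {

    // Ranges at or below this size run sequentially (value is an assumption).
    static final int THRESHOLD = 1_000;

    static class RangeAction extends RecursiveAction {
        private final int lo;
        private final int hi;
        private final IntConsumer body;

        RangeAction(int lo, int hi, IntConsumer body) {
            this.lo = lo;
            this.hi = hi;
            this.body = body;
        }

        @Override
        protected void compute() {
            if (hi - lo <= THRESHOLD) {
                for (int i = lo; i < hi; i++) {
                    body.accept(i);
                }
                return;
            }
            // Split the range in half and run both halves, possibly in parallel.
            int mid = lo + (hi - lo) / 2;
            invokeAll(new RangeAction(lo, mid, body), new RangeAction(mid, hi, body));
        }
    }

    // Runs body for every index in [start, stop) and returns when all are done.
    static void parallelFor(int start, int stop, IntConsumer body) {
        ForkJoinPool.commonPool().invoke(new RangeAction(start, stop, body));
    }

    // Demo: each index writes only its own array slot, so no locking is needed.
    static int[] squares(int n) {
        int[] result = new int[n];
        parallelFor(0, n, i -> result[i] = i * i);
        return result;
    }

    public static void main(String[] args) {
        int[] s = squares(10_000);
        System.out.println(s[3] + " " + s[9_999]);
    }
}
```

The work-stealing pool balances uneven chunks on its own, which is the main difference from the fixed chunking used in the executor-based answers.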
Here is my contribution to this topic: https://github.com/pablormier/parallel-loops . Usage is simple:
    Collection<String> upperCaseWords =
        Parallel.ForEach(words, new Parallel.F<String, String>() {
            public String apply(String s) {
                return s.toUpperCase();
            }
        });
It is also possible to change some aspects of the behavior, such as the number of threads (by default it uses a cached thread pool):
    Collection<String> upperCaseWords =
        new Parallel.ForEach<String, String>(words)
            .withFixedThreads(4)
            .apply(new Parallel.F<String, String>() {
                public String apply(String s) {
                    return s.toUpperCase();
                }
            }).values();
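All the code is self-contained in a single Java class and has no dependencies beyond the JDK. I also encourage you to look at the new ways of parallelizing in a functional style in Java 8.
A simpler option would be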
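As a rough illustration of the Java 8 functional style mentioned above, parallel streams cover both the ForEach-like and the index-based cases. This sketch uses only JDK stream APIs; the class name StreamParallelSketch and its two helper methods are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StreamParallelSketch {

    // ForEach-style: transform every element in parallel, keeping encounter order.
    static List<String> upperCase(List<String> words) {
        return words.parallelStream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // For-style: iterate over the indices [start, stop) in parallel and reduce.
    static long sumOfSquares(int start, int stop) {
        return IntStream.range(start, stop)
                .parallel()
                .mapToLong(i -> (long) i * i)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(upperCase(Arrays.asList("a", "b", "c")));
        System.out.println(sumOfSquares(0, 10));
    }
}
```

Parallel streams run on the common fork/join pool by default, so long blocking operations inside the lambdas can starve other users of that pool.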
    // A thread pool which runs for the life of the application.
    private static final ExecutorService EXEC =
            Executors.newFixedThreadPool(SOME_NUM_OF_THREADS);

    // later
    EXEC.invokeAll(tasks); // you can optionally specify a timeout.
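To make the fragment above concrete, here is a sketch of a long-lived pool used with the timed overload of invokeAll. The class name InvokeAllSketch, the squares helper, the daemon thread factory, and the ten-second timeout are all assumptions made for the example; daemon threads are used only so that the pool does not keep the JVM alive after main returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllSketch {

    // A pool that lives as long as the application (daemon threads: an assumption).
    private static final ExecutorService EXEC = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            r -> {
                Thread t = new Thread(r, "parallel-worker");
                t.setDaemon(true);
                return t;
            });

    static List<Integer> squares(List<Integer> inputs) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (Integer n : inputs) {
            tasks.add(() -> n * n);
        }
        List<Integer> results = new ArrayList<>();
        try {
            // Blocks until all tasks finish or the timeout expires;
            // futures come back in the same order as the tasks.
            List<Future<Integer>> futures = EXEC.invokeAll(tasks, 10, TimeUnit.SECONDS);
            for (Future<Integer> f : futures) {
                if (f.isCancelled()) {
                    // Tasks still running at the timeout are cancelled by invokeAll.
                    throw new IllegalStateException("task timed out");
                }
                results.add(f.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(squares(Arrays.asList(1, 2, 3)));
    }
}
```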
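There is an equivalent of Parallel.For available as a Java extension. It is called Ateji PX, and there is a free version you can play with. http://www.ateji.com/px/index.html
It is exactly equivalent to Parallel.For and looks similar.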
For ||
More examples and an explanation on Wikipedia: http://en.wikipedia.org/wiki/Ateji_PX
It is the closest thing Java has, IMO.
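Synchronization often kills the speedup of parallel loops. Parallel for loops therefore usually need thread-private data plus a reduction mechanism that combines the private data of all threads into a single result.
So I extended Weimin Xiao's Parallel.For version with a reduction mechanism.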
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class Parallel {
        public static interface IntLoopBody {
            void run(int i);
        }

        public static interface LoopBody<T> {
            void run(T i);
        }

        // Creates the private data for one chunk.
        public static interface RedDataCreator<T> {
            T run();
        }

        // Loop body that works on the private data of its chunk.
        public static interface RedLoopBody<T> {
            void run(int i, T data);
        }

        // Merges the private data of one chunk (addData) into the overall result (returnData).
        public static interface Reducer<T> {
            void run(T returnData, T addData);
        }

        private static class ReductionData<T> {
            Future<?> future;
            T data;
        }

        static final int nCPU = Runtime.getRuntime().availableProcessors();

        public static <T> void ForEach(Iterable<T> parameters, final LoopBody<T> loopBody) {
            ExecutorService executor = Executors.newFixedThreadPool(nCPU);
            List<Future<?>> futures = new LinkedList<Future<?>>();

            for (final T param : parameters) {
                futures.add(executor.submit(() -> loopBody.run(param)));
            }

            for (Future<?> f : futures) {
                try {
                    f.get();
                } catch (InterruptedException | ExecutionException e) {
                    System.out.println(e);
                }
            }
            executor.shutdown();
        }

        public static void For(int start, int stop, final IntLoopBody loopBody) {
            // An empty range would give chunkSize == 0 and a division by zero below.
            if (stop <= start) return;

            final int chunkSize = (stop - start + nCPU - 1) / nCPU;
            final int loops = (stop - start + chunkSize - 1) / chunkSize;
            ExecutorService executor = Executors.newFixedThreadPool(loops);
            List<Future<?>> futures = new LinkedList<Future<?>>();

            for (int i = start; i < stop; ) {
                final int iStart = i;
                i += chunkSize;
                final int iStop = (i < stop) ? i : stop;

                futures.add(executor.submit(() -> {
                    for (int j = iStart; j < iStop; j++)
                        loopBody.run(j);
                }));
            }

            for (Future<?> f : futures) {
                try {
                    f.get();
                } catch (InterruptedException | ExecutionException e) {
                    System.out.println(e);
                }
            }
            executor.shutdown();
        }

        public static <T> void For(int start, int stop, T result,
                                   final RedDataCreator<T> creator,
                                   final RedLoopBody<T> loopBody,
                                   final Reducer<T> reducer) {
            // An empty range would give chunkSize == 0 and a division by zero below.
            if (stop <= start) return;

            final int chunkSize = (stop - start + nCPU - 1) / nCPU;
            final int loops = (stop - start + chunkSize - 1) / chunkSize;
            ExecutorService executor = Executors.newFixedThreadPool(loops);
            List<ReductionData<T>> redData = new LinkedList<ReductionData<T>>();

            for (int i = start; i < stop; ) {
                final int iStart = i;
                i += chunkSize;
                final int iStop = (i < stop) ? i : stop;
                final ReductionData<T> rd = new ReductionData<T>();

                // Each chunk gets its own private data, so the loop body needs no locking.
                rd.data = creator.run();
                rd.future = executor.submit(() -> {
                    for (int j = iStart; j < iStop; j++) {
                        loopBody.run(j, rd.data);
                    }
                });
                redData.add(rd);
            }

            // Reduce sequentially on the calling thread as each chunk completes.
            for (ReductionData<T> rd : redData) {
                try {
                    rd.future.get();
                    if (rd.data != null) {
                        reducer.run(result, rd.data);
                    }
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            executor.shutdown();
        }
    }
Here is a simple test example: a parallel character counter that uses an unsynchronized map.
    import java.util.*;

    public class ParallelTest {
        static class Counter {
            int cnt;

            Counter() {
                cnt = 1;
            }
        }

        public static void main(String[] args) {
            String text = "More formally, if this map contains a mapping from a key k to a " +
                    "value v such that key compares equal to k according to the map's ordering, then " +
                    "this method returns v; otherwise it returns null.";
            Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>();
            Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>();

            // first sequentially
            for (int i = 0; i < text.length(); i++) {
                char c = text.charAt(i);
                Counter cnt = charCounter1.get(c);
                if (cnt == null) {
                    charCounter1.put(c, new Counter());
                } else {
                    cnt.cnt++;
                }
            }
            for (Map.Entry<Character, Counter> entry : charCounter1.entrySet()) {
                System.out.println(entry.getKey() + ": " + entry.getValue().cnt);
            }

            // now parallel without synchronization
            Parallel.For(0, text.length(), charCounter2,
                // Creator
                () -> new TreeMap<Character, Counter>(),
                // Loop Body
                (i, map) -> {
                    char c = text.charAt(i);
                    Counter cnt = map.get(c);
                    if (cnt == null) {
                        map.put(c, new Counter());
                    } else {
                        cnt.cnt++;
                    }
                },
                // Reducer
                (result, map) -> {
                    for (Map.Entry<Character, Counter> entry : map.entrySet()) {
                        Counter cntR = result.get(entry.getKey());
                        if (cntR == null) {
                            result.put(entry.getKey(), entry.getValue());
                        } else {
                            cntR.cnt += entry.getValue().cnt;
                        }
                    }
                }
            );

            // compare results (assertions are only checked when the JVM runs with -ea)
            assert charCounter1.size() == charCounter2.size()
                    : "wrong size: " + charCounter1.size() + ", " + charCounter2.size();
            Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator();
            for (Map.Entry<Character, Counter> entry : charCounter1.entrySet()) {
                Map.Entry<Character, Counter> entry2 = it2.next();
                // equals() instead of ==, since the keys are boxed Character objects
                assert entry.getKey().equals(entry2.getKey())
                        && entry.getValue().cnt == entry2.getValue().cnt : "wrong content";
            }

            System.out.println("Well done!");
        }
    }
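The same private-data-plus-reduction pattern is what a parallel stream collect() does internally: each worker fills its own container, and the containers are merged at the end. As a point of comparison only, and not the approach of the answer above, the character counter could be sketched with JDK collectors like this; the class name CharCountSketch and the count helper are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

class CharCountSketch {

    // Counts how often each character occurs, using a parallel stream.
    static Map<Character, Long> count(String text) {
        return text.chars()
                .parallel()
                .mapToObj(c -> (char) c)
                // groupingBy builds one TreeMap per worker and merges them afterwards,
                // so no explicit synchronization is needed in user code.
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        for (Map.Entry<Character, Long> e : count("hello world").entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue());
        }
    }
}
```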
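I have an updated Java Parallel class that can do Parallel.For, Parallel.ForEach, Parallel.Tasks, and partitioned parallel loops. The source code is as follows:
An example of using these parallel loops is as follows: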
    public static void main(String[] argv) {
        // sample data
        final ArrayList<String> ss = new ArrayList<String>();

        String[] s = {"a", "b", "c", "d", "e", "f", "g"};
        for (String z : s)
            ss.add(z);
        int m = ss.size();

        // parallel-for loop
        System.out.println("Parallel.For loop:");
        Parallel.For(0, m, new LoopBody<Integer>() {
            public void run(Integer i) {
                System.out.println(i + "\t" + ss.get(i));
            }
        });

        // parallel for-each loop
        System.out.println("Parallel.ForEach loop:");
        Parallel.ForEach(ss, new LoopBody<String>() {
            public void run(String p) {
                System.out.println(p);
            }
        });

        // partitioned parallel loop
        System.out.println("Partitioned Parallel loop:");
        Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>() {
            public void run(Partition p) {
                for (int i = p.start; i < p.end; i++)
                    System.out.println(i + "\t" + ss.get(i));
            }
        });

        // parallel tasks
        System.out.println("Parallel Tasks:");
        Parallel.Tasks(new Task[] {
            // task-1
            new Task() {
                public void run() {
                    for (int i = 0; i < 3; i++)
                        System.out.println(i + "\t" + ss.get(i));
                }
            },
            // task-2
            new Task() {
                public void run() {
                    for (int i = 3; i < 6; i++)
                        System.out.println(i + "\t" + ss.get(i));
                }
            }
        });
    }
Source code: https://github.com/crammeur/MyInvoices/blob/master/crammeurLibrairy/src/main/java/ca/qc/bergeron/marcantoine/crammeur/librairy/utils/Parallel.java
[UPDATE]
Parallel class:
    private static final int NUM_CORES = Runtime.getRuntime().availableProcessors();
    private static final int MAX_THREAD = NUM_CORES*2;

    @NotNull
    public static <T,R> Collection<R> For(final Collection<T> elements, final Operation<T,R> operation) {
        ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD, new NamedThreadFactory("Parallel.Executor"));
        final List<R> result = new ArrayList<>(elements.size());
        final Iterator<T> iterator = elements.iterator();
        final Runnable runnable = new Runnable() {
            final Callable<R> callable = new Callable<R>() {
                @Override
                public R call() throws Exception {
                    T result;
                    synchronized (iterator) {
                        result = iterator.next();
                    }
                    return operation.perform(result);
                }
            };

            @Override
            public void run() {
                while (iterator.hasNext()) {
                    R r;
                    try {
                        if (operation.async()) {
                            r = callable.call();
                            if (operation.result()) {
                                synchronized (result) {
                                    result.add(r);
                                }
                            }
                            if (!operation.follow()) {
                                break;
                            }
                        } else {
                            synchronized (callable) {
                                r = callable.call();
                                if (operation.result()) {
                                    synchronized (result) {
                                        result.add(r);
                                    }
                                }
                                if (!operation.follow()) {
                                    break;
                                }
                            }
                        }
                    } catch (NoSuchElementException e) {
                        break;
                    } catch (Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        for (int threadIndex=0; threadIndex<MAX_THREAD; threadIndex++) {
            executor.execute(runnable);
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
            try {
                Thread.sleep(0,1);
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
        return result;
    }

    @NotNull
    public static <T,R> Iterable<R> For(final Iterable<T> elements, final Operation<T,R> operation) {
        ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD, new NamedThreadFactory("Parallel.Executor"));
        final List<R> result = new LinkedList<>();
        final Iterator<T> iterator = elements.iterator();
        final Runnable runnable = new Runnable() {
            final Callable<R> callable = new Callable<R>() {
                @Override
                public R call() throws Exception {
                    T result;
                    synchronized (iterator) {
                        result = iterator.next();
                    }
                    return operation.perform(result);
                }
            };

            @Override
            public void run() {
                while (iterator.hasNext()) {
                    R r;
                    try {
                        if (operation.async()) {
                            r = callable.call();
                            if (operation.result()) {
                                synchronized (result) {
                                    result.add(r);
                                }
                            }
                            if (!operation.follow()) {
                                break;
                            }
                        } else {
                            synchronized (callable) {
                                r = callable.call();
                                if (operation.result()) {
                                    synchronized (result) {
                                        result.add(r);
                                    }
                                }
                                if (!operation.follow()) {
                                    break;
                                }
                            }
                        }
                    } catch (NoSuchElementException e) {
                        break;
                    } catch (Exception e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        for (int threadIndex=0; threadIndex<MAX_THREAD; threadIndex++) {
            executor.execute(runnable);
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
            try {
                Thread.sleep(0,1);
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
        return result;
    }

    @NotNull
    public static <T,R> Collection<Callable<R>> createCallables(final Collection<T> elements, final Operation<T,R> operation) {
        ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD, new NamedThreadFactory("Parallel.Executor"));
        final Collection<Callable<R>> callables = new ArrayList<>(elements.size());
        final Runnable runnable = new Runnable() {
            final Iterator<T> iterator = elements.iterator();
            final int maxIndex = elements.size()-1;
            final Callable<R> callable = new Callable<R>() {
                @Override
                public R call() throws Exception {
                    T result;
                    synchronized (iterator) {
                        result = iterator.next();
                    }
                    if (operation.async()) {
                        return operation.perform(result);
                    } else {
                        synchronized (operation) {
                            return operation.perform(result);
                        }
                    }
                }
            };
            int index = -1;

            @Override
            public void run() {
                while (index < maxIndex) {
                    //Performance sync this only
                    synchronized (this) {
                        index++;
                    }
                    callables.add(callable);
                }
            }
        };
        for (int indexThread = 0; indexThread< MAX_THREAD; indexThread++) {
            executor.execute(runnable);
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
            try {
                Thread.sleep(0,1);
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
        return callables;
    }

    @NotNull
    public static <T,R> Iterable<Callable<R>> createCallables(final Iterable<T> elements, final Operation<T,R> operation) {
        ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD, new NamedThreadFactory("Parallel.Executor"));
        final Collection<Callable<R>> result = new LinkedList<>();
        final Iterator<T> iterator = elements.iterator();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        final T element;
                        if (operation.async()) {
                            element = iterator.next();
                            synchronized (result) {
                                result.add(new Callable<R>() {
                                    @Override
                                    public R call() throws Exception {
                                        return operation.perform(element);
                                    }
                                });
                            }
                        } else {
                            synchronized (iterator) {
                                element = iterator.next();
                                synchronized (result) {
                                    result.add(new Callable<R>() {
                                        @Override
                                        public R call() throws Exception {
                                            return operation.perform(element);
                                        }
                                    });
                                }
                            }
                        }
                    } catch (NoSuchElementException e) {
                        break;
                    }
                }
            }
        };
        for (int indexThread = 0; indexThread< MAX_THREAD; indexThread++) {
            executor.execute(runnable);
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
            try {
                Thread.sleep(0,1);
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
        return result;
    }

    public interface Operation<T,R> {
        R perform(T pParameter);
        boolean follow();
        boolean result();
        boolean async();
    }
Example:
    Parallel.For(collection, new Parallel.Operation<Data<Long>, Long>() {
        boolean async = false;

        @Override
        public synchronized Long perform(Data<Long> pParameter) {
            System.out.println(pParameter);
            async = !async;
            return pParameter.getId();
        }

        //break loop if false
        @Override
        public boolean follow() {
            return true;
        }

        //add result if true
        @Override
        public boolean result() {
            return true;
        }

        //Performance if true
        @Override
        public boolean async() {
            return async;
        }
    });
java parallel multithreading