如何在Java 5中使用ExecutorService实现任务优先级?

我正在实现一个线程池机制,我想执行不同优先级的任务。 我想有一个很好的机制,我可以提交一个高优先级任务的服务,并安排在其他任务之前。 任务的优先级是任务本身的一个固有属性(无论我将这个任务表示为Callable还是可运行对我来说都不重要)。

现在,从表面上看,我可以在我的ThreadPoolExecutor使用PriorityBlockingQueue作为任务队列,但该队列包含Runnable对象,这可能是也可能不是我已经提交给它的Runnable任务。 而且,如果我已经提交了Callable任务,目前还不清楚这将如何映射。

有没有办法做到这一点? 我真的不想为此而推出自己的产品,因为我更有可能错误地这样做。

(撇开,是的,我意识到在这样的事情中可能会出现对于优先级较低的工作的匮乏,对于公平合理保证的解决scheme,加分(?!)

乍一看,您似乎可以为扩展RunnableCallable<T>Comparable任务定义一个接口。 然后用PriorityBlockingQueue作为队列包装一个ThreadPoolExecutor ,并且只接受实现你的接口的任务。

考虑到你的评论,看起来像一个选项是扩展ThreadPoolExecutor ,并覆盖submit()方法。 请参阅AbstractExecutorService以查看默认的外观。 他们所做的只是在FutureTask包装RunnableCallableexecute()它。 我可能会通过编写一个实现ExecutorService的包装器类来委托给一个匿名的ThreadPoolExecutor 。 把它们包装在你的优先事项中,以便你的Comparator可以得到它。

我已经以合理的方式解决了这个问题,我将在下面对它进行描述,以供将来参考我自己和其他任何使用Java Concurrent库时遇到此问题的人员。

使用PriorityBlockingQueue作为保持任务供以后执行的手段的确是一个正确的方向。 问题是PriorityBlockingQueue必须一般实例化为包含Runnable实例,并且不可能在Runnable接口上调用compareTo (或类似)。

在解决这个问题上。 在创buildExecutor时,必须给它一个PriorityBlockingQueue 。 该队列应进一步给定一个自定义的比较器,以适当的地方sorting:

 new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator()); 

现在,查看CustomTaskComparator

 public class CustomTaskComparator implements Comparator<MyType> { @Override public int compare(MyType first, MyType second) { return comparison; } } 

一切都看起来很直截了当。 这里有点粘。 我们接下来的问题是处理从Executor创buildFutureTasks。 在Executor中,我们必须重写newTaskFor如下所示:

 @Override protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { //Override the default FutureTask creation and retrofit it with //a custom task. This is done so that prioritization can be accomplished. return new CustomFutureTask(c); } 

c是我们试图执行的Callable任务。 现在,我们来看看CustomFutureTask

 public class CustomFutureTask extends FutureTask { private CustomTask task; public CustomFutureTask(Callable callable) { super(callable); this.task = (CustomTask) callable; } public CustomTask getTask() { return task; } } 

注意getTask方法。 稍后我们将使用它来抓取我们创build的CustomFutureTask的原始任务。

最后,让我们修改我们尝试执行的原始任务:

 public class CustomTask implements Callable<MyType>, Comparable<CustomTask> { private final MyType myType; public CustomTask(MyType myType) { this.myType = myType; } @Override public MyType call() { //Do some things, return something for FutureTask implementation of `call`. return myType; } @Override public int compareTo(MyType task2) { return new CustomTaskComparator().compare(this.myType, task2.myType); } } 

您可以看到我们在任务中实现了Comparable ,将其委托给实际的MyType Comparator器。

在那里,您可以使用Java库为Executor自定义优先级! 这需要一些弯曲,但是我能够想出最清洁的。 我希望这对某人有帮助!

你可以使用这些辅助类:

 public class PriorityFuture<T> implements RunnableFuture<T> { private RunnableFuture<T> src; private int priority; public PriorityFuture(RunnableFuture<T> other, int priority) { this.src = other; this.priority = priority; } public int getPriority() { return priority; } public boolean cancel(boolean mayInterruptIfRunning) { return src.cancel(mayInterruptIfRunning); } public boolean isCancelled() { return src.isCancelled(); } public boolean isDone() { return src.isDone(); } public T get() throws InterruptedException, ExecutionException { return src.get(); } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return src.get(timeout, unit); } public void run() { src.run(); } public static Comparator<Runnable> COMP = new Comparator<Runnable>() { public int compare(Runnable o1, Runnable o2) { if (o1 == null && o2 == null) return 0; else if (o1 == null) return -1; else if (o2 == null) return 1; else { int p1 = ((PriorityFuture<?>) o1).getPriority(); int p2 = ((PriorityFuture<?>) o2).getPriority(); return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1); } } }; } 

 public interface PriorityCallable<T> extends Callable<T> { int getPriority(); } 

这个帮手方法:

 public static ThreadPoolExecutor getPriorityExecutor(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) { protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { RunnableFuture<T> newTaskFor = super.newTaskFor(callable); return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority()); } }; } 

然后像这样使用它:

 class LenthyJob implements PriorityCallable<Long> { private int priority; public LenthyJob(int priority) { this.priority = priority; } public Long call() throws Exception { System.out.println("Executing: " + priority); long num = 1000000; for (int i = 0; i < 1000000; i++) { num *= Math.random() * 1000; num /= Math.random() * 1000; if (num == 0) num = 1000000; } return num; } public int getPriority() { return priority; } } public class TestPQ { public static void main(String[] args) throws InterruptedException, ExecutionException { ThreadPoolExecutor exec = getPriorityExecutor(2); for (int i = 0; i < 20; i++) { int priority = (int) (Math.random() * 100); System.out.println("Scheduling: " + priority); LenthyJob job = new LenthyJob(priority); exec.submit(job); } } } 

我将尝试用全function的代码来解释这个问题。 但在深入代码之前,我想解释一下PriorityBlockingQueue

PriorityBlockingQueue :PriorityBlockingQueue是BlockingQueue的一个实现。 它接受任务和优先级,并首先提交具有最高优先级的任务。 如果任何两个任务具有相同的优先级,那么我们需要提供一些自定义逻辑来决定哪个任务先行。

现在让我们直接进入代码。

驱动程序类 :这个类创build一个执行程序,接受任务,然后提交它们执行。 在这里我们创build两个任务,一个用低优先级,另一个用高优先级。 在这里,我们告诉执行者运行1个线程的MAX并使用PriorityBlockingQueue。

  public static void main(String[] args) { /* Minimum number of threads that must be running : 0 Maximium number of threads that can be created : 1 If a thread is idle, then the minimum time to keep it alive : 1000 Which queue to use : PriorityBlockingQueue */ PriorityBlockingQueue queue = new PriorityBlockingQueue(); ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1, 1000, TimeUnit.MILLISECONDS,queue); MyTask task = new MyTask(Priority.LOW,"Low"); executor.execute(new MyFutureTask(task)); task = new MyTask(Priority.HIGH,"High"); executor.execute(new MyFutureTask(task)); task = new MyTask(Priority.MEDIUM,"Medium"); executor.execute(new MyFutureTask(task)); } 

MyTask类 :MyTask实现了Runnable,并在构造函数中接受优先级作为参数。 当这个任务运行时,它打印一条消息,然后使线程hibernate1秒钟。

  public class MyTask implements Runnable { public int getPriority() { return priority.getValue(); } private Priority priority; public String getName() { return name; } private String name; public MyTask(Priority priority,String name){ this.priority = priority; this.name = name; } @Override public void run() { System.out.println("The following Runnable is getting executed "+getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } 

MyFutureTask类 :因为我们使用PriorityBlocingQueue来保存我们的任务,所以我们的任务必须包装在FutureTask中,我们的FutureTask实现必须实现Comparable接口。 Comparable接口比较两个不同任务的优先级,并提交具有最高执行优先级的任务。

  public class MyFutureTask extends FutureTask<MyFutureTask> implements Comparable<MyFutureTask> { private MyTask task = null; public MyFutureTask(MyTask task){ super(task,null); this.task = task; } @Override public int compareTo(MyFutureTask another) { return task.getPriority() - another.task.getPriority(); } } 

优先级 :自我解释优先级。

 public enum Priority { HIGHEST(0), HIGH(1), MEDIUM(2), LOW(3), LOWEST(4); int value; Priority(int val) { this.value = val; } public int getValue(){ return value; } } 

现在,当我们运行这个例子时,我们得到以下输出

 The following Runnable is getting executed High The following Runnable is getting executed Medium The following Runnable is getting executed Low 

尽pipe我们先提交了低优先级,但稍后再提交高优先级任务,但由于我们使用的是PriorityBlockingQueue,因此任何具有更高优先级的任务都将首先执行。

我的解决scheme保留了相同优先级的任务的子级顺序。 这是对这个答案的改进

任务执行顺序基于:

  1. 优先
  2. 提交订单(在相同的优先级)

testing人员类别:

 public class Main { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1); //Priority=0 executorService.submit(newCallable("A1", 200)); //Defaults to priority=0 executorService.execute(newRunnable("A2", 200)); //Defaults to priority=0 executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0)); executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0)); executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0)); executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0)); executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0)); executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0)); //Priority=1 executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1)); executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1)); executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1)); executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1)); executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1)); executorService.shutdown(); } private static Runnable newRunnable(String name, int delay) { return new Runnable() { @Override public void run() { System.out.println(name); sleep(delay); } }; } private static Callable<Integer> newCallable(String name, int delay) { return new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println(name); sleep(delay); return 10; } }; } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } } 

结果:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

第一个任务是A1,因为插入队列时没有更高的优先级。 B任务是先执行的1个优先级,A任务是0优先执行,但是执行顺序是子命令顺序:B1,B2,B3,… A2,A3,A4 …

解决scheme:

 public class PriorityExecutors { public static ExecutorService newFixedThreadPool(int nThreads) { return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS); } private static class PriorityExecutor extends ThreadPoolExecutor { private static final int DEFAULT_PRIORITY = 0; private static AtomicLong instanceCounter = new AtomicLong(); @SuppressWarnings({"unchecked"}) public PriorityExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10, ComparableTask.comparatorByPriorityAndSequentialOrder())); } @Override public void execute(Runnable command) { // If this is ugly then delegator pattern needed if (command instanceof ComparableTask) //Already wrapped super.execute(command); else { super.execute(newComparableRunnableFor(command)); } } private Runnable newComparableRunnableFor(Runnable runnable) { return new ComparableRunnable(ensurePriorityRunnable(runnable)); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new ComparableFutureTask<>(ensurePriorityCallable(callable)); } @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); } private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) { return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable : PriorityCallable.of(callable, DEFAULT_PRIORITY); } private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); } private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask { private Long sequentialOrder = instanceCounter.getAndIncrement(); private HasPriority hasPriority; public ComparableFutureTask(PriorityCallable<T> priorityCallable) { super(priorityCallable); this.hasPriority = priorityCallable; } public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { super(priorityRunnable, result); this.hasPriority = priorityRunnable; } @Override public long getInstanceCount() { return sequentialOrder; } @Override public int getPriority() { return hasPriority.getPriority(); } } private static class ComparableRunnable implements Runnable, ComparableTask { private Long instanceCount = instanceCounter.getAndIncrement(); private HasPriority hasPriority; private Runnable runnable; public ComparableRunnable(PriorityRunnable priorityRunnable) { this.runnable = priorityRunnable; this.hasPriority = priorityRunnable; } @Override public void run() { runnable.run(); } @Override public int getPriority() { return hasPriority.getPriority(); } @Override public long getInstanceCount() { return instanceCount; } } private interface ComparableTask extends Runnable { int getPriority(); long getInstanceCount(); public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() { return (o1, o2) -> { int priorityResult = o2.getPriority() - o1.getPriority(); return priorityResult != 0 ? priorityResult : (int) (o1.getInstanceCount() - o2.getInstanceCount()); }; } } } private static interface HasPriority { int getPriority(); } public interface PriorityCallable<V> extends Callable<V>, HasPriority { public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) { return new PriorityCallable<V>() { @Override public V call() throws Exception { return callable.call(); } @Override public int getPriority() { return priority; } }; } } public interface PriorityRunnable extends Runnable, HasPriority { public static PriorityRunnable of(Runnable runnable, int priority) { return new PriorityRunnable() { @Override public void run() { runnable.run(); } @Override public int getPriority() { return priority; } }; } } } 

是否有可能有一个ThreadPoolExecutor为每个级别的优先级? 一个ThreadPoolExecutor可以被一个ThreadFactory实例化,并且你可以拥有一个ThreadFactory的实现来设置不同的优先级。

  class MaxPriorityThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setPriority(Thread.MAX_PRIORITY); } }