如何在Java 5中使用ExecutorService实现任务优先级?
我正在实现一个线程池机制,我想执行不同优先级的任务。 我想有一个很好的机制,我可以提交一个高优先级任务的服务,并安排在其他任务之前。 任务的优先级是任务本身的一个固有属性(无论我将这个任务表示为Callable
还是可运行对我来说都不重要)。
现在,从表面上看,我可以在我的ThreadPoolExecutor
使用PriorityBlockingQueue
作为任务队列,但该队列包含Runnable
对象,这可能是也可能不是我已经提交给它的Runnable
任务。 而且,如果我已经提交了Callable
任务,目前还不清楚这将如何映射。
有没有办法做到这一点? 我真的不想为此而推出自己的产品,因为我更有可能错误地这样做。
(撇开,是的,我意识到在这样的事情中可能会出现对于优先级较低的工作的匮乏,对于公平合理保证的解决scheme,加分(?!)
乍一看,您似乎可以为扩展Runnable
或Callable<T>
和Comparable
任务定义一个接口。 然后用PriorityBlockingQueue
作为队列包装一个ThreadPoolExecutor
,并且只接受实现你的接口的任务。
考虑到你的评论,看起来像一个选项是扩展ThreadPoolExecutor
,并覆盖submit()
方法。 请参阅AbstractExecutorService
以查看默认的外观。 他们所做的只是在FutureTask
包装Runnable
或Callable
并execute()
它。 我可能会通过编写一个实现ExecutorService
的包装器类来委托给一个匿名的ThreadPoolExecutor
。 把它们包装在你的优先事项中,以便你的Comparator
可以得到它。
我已经以合理的方式解决了这个问题,我将在下面对它进行描述,以供将来参考我自己和其他任何使用Java Concurrent库时遇到此问题的人员。
使用PriorityBlockingQueue
作为保持任务供以后执行的手段的确是一个正确的方向。 问题是PriorityBlockingQueue
必须一般实例化为包含Runnable
实例,并且不可能在Runnable
接口上调用compareTo
(或类似)。
在解决这个问题上。 在创buildExecutor时,必须给它一个PriorityBlockingQueue
。 该队列应进一步给定一个自定义的比较器,以适当的地方sorting:
new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());
现在,查看CustomTaskComparator
:
public class CustomTaskComparator implements Comparator<MyType> { @Override public int compare(MyType first, MyType second) { return comparison; } }
一切都看起来很直截了当。 这里有点粘。 我们接下来的问题是处理从Executor创buildFutureTasks。 在Executor中,我们必须重写newTaskFor
如下所示:
@Override protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { //Override the default FutureTask creation and retrofit it with //a custom task. This is done so that prioritization can be accomplished. return new CustomFutureTask(c); }
c
是我们试图执行的Callable
任务。 现在,我们来看看CustomFutureTask
:
public class CustomFutureTask extends FutureTask { private CustomTask task; public CustomFutureTask(Callable callable) { super(callable); this.task = (CustomTask) callable; } public CustomTask getTask() { return task; } }
注意getTask
方法。 稍后我们将使用它来抓取我们创build的CustomFutureTask
的原始任务。
最后,让我们修改我们尝试执行的原始任务:
public class CustomTask implements Callable<MyType>, Comparable<CustomTask> { private final MyType myType; public CustomTask(MyType myType) { this.myType = myType; } @Override public MyType call() { //Do some things, return something for FutureTask implementation of `call`. return myType; } @Override public int compareTo(MyType task2) { return new CustomTaskComparator().compare(this.myType, task2.myType); } }
您可以看到我们在任务中实现了Comparable
,将其委托给实际的MyType
Comparator
器。
在那里,您可以使用Java库为Executor自定义优先级! 这需要一些弯曲,但是我能够想出最清洁的。 我希望这对某人有帮助!
你可以使用这些辅助类:
public class PriorityFuture<T> implements RunnableFuture<T> { private RunnableFuture<T> src; private int priority; public PriorityFuture(RunnableFuture<T> other, int priority) { this.src = other; this.priority = priority; } public int getPriority() { return priority; } public boolean cancel(boolean mayInterruptIfRunning) { return src.cancel(mayInterruptIfRunning); } public boolean isCancelled() { return src.isCancelled(); } public boolean isDone() { return src.isDone(); } public T get() throws InterruptedException, ExecutionException { return src.get(); } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return src.get(timeout, unit); } public void run() { src.run(); } public static Comparator<Runnable> COMP = new Comparator<Runnable>() { public int compare(Runnable o1, Runnable o2) { if (o1 == null && o2 == null) return 0; else if (o1 == null) return -1; else if (o2 == null) return 1; else { int p1 = ((PriorityFuture<?>) o1).getPriority(); int p2 = ((PriorityFuture<?>) o2).getPriority(); return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1); } } }; }
和
public interface PriorityCallable<T> extends Callable<T> { int getPriority(); }
和这个帮手方法:
public static ThreadPoolExecutor getPriorityExecutor(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) { protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { RunnableFuture<T> newTaskFor = super.newTaskFor(callable); return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority()); } }; }
然后像这样使用它:
class LenthyJob implements PriorityCallable<Long> { private int priority; public LenthyJob(int priority) { this.priority = priority; } public Long call() throws Exception { System.out.println("Executing: " + priority); long num = 1000000; for (int i = 0; i < 1000000; i++) { num *= Math.random() * 1000; num /= Math.random() * 1000; if (num == 0) num = 1000000; } return num; } public int getPriority() { return priority; } } public class TestPQ { public static void main(String[] args) throws InterruptedException, ExecutionException { ThreadPoolExecutor exec = getPriorityExecutor(2); for (int i = 0; i < 20; i++) { int priority = (int) (Math.random() * 100); System.out.println("Scheduling: " + priority); LenthyJob job = new LenthyJob(priority); exec.submit(job); } } }
我将尝试用全function的代码来解释这个问题。 但在深入代码之前,我想解释一下PriorityBlockingQueue
PriorityBlockingQueue :PriorityBlockingQueue是BlockingQueue的一个实现。 它接受任务和优先级,并首先提交具有最高优先级的任务。 如果任何两个任务具有相同的优先级,那么我们需要提供一些自定义逻辑来决定哪个任务先行。
现在让我们直接进入代码。
驱动程序类 :这个类创build一个执行程序,接受任务,然后提交它们执行。 在这里我们创build两个任务,一个用低优先级,另一个用高优先级。 在这里,我们告诉执行者运行1个线程的MAX并使用PriorityBlockingQueue。
public static void main(String[] args) { /* Minimum number of threads that must be running : 0 Maximium number of threads that can be created : 1 If a thread is idle, then the minimum time to keep it alive : 1000 Which queue to use : PriorityBlockingQueue */ PriorityBlockingQueue queue = new PriorityBlockingQueue(); ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1, 1000, TimeUnit.MILLISECONDS,queue); MyTask task = new MyTask(Priority.LOW,"Low"); executor.execute(new MyFutureTask(task)); task = new MyTask(Priority.HIGH,"High"); executor.execute(new MyFutureTask(task)); task = new MyTask(Priority.MEDIUM,"Medium"); executor.execute(new MyFutureTask(task)); }
MyTask类 :MyTask实现了Runnable,并在构造函数中接受优先级作为参数。 当这个任务运行时,它打印一条消息,然后使线程hibernate1秒钟。
public class MyTask implements Runnable { public int getPriority() { return priority.getValue(); } private Priority priority; public String getName() { return name; } private String name; public MyTask(Priority priority,String name){ this.priority = priority; this.name = name; } @Override public void run() { System.out.println("The following Runnable is getting executed "+getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
MyFutureTask类 :因为我们使用PriorityBlocingQueue来保存我们的任务,所以我们的任务必须包装在FutureTask中,我们的FutureTask实现必须实现Comparable接口。 Comparable接口比较两个不同任务的优先级,并提交具有最高执行优先级的任务。
public class MyFutureTask extends FutureTask<MyFutureTask> implements Comparable<MyFutureTask> { private MyTask task = null; public MyFutureTask(MyTask task){ super(task,null); this.task = task; } @Override public int compareTo(MyFutureTask another) { return task.getPriority() - another.task.getPriority(); } }
优先级 :自我解释优先级。
public enum Priority { HIGHEST(0), HIGH(1), MEDIUM(2), LOW(3), LOWEST(4); int value; Priority(int val) { this.value = val; } public int getValue(){ return value; } }
现在,当我们运行这个例子时,我们得到以下输出
The following Runnable is getting executed High The following Runnable is getting executed Medium The following Runnable is getting executed Low
尽pipe我们先提交了低优先级,但稍后再提交高优先级任务,但由于我们使用的是PriorityBlockingQueue,因此任何具有更高优先级的任务都将首先执行。
我的解决scheme保留了相同优先级的任务的子级顺序。 这是对这个答案的改进
任务执行顺序基于:
- 优先
- 提交订单(在相同的优先级)
testing人员类别:
public class Main { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1); //Priority=0 executorService.submit(newCallable("A1", 200)); //Defaults to priority=0 executorService.execute(newRunnable("A2", 200)); //Defaults to priority=0 executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0)); executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0)); executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0)); executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0)); executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0)); executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0)); //Priority=1 executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1)); executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1)); executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1)); executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1)); executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1)); executorService.shutdown(); } private static Runnable newRunnable(String name, int delay) { return new Runnable() { @Override public void run() { System.out.println(name); sleep(delay); } }; } private static Callable<Integer> newCallable(String name, int delay) { return new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println(name); sleep(delay); return 10; } }; } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } }
结果:
A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8
第一个任务是A1,因为插入队列时没有更高的优先级。 B任务是先执行的1个优先级,A任务是0优先执行,但是执行顺序是子命令顺序:B1,B2,B3,… A2,A3,A4 …
解决scheme:
public class PriorityExecutors { public static ExecutorService newFixedThreadPool(int nThreads) { return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS); } private static class PriorityExecutor extends ThreadPoolExecutor { private static final int DEFAULT_PRIORITY = 0; private static AtomicLong instanceCounter = new AtomicLong(); @SuppressWarnings({"unchecked"}) public PriorityExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10, ComparableTask.comparatorByPriorityAndSequentialOrder())); } @Override public void execute(Runnable command) { // If this is ugly then delegator pattern needed if (command instanceof ComparableTask) //Already wrapped super.execute(command); else { super.execute(newComparableRunnableFor(command)); } } private Runnable newComparableRunnableFor(Runnable runnable) { return new ComparableRunnable(ensurePriorityRunnable(runnable)); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new ComparableFutureTask<>(ensurePriorityCallable(callable)); } @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); } private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) { return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable : PriorityCallable.of(callable, DEFAULT_PRIORITY); } private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); } private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask { private Long sequentialOrder = instanceCounter.getAndIncrement(); private HasPriority hasPriority; public ComparableFutureTask(PriorityCallable<T> priorityCallable) { super(priorityCallable); this.hasPriority = priorityCallable; } public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { super(priorityRunnable, result); this.hasPriority = priorityRunnable; } @Override public long getInstanceCount() { return sequentialOrder; } @Override public int getPriority() { return hasPriority.getPriority(); } } private static class ComparableRunnable implements Runnable, ComparableTask { private Long instanceCount = instanceCounter.getAndIncrement(); private HasPriority hasPriority; private Runnable runnable; public ComparableRunnable(PriorityRunnable priorityRunnable) { this.runnable = priorityRunnable; this.hasPriority = priorityRunnable; } @Override public void run() { runnable.run(); } @Override public int getPriority() { return hasPriority.getPriority(); } @Override public long getInstanceCount() { return instanceCount; } } private interface ComparableTask extends Runnable { int getPriority(); long getInstanceCount(); public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() { return (o1, o2) -> { int priorityResult = o2.getPriority() - o1.getPriority(); return priorityResult != 0 ? priorityResult : (int) (o1.getInstanceCount() - o2.getInstanceCount()); }; } } } private static interface HasPriority { int getPriority(); } public interface PriorityCallable<V> extends Callable<V>, HasPriority { public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) { return new PriorityCallable<V>() { @Override public V call() throws Exception { return callable.call(); } @Override public int getPriority() { return priority; } }; } } public interface PriorityRunnable extends Runnable, HasPriority { public static PriorityRunnable of(Runnable runnable, int priority) { return new PriorityRunnable() { @Override public void run() { runnable.run(); } @Override public int getPriority() { return priority; } }; } } }
是否有可能有一个ThreadPoolExecutor为每个级别的优先级? 一个ThreadPoolExecutor可以被一个ThreadFactory实例化,并且你可以拥有一个ThreadFactory的实现来设置不同的优先级。
class MaxPriorityThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setPriority(Thread.MAX_PRIORITY); } }