LinkedBlockingQueue vs ConcurrentLinkedQueue
我的问题与之前提到的这个问题有关 。 在我使用生产者和消费者线程之间的通信队列的情况下,人们通常会推荐使用LinkedBlockingQueue
或ConcurrentLinkedQueue
?
使用其中一个的优点/缺点是什么?
我从APIangular度看到的主要区别是LinkedBlockingQueue
可以有select地绑定。
对于一个生产者/消费者线程,我不确定ConcurrentLinkedQueue
是一个合理的select – 它没有实现BlockingQueue
,这是生产者/消费者队列IMO的基本接口。 你必须调用poll()
,等一下,如果你没有find任何东西,然后再次轮询等等,导致延迟,当一个新的项目进来时,效率低下时(由于不必要的唤醒从睡觉)。
来自BlockingQueue的文档:
BlockingQueue
实现被devise为主要用于生产者 – 消费者队列
我知道这并不是说只有阻塞队列应该用于生产者 – 消费者队列,但即使如此…
这个问题值得一个更好的答案。
Java的ConcurrentLinkedQueue
基于Maged M. Michael和Michael L. Scott着名的非阻塞无锁队列algorithm。
争用资源(我们的队列)中的“非阻塞”作为一个术语,意味着无论平台的调度程序如何中断线程,或者如果线程太慢,其他线程将争用相同的资源仍然能够进步。 例如,如果涉及锁,持有该锁的线程可能被中断,等待该锁的所有线程都将被阻塞。 Java中的内部锁( synchronized
关键字)也可能带来严重的性能损失 – 例如在涉及偏向locking的情况下,您确实存在争用时,或者在虚拟机在旋转宽限期后决定“locking”锁之后阻止竞争线程……这就是为什么在许多情况下(低/中等争用的场景),对primefaces引用进行比较和设置可能会更有效率,这正是许多非阻塞数据结构正在做的事情。
Java的ConcurrentLinkedQueue
不仅是非阻塞的,而且它具有生产者不与消费者竞争的优秀属性。 在单一的生产者/单一消费者情景(SPSC)中,这确实意味着不会有争论。 在多生产者/单一消费者的情况下,消费者不会与生产者竞争。 当多个生产者试图offer()
,这个队列确实有争用,但是这是定义的并发。 这基本上是一个通用和高效的非阻塞队列。
至于它不是一个BlockingQueue
,好吧,阻塞一个线程等待队列是一个奇怪的糟糕的devise并发系统的方式。 别。 如果你不知道如何在消费者/生产者场景中使用ConcurrentLinkedQueue
,那么就切换到更高层次的抽象,就像一个好的actor框架一样。
LinkedBlockingQueue
在队列为空或已满并且相应的消费者/生产者线程进入睡眠状态时阻止使用者或生产者。 但是这个阻塞function带来了一个成本:每个放置或者取出操作都是在生产者或者消费者(如果很多的话)之间进行locking,所以在有许多生产者/消费者的情况下,操作可能会变慢。
ConcurrentLinkedQueue
不使用锁,而是使用CAS对其put / take操作潜在地减less了许多生产者和消费者线程的争用。 但是作为一个“等待空闲”的数据结构, ConcurrentLinkedQueue
在空时不会被阻塞,这意味着消费者需要通过“忙等待”来处理take()
返回null
值,例如消费者线程吃掉了CPU。
那么哪一个“更好”取决于消费者线程的数量,消费/生产的速率等。每个场景都需要一个基准。
ConcurrentLinkedQueue
显然更好的一个特殊用例是生产者首先生产某些东西,然后把工作放在队列中,只有在消费者开始消费之后才能完成他们的工作。 (这里生产者与消费者之间并不是一致的,而只是生产者与消费者之间)
仅供参考,试试这个—一种哈克,不是非常科学,但也许有趣:
import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Supplier; class QueueDemo { private static final int NUM_NODES = 1000000; private static final int NUM_TRIALS = 10; private static final int NUM_THREADS = 8; private static final Object ANY_OBJECT = new Object(); public static void main(String[] args) { Queue<Object> qN = new ConcurrentLinkedQueue<>(); Queue<Object> qB = new LinkedBlockingQueue<>(); Queue<Object> qA = new ArrayBlockingQueue<>(NUM_NODES+1); for (int i=0 ; i<NUM_TRIALS ; i++) { doOneTrial(qN, "non-blocking"); doOneTrial(qB, " blocking"); doOneTrial(qA, " Array"); } } private static void doOneTrial(final Queue<Object> q, String name) { List<CompletableFuture<Integer>> futures = new ArrayList<>(NUM_THREADS); CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(NUM_THREADS); fillQueue(q); for (int i=0 ; i<NUM_THREADS ; i++) { futures.add(CompletableFuture.supplyAsync(new Supplier<Integer>() { public Integer get() { int count = 0; wait4(startSignal); while (q.poll() != null) count++; doneSignal.countDown(); return count; } })); } long startTime = System.currentTimeMillis(); startSignal.countDown(); wait4(doneSignal); long endTime = System.currentTimeMillis(); int count = 0; for (CompletableFuture<Integer> future : futures) { count += future.join(); } if (count == NUM_NODES) { System.out.println(name + ", " + Long.toString(endTime-startTime)); } else { System.out.println("Aieeeeeegh!"); System.exit(1); } } private static void fillQueue(Queue<Object> q) { for (int i=0 ; i<NUM_NODES ; i++) { q.add(ANY_OBJECT); } } private static void wait4(CountDownLatch latch) { try { latch.await(); } catch (InterruptedException ex) { throw new RuntimeException(ex); } } }
如果您的队列是不可扩展的并且只包含一个生产者/消费者线程。 你可以使用无锁队列(你不需要locking数据访问)。
另一个解决scheme(不能很好地扩展)是集合通道:java.util.concurrent SynchronousQueue