

  • Java PriorityQueue with a fixed size
  • How do I use a PriorityQueue?
  • Get the indices of the n smallest elements in an array
  • Scala: is there a way to use PriorityQueue like I would in Java?

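I have a very large data set (more than 5 million items) and I need to get the N largest items from it. The most natural way to do this is to use a heap/priority queue that stores only the top N items. There are several priority queue implementations for the JVM (Scala/Java), namely: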

  • scala.collection.mutable.PriorityQueue
  • java.util.PriorityQueue
  • lucene.util.PriorityQueue

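The first two are nice, but they store all the items, which in my case means a significant memory overhead. The third (the Lucene implementation) has no such drawback, but as far as I can see from the documentation it does not support a custom comparator either, which makes it useless for me.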
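The idea described above (keep only the current top N in a heap while streaming over the data) can be sketched with a plain java.util.PriorityQueue. This is only a minimal illustration, not one of the implementations discussed in this thread; the class name TopN and the method largest are hypothetical. The heap is a min-heap under the given comparator, so its head is always the weakest of the candidates kept so far and memory stays proportional to N.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, not part of any library mentioned in the thread.
final class TopN {

    /** Returns the n largest items according to comparator, largest first. */
    static <E> List<E> largest(Iterable<? extends E> items, int n,
                               Comparator<? super E> comparator) {
        if (n <= 0) {
            return new ArrayList<>();
        }
        // Min-heap: the head is the smallest of the candidates kept so far.
        PriorityQueue<E> heap = new PriorityQueue<>(n, comparator);
        for (E item : items) {
            if (heap.size() < n) {
                heap.add(item);
            } else if (comparator.compare(item, heap.peek()) > 0) {
                // item beats the weakest candidate: replace it
                heap.poll();
                heap.add(item);
            }
        }
        List<E> result = new ArrayList<>(heap);
        result.sort(comparator);      // ascending
        Collections.reverse(result);  // largest first
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = java.util.Arrays.asList(5, 1, 9, 3, 7, 2);
        System.out.println(largest(data, 3, Comparator.<Integer>naturalOrder())); // [9, 7, 5]
    }
}
```

Each item costs at most one comparison plus an O(log N) heap update, so the full pass is O(M log N) for M items.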


UPD. In the end I wrote my own implementation, based on Peter's answer:

    import java.util.Comparator;
    import java.util.TreeSet;

    public class FixedSizePriorityQueue<E> extends TreeSet<E> {

        private int elementsLeft;

        public FixedSizePriorityQueue(int maxSize) {
            // NaturalComparator is a helper class that is not shown in this post
            super(new NaturalComparator());
            this.elementsLeft = maxSize;
        }

        public FixedSizePriorityQueue(int maxSize, Comparator<E> comparator) {
            super(comparator);
            this.elementsLeft = maxSize;
        }

        /**
         * @return true if element was added, false otherwise
         */
        @Override
        public boolean add(E e) {
            if (elementsLeft == 0 && size() == 0) {
                // max size was initialized to zero => just return false
                return false;
            } else if (elementsLeft > 0) {
                // queue isn't full => add element and decrement elementsLeft
                boolean added = super.add(e);
                if (added) {
                    elementsLeft--;
                }
                return added;
            } else {
                // queue is full => compare to the least element
                int compared = super.comparator().compare(e, this.first());
                // compare() only guarantees the sign of the result, so test > 0 rather than == 1
                if (compared > 0) {
                    // new element is larger than the least in queue => pull the least and add new one
                    pollFirst();
                    super.add(e);
                    return true;
                } else {
                    // new element is not larger than the least in queue => return false
                    return false;
                }
            }
        }
    }




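Lucene's PriorityQueue is abstract: you have to implement the abstract method lessThan(T a, T b), which plays the role of a custom comparator.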

Although this is an old question, it may be helpful to somebody else. You can use MinMaxPriorityQueue from Guava, Google's Java library.


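The difference is the comparator, but if you extend PriorityQueue you will have it. On every add, check whether you have reached the limit and, if you have, remove the last item.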

Below is the implementation I used before. It follows Peter's suggestion.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.TreeSet;

    // Both declarations below are nested inside an enclosing class (not shown).

    public @interface NonThreadSafe { }

    /**
     * A priority queue implementation with a fixed size based on a {@link TreeSet}.
     * The number of elements in the queue will be at most {@code maxSize}.
     * Once the number of elements in the queue reaches {@code maxSize}, trying to add a new element
     * will remove the lowest element in the queue if the new element is greater than
     * the current lowest element. The queue will not be modified otherwise.
     */
    @NonThreadSafe
    public static class FixedSizePriorityQueue<E> {
        private final TreeSet<E> treeSet; /* backing data structure */
        private final Comparator<? super E> comparator;
        private final int maxSize;

        /**
         * Constructs a {@link FixedSizePriorityQueue} with the specified {@code maxSize}
         * and {@code comparator}.
         *
         * @param maxSize - The maximum size the queue can reach, must be a positive integer.
         * @param comparator - The comparator to be used to compare the elements in the queue, must be non-null.
         */
        public FixedSizePriorityQueue(final int maxSize, final Comparator<? super E> comparator) {
            super();
            if (maxSize <= 0) {
                throw new IllegalArgumentException("maxSize = " + maxSize + "; expected a positive integer.");
            }
            if (comparator == null) {
                throw new NullPointerException("Comparator is null.");
            }
            this.treeSet = new TreeSet<E>(comparator);
            this.comparator = treeSet.comparator();
            this.maxSize = maxSize;
        }

        /**
         * Adds an element to the queue. If the queue contains {@code maxSize} elements, {@code e} will
         * be compared to the lowest element in the queue using {@code comparator}.
         * If {@code e} is greater than the lowest element, that element will be removed and
         * {@code e} will be added instead. Otherwise, the queue will not be modified
         * and {@code e} will not be added.
         *
         * @param e - Element to be added, must be non-null.
         */
        public void add(final E e) {
            if (e == null) {
                throw new NullPointerException("e is null.");
            }
            if (maxSize <= treeSet.size()) {
                final E firstElm = treeSet.first();
                if (comparator.compare(e, firstElm) < 1) {
                    // e is not greater than the lowest element => ignore it
                    return;
                } else {
                    treeSet.pollFirst();
                }
            }
            treeSet.add(e);
        }

        /**
         * @return Returns a sorted view of the queue as a {@link Collections#unmodifiableList(java.util.List)}
         *         unmodifiableList.
         */
        public List<E> asList() {
            return Collections.unmodifiableList(new ArrayList<E>(treeSet));
        }
    }


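Edit: it looks like using a TreeSet is not very efficient after all, because calls to first() seem to take sub-linear (logarithmic) time, whereas PriorityQueue.peek() runs in constant time. I changed the TreeSet to a PriorityQueue. The modified add() method looks like this: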

    /**
     * Adds an element to the queue. If the queue contains {@code maxSize} elements, {@code e} will
     * be compared to the lowest element in the queue using {@code comparator}.
     * If {@code e} is greater than the lowest element, that element will be removed and
     * {@code e} will be added instead. Otherwise, the queue will not be modified
     * and {@code e} will not be added.
     *
     * @param e - Element to be added, must be non-null.
     */
    public void add(final E e) {
        if (e == null) {
            throw new NullPointerException("e is null.");
        }
        if (maxSize <= priorityQueue.size()) {
            final E firstElm = priorityQueue.peek();
            if (comparator.compare(e, firstElm) < 1) {
                // e is not greater than the lowest element => ignore it
                return;
            } else {
                priorityQueue.poll();
            }
        }
        priorityQueue.add(e);
    }

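Exactly what I was looking for. However, the implementation contains a bug:

Namely, if elementsLeft > 0 and e is already contained in the TreeSet, elementsLeft is decremented but the number of elements in the TreeSet stays the same. Decrementing only when super.add(e) actually added the element fixes this: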


    } else if (elementsLeft > 0) {
        // queue isn't full => add element and decrement elementsLeft
        boolean added = super.add(e);
        if (added) {
            elementsLeft--;
        }
        return added;
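To see why the return value of add() matters here, the following minimal sketch shows how a TreeSet treats an element that compares equal to one it already holds. The class TreeSetDuplicateDemo and the helper countAdded are hypothetical names used only for illustration.

```java
import java.util.TreeSet;

// Hypothetical demo class, for illustration only.
final class TreeSetDuplicateDemo {

    /** Adds each value to the set and returns how many were actually inserted. */
    static int countAdded(TreeSet<Integer> set, int... values) {
        int added = 0;
        for (int value : values) {
            // add() returns false, and leaves the set unchanged,
            // when an equal element is already present
            if (set.add(value)) {
                added++;
            }
        }
        return added;
    }

    public static void main(String[] args) {
        TreeSet<Integer> set = new TreeSet<>();
        int added = countAdded(set, 42, 42, 7);
        System.out.println(added + " added, size = " + set.size()); // 2 added, size = 2
    }
}
```

A side effect worth keeping in mind is that any TreeSet-backed queue silently drops elements that compare equal, so if equal items must be retained, a heap-based PriorityQueue may be the better fit.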


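    import java.util.Comparator;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.locks.ReentrantLock;

    public class BoundedPQueue<E extends Comparable<E>> {

        /** Lock used for all public operations */
        private final ReentrantLock lock = new ReentrantLock();
        private final PriorityBlockingQueue<E> queue;
        private final int capacity;

        public BoundedPQueue(int capacity) {
            this.queue = new PriorityBlockingQueue<E>(capacity, new CustomComparator<E>());
            this.capacity = capacity;
        }

        /** Keeps the smallest elements: when the queue is full, the largest one is evicted. */
        public boolean offer(E e) {
            lock.lock();
            try {
                // everything that touches the queue runs inside try, so the lock is always released
                if (queue.size() >= capacity) {
                    E largest = queue.poll();
                    if (largest.compareTo(e) < 0) {
                        // the polled head is smaller than e => keep it and drop e
                        e = largest;
                    }
                }
                return queue.offer(e);
            } finally {
                lock.unlock();
            }
        }

        public E poll() {
            lock.lock();
            try {
                return queue.poll();
            } finally {
                lock.unlock();
            }
        }

        public static class CustomComparator<E extends Comparable<E>> implements Comparator<E> {
            @Override
            public int compare(E o1, E o2) {
                // reversed natural order: give me a max heap
                return o2.compareTo(o1);
            }
        }
    }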

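If you have Guava, here is one I put together. I think it is fairly complete. Let me know if I missed anything.

You can use Guava's ForwardingBlockingQueue so that you do not have to map all the other methods.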

    import com.google.common.util.concurrent.ForwardingBlockingQueue;

    import java.util.Collection;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class PriorityBlockingQueueDecorator<E> extends ForwardingBlockingQueue<E> {

        public static final class QueueFullException extends IllegalStateException {
            private static final long serialVersionUID = -9218216017510478441L;
        }

        private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

        private final int maxSize;
        private final PriorityBlockingQueue<E> delegate;

        public PriorityBlockingQueueDecorator(PriorityBlockingQueue<E> delegate) {
            this(MAX_ARRAY_SIZE, delegate);
        }

        public PriorityBlockingQueueDecorator(int maxSize, PriorityBlockingQueue<E> delegate) {
            this.maxSize = maxSize;
            this.delegate = delegate;
        }

        @Override
        protected BlockingQueue<E> delegate() {
            return delegate;
        }

        @Override
        public boolean add(E element) {
            return offer(element);
        }

        @Override
        public boolean addAll(Collection<? extends E> collection) {
            boolean modified = false;
            for (E e : collection) {
                if (add(e)) {
                    modified = true;
                }
            }
            return modified;
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return offer(e);
        }

        @Override
        public boolean offer(E o) {
            // reject the element once the limit is reached
            // (the original condition, maxSize > size(), threw while there was still room)
            if (size() >= maxSize) {
                throw new QueueFullException();
            }
            return super.offer(o);
        }
    }

Create a PriorityQueue with a size limit. It stores the N largest numbers.

    import java.util.*;

    class Demo {

        public static <E extends Comparable<E>> PriorityQueue<E> getPq(final int n, final Comparator<E> comparator) {
            return new PriorityQueue<E>(comparator) {

                boolean full() {
                    return size() >= n;
                }

                @Override
                public boolean add(E e) {
                    // PriorityQueue.add() delegates to offer(), which applies the size limit
                    return offer(e);
                }

                @Override
                public boolean offer(E e) {
                    if (!full()) {
                        return super.offer(e);
                    } else if (comparator.compare(peek(), e) < 0) {
                        // compare with the queue's own comparator so that the eviction
                        // test always agrees with the heap order
                        poll();
                        return super.offer(e);
                    }
                    return false;
                }
            };
        }

        public static void printq(PriorityQueue<?> pq) {
            Object o = null;
            while ((o = pq.poll()) != null) {
                System.out.println(o);
            }
        }

        public static void main(String[] args) {
            PriorityQueue<Integer> pq = getPq(2, new Comparator<Integer>() {
                @Override
                public int compare(Integer i1, Integer i2) {
                    return i1.compareTo(i2);
                }
            });
            pq.add(4);
            pq.add(1);
            pq.add(5);
            pq.add(2);
            printq(pq); // prints 4, then 5
        }
    }