Select k random elements from a list whose elements have weights

Selecting without any weights (with equal probabilities) is beautifully described here.
I wondered if there is a way to convert this approach to a weighted one.
I am also interested in other approaches as well.
Update: Sampling without replacement
I know this is a very old question, but I think there's a neat trick to do this in O(n) time if you apply a little math!
The exponential distribution has two very useful properties.

- Given n samples from different exponential distributions with different rate parameters, the probability that a given sample is the minimum equals its rate parameter divided by the sum of all the rate parameters.

- It is memoryless. So if you already know the minimum, then the probability that any of the remaining elements is the 2nd-to-minimum is the same as the probability that, had the true minimum been removed (and never generated), that element would have been the new minimum. This seems obvious, but I think because of some conditional probability issues it may not be true of other distributions.
Using fact 1, we know that choosing a single element can be done by generating one exponential distribution sample per element, with rate parameter equal to that element's weight, and then choosing the one with the minimum value.

Using fact 2, we know that we don't have to re-generate the exponential samples. Instead, just generate one for each element, and take the k elements with the lowest samples.

Finding the lowest k can be done in O(n): use the Quickselect algorithm to find the k-th element, then simply take another pass over all elements and output everything lower than the k-th.

A useful note: if you don't have immediate access to a library for generating exponential distribution samples, one can easily be produced by: -ln(rand())/weight
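Not part of the original answer, but here is a minimal Python sketch of the recipe above. The function name weighted_sample_k is my own, and heapq.nsmallest stands in for an explicit Quickselect (O(n log k) rather than O(n), but it keeps the sketch short):

    import heapq
    import math
    import random

    def weighted_sample_k(items, k):
        # items: sequence of (weight, value) pairs with weights > 0.
        # Key each element with an exponential sample of rate = weight;
        # 1 - random() avoids taking log(0).
        keyed = [(-math.log(1.0 - random.random()) / w, v) for w, v in items]
        # The k smallest keys mark the selected elements.
        return [v for _, v in heapq.nsmallest(k, keyed)]

    print(weighted_sample_k([(10, "low"), (100, "mid"), (890, "large")], 2))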
If the sampling is with replacement, you can use this algorithm (here implemented in Python):
    import random

    items = [(10, "low"),
             (100, "mid"),
             (890, "large")]

    def weighted_sample(items, n):
        total = float(sum(w for w, v in items))
        i = 0
        w, v = items[0]
        while n:
            x = total * (1 - random.random() ** (1.0 / n))
            total -= x
            while x > w:
                x -= w
                i += 1
                w, v = items[i]
            w -= x
            yield v
            n -= 1
This is O(n + m), where m is the number of items.
Why does this work? It's based on the following algorithm:
    def n_random_numbers_decreasing(v, n):
        """Like reversed(sorted(v * random() for i in range(n))),
        but faster because we avoid sorting."""
        while n:
            v *= random.random() ** (1.0 / n)
            yield v
            n -= 1
The function weighted_sample is just this algorithm fused with a walk of the items list to pick out the items selected by those random numbers.

This in turn works because the probability that n random numbers 0..v will all happen to be less than z is P = (z/v)^n. Solve for z, and you get z = v * P^(1/n). Substituting a random number for P picks the largest number with the correct distribution; and we can just repeat the process to select all the other numbers.
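A quick empirical check of that identity (my own sketch, not part of the answer): the maximum of n uniform draws on [0, v] and v * P^(1/n) with P uniform should have the same distribution.

    import random

    n, v, trials = 5, 1.0, 200000
    direct = sorted(max(random.random() * v for _ in range(n)) for _ in range(trials))
    inverse = sorted(v * random.random() ** (1.0 / n) for _ in range(trials))
    # the two empirical medians should agree closely:
    print(direct[trials // 2], inverse[trials // 2])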
If the sampling is without replacement, you can put all the items into a binary heap, where each node caches the total of the weights of all items in that subheap. Building the heap is O(m). Selecting a random item from the heap, respecting the weights, is O(log m). Removing that item and updating the cached totals is also O(log m). So you can pick n items in O(m + n log m) time.

(Note: "weight" here means that every time an element is selected, the remaining possibilities are chosen with probability proportional to their weights. It does not mean that elements appear in the output with a likelihood proportional to their weights.)

Here's an implementation of that, plentifully commented:
    import random

    class Node:
        # Each node in the heap has a weight, value, and total weight.
        # The total weight, self.tw, is self.w plus the weight of any children.
        __slots__ = ['w', 'v', 'tw']
        def __init__(self, w, v, tw):
            self.w, self.v, self.tw = w, v, tw

    def rws_heap(items):
        # h is the heap. It's like a binary tree that lives in an array.
        # It has a Node for each pair in `items`. h[1] is the root. Each
        # other Node h[i] has a parent at h[i>>1]. Each node has up to 2
        # children, h[i<<1] and h[(i<<1)+1]. To get this nice simple
        # arithmetic, we have to leave h[0] vacant.
        h = [None]                          # leave h[0] vacant
        for w, v in items:
            h.append(Node(w, v, w))
        for i in range(len(h) - 1, 1, -1):  # total up the tws
            h[i>>1].tw += h[i].tw           # add h[i]'s total to its parent
        return h

    def rws_heap_pop(h):
        gas = h[1].tw * random.random()     # start with a random amount of gas

        i = 1                  # start driving at the root
        while gas >= h[i].w:   # while we have enough gas to get past node i:
            gas -= h[i].w      #   drive past node i
            i <<= 1            #   move to first child
            if gas >= h[i].tw: #   if we have enough gas:
                gas -= h[i].tw #     drive past first child and descendants
                i += 1         #     move to second child
        w = h[i].w             # out of gas! h[i] is the selected node.
        v = h[i].v

        h[i].w = 0             # make sure this node isn't chosen again
        while i:               # fix up total weights
            h[i].tw -= w
            i >>= 1
        return v

    def random_weighted_sample_no_replacement(items, n):
        heap = rws_heap(items)       # just make a heap...
        for i in range(n):
            yield rws_heap_pop(heap) # and pop n items off it.
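For example (a usage sketch of my own, reusing the items list from earlier):

    items = [(10, "low"), (100, "mid"), (890, "large")]
    print(list(random_weighted_sample_no_replacement(items, 2)))
    # "large" shows up in nearly every sample; "low" only rarely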
If the sampling is with replacement, use the roulette-wheel selection technique (often used in genetic algorithms); see the sketch after this list:

- Sort the weights
- Compute the cumulative weights
- Pick a random number in [0,1] * totalWeight
- Find the interval this number falls into
- Select the element with the corresponding interval
- Repeat k times
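A minimal Python sketch of those steps (my own; bisect does the interval lookup in the cumulative weights):

    import bisect
    import itertools
    import random

    def roulette_wheel(items, weights, k):
        # draw k items with replacement, proportional to weights
        cumulative = list(itertools.accumulate(weights))
        total = cumulative[-1]
        picks = []
        for _ in range(k):
            r = random.uniform(0, total)  # random number in [0, totalWeight]
            picks.append(items[bisect.bisect_left(cumulative, r)])
        return picks

    print(roulette_wheel(["a", "b", "c"], [1, 2, 7], 5))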
If the sampling is without replacement, you can adapt the above technique by removing the selected element from the list after each iteration, then re-normalizing the weights so that they sum to 1 (a valid probability distribution function).
I have implemented this in Ruby:
https://github.com/fl00r/pickup
    require 'pickup'

    pond = {
      "selmon"   => 1,
      "carp"     => 4,
      "crucian"  => 3,
      "herring"  => 6,
      "sturgeon" => 8,
      "gudgeon"  => 10,
      "minnow"   => 20,
    }
    pickup = Pickup.new(pond, uniq: true)
    pickup.pick(3) #=> [ "gudgeon", "herring", "minnow" ]
    pickup.pick    #=> "herring"
    pickup.pick    #=> "gudgeon"
    pickup.pick    #=> "sturgeon"
If you want to generate large arrays of random integers with replacement, you can use piecewise linear interpolation. For example, using NumPy/SciPy:
    import numpy
    import scipy.interpolate

    def weighted_randint(weights, size=None):
        """Given an n-element vector of weights, randomly sample
        integers up to n with probabilities proportional to weights"""
        n = weights.size
        # normalize so that the weights sum to unity
        weights = weights / numpy.linalg.norm(weights, 1)
        # cumulative sum of weights
        cumulative_weights = weights.cumsum()
        # piecewise-linear interpolating function whose domain is
        # the unit interval and whose range is the integers up to n
        f = scipy.interpolate.interp1d(
                numpy.hstack((0.0, cumulative_weights)),
                numpy.arange(n + 1), kind='linear')
        return f(numpy.random.random(size=size)).astype(int)
This is not efficient if you want to sample without replacement.
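A usage sketch (my own): draw many indices at once and compare the empirical frequencies with the weights.

    weights = numpy.array([1.0, 2.0, 7.0])
    draws = weighted_randint(weights, size=100000)
    print(numpy.bincount(draws) / draws.size)  # roughly [0.1, 0.2, 0.7]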
Here's a Go implementation from geodns:
    package foo

    import (
        "log"
        "math/rand"
    )

    type server struct {
        Weight int
        data   interface{}
    }

    func foo(servers []server) []server {
        // servers list is already sorted by the Weight attribute

        // number of items to pick
        max := 4

        result := make([]server, max)

        sum := 0
        for _, r := range servers {
            sum += r.Weight
        }

        for si := 0; si < max; si++ {
            n := rand.Intn(sum + 1)
            s := 0

            for i := range servers {
                s += int(servers[i].Weight)
                if s >= n {
                    log.Println("Picked record", i, servers[i])
                    sum -= servers[i].Weight
                    result[si] = servers[i]

                    // remove the server from the list
                    servers = append(servers[:i], servers[i+1:]...)
                    break
                }
            }
        }

        return result
    }
If you want to pick x elements from a weighted set without replacement, such that elements are chosen with probability proportional to their weights:
    import random

    def weighted_choose_subset(weighted_set, count):
        """Return a random sample of count elements from a weighted set.

        weighted_set should be a sequence of tuples of the form
        (item, weight), for example:  [('a', 1), ('b', 2), ('c', 3)]

        Each element from weighted_set shows up at most once in the result,
        and the relative likelihood of two particular elements showing up
        is equal to the ratio of their weights.

        This works as follows:

        1.) Line up the items along the number line from [0, the sum of
        all weights) such that each item occupies a segment of length
        equal to its weight.

        2.) Randomly pick a number "start" in the range [0, total
        weight / count).

        3.) Find all the points "start + n/count" (for all integers n
        such that the point is within our segments) and yield the set
        containing the items marked by those points.

        Note that this implementation may not return each possible subset.
        For example, with the input ([('a': 1), ('b': 1), ('c': 1), ('d': 1)], 2),
        it may only produce the sets ['a', 'c'] and ['b', 'd'], but it will
        do so such that the weights are respected.

        This implementation only works for nonnegative integral weights.
        The highest weight in the input set must be less than the total
        weight divided by the count; otherwise it would be impossible to
        respect the weights while never returning that element more than
        once per invocation.
        """
        if count == 0:
            return []

        total_weight = 0
        max_weight = 0
        borders = []
        for item, weight in weighted_set:
            if weight < 0:
                raise RuntimeError("All weights must be positive integers")
            # Scale up weights so dividing total_weight / count doesn't truncate:
            weight *= count
            total_weight += weight
            borders.append(total_weight)
            max_weight = max(max_weight, weight)

        step = int(total_weight / count)

        if max_weight > step:
            raise RuntimeError(
                "Each weight must be less than total weight / count")

        next_stop = random.randint(0, step - 1)

        results = []
        current = 0
        for i in range(count):
            while borders[current] <= next_stop:
                current += 1
            results.append(weighted_set[current][0])
            next_stop += step

        return results
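A quick usage example (my own):

    print(weighted_choose_subset([('a', 1), ('b', 2), ('c', 3), ('d', 4)], 2))
    # e.g. ['b', 'd']; across many calls 'd' appears four times as often as 'a'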
In the question you linked to, Kyle's solution would work with a trivial generalization. Scan the list and sum the total weights. Then the probability to choose an element should be:

1 - (1 - (#needed / (weight left))) / (weight at n). After visiting a node, subtract its weight from the total. Also, if you need n and have n left, you have to stop explicitly.

You can check that with everything having weight 1, this simplifies to Kyle's solution.

Edit: (had to rethink what twice as likely might mean)
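A literal Python transcription of that rule (my own sketch; it assumes integer weights >= 1, and I have not verified that it always returns exactly k items when the weights are non-uniform):

    import random

    def kyle_generalized(items, k):
        # items: list of (value, weight) pairs; one scan of the list
        weight_left = sum(w for _, w in items)
        picked = []
        for value, w in items:
            if len(picked) == k:   # stop explicitly once k items are picked
                break
            needed = k - len(picked)
            p = 1 - (1 - needed / weight_left) / w
            if random.random() < p:
                picked.append(value)
            weight_left -= w       # subtract the visited node's weight
        return picked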
This one does it in exactly O(n) with no excess memory usage. I believe this is a clever and efficient solution that is easy to port to any language. The first two lines are just there to populate sample data in Drupal.
    function getNrandomGuysWithWeight($numitems){
      $q = db_query('SELECT id, weight FROM theTableWithTheData');
      $q = $q->fetchAll();

      $accum = 0;
      foreach($q as $r){
        $accum += $r->weight;
        $r->weight = $accum;
      }

      $out = array();

      while(count($out) < $numitems && count($q)){
        $n = rand(0,$accum);
        $lessaccum = NULL;
        $prevaccum = 0;
        $idxrm = 0;
        foreach($q as $i=>$r){
          if(($lessaccum == NULL) && ($n <= $r->weight)){
            $out[] = $r->id;
            $lessaccum = $r->weight - $prevaccum;
            $accum -= $lessaccum;
            $idxrm = $i;
          }else if($lessaccum){
            $r->weight -= $lessaccum;
          }
          $prevaccum = $r->weight;
        }
        unset($q[$idxrm]);
      }
      return $out;
    }
Here I give a simple solution for picking 1 item; you can easily extend it for k items (Java style):
    // items is assumed to be an array of objects whose getValue() returns the
    // item's weight, with the weights normalized so that they sum to 1.
    double random = Math.random();
    double sum = 0;
    Item selected = null;  // Item is a stand-in type for this sketch
    for (int i = 0; i < items.length; i++) {
        Item val = items[i];
        sum += val.getValue();
        if (sum > random) {
            selected = val;
            break;
        }
    }
I have implemented an algorithm similar to Jason Orendorff's idea in Rust here. My version additionally supports bulk operations: inserting and removing (when you want to remove a bunch of items given by their ids, not through the weighted selection path) from the data structure in O(m + log n) time, where m is the number of items to remove and n the number of items in storage.
I used an associative map of (weight, object) pairs. For example:

    { (10, "low"), (100, "mid"), (10000, "large") }   total = 10110

Pick a random number between 0 and 'total', and iterate over the keys until this number fits in the given range.
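A minimal Python sketch of that walk (my own; it picks one object with probability proportional to its weight):

    import random

    pairs = [(10, "low"), (100, "mid"), (10000, "large")]
    total = sum(w for w, _ in pairs)   # 10110

    r = random.uniform(0, total)       # random number between 0 and 'total'
    running = 0
    for w, obj in pairs:
        running += w
        if r <= running:               # the number falls in this key's range
            print(obj)
            break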