如何在Python中实现一个有效的素数数字生成器?
这不是功课,我只是好奇而已。
无限是这里的关键词。
我希望用它作为primes()中的p。 我相信这是Haskell的一个内置函数。
所以,答案不能像“只是做一个筛子”那样幼稚。
首先,你不知道会消耗多less连续的素数。 那么,假设你可以一次制造100个。 你会使用相同的筛法和质数公式的频率吗?
我更喜欢非并发的方法。
感谢您阅读(和写作))!
我仍然喜欢我在这里写的东西 (与其他许多作者合作的食谱),它显示了埃拉托斯图尼的筛子没有内在的限制,我相信这些评论和讨论是非常清楚的。 这是最近讨论堆栈溢出(search作者的名字,我猜),有人提出了一个快得多(但恕我直言不太清楚)的版本;-)。
“如果我看得更远……”
食谱的erat2
function可以进一步加快(约20-25%):
erat2a
import itertools as it def erat2a( ): D = { } yield 2 for q in it.islice(it.count(3), 0, None, 2): p = D.pop(q, None) if p is None: D[q*q] = q yield q else: # old code here: # x = p + q # while x in D or not (x&1): # x += p # changed into: x = q + 2*p while x in D: x += 2*p D[x] = p
not (x&1)
检查validationx
是奇数。 然而,由于q
和p
都是奇数,所以通过加上2*p
,就可以避免一半的步骤和奇数的testing。
erat3
如果一个人不介意一些额外的erat2
, erat2
可以加快35-40%,有以下变化(NB:由于itertools.compress
函数需要Python 2.7+或Python 3+):
import itertools as it def erat3( ): D = { 9: 3, 25: 5 } yield 2 yield 3 yield 5 MASK= 1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, MODULOS= frozenset( (1, 7, 11, 13, 17, 19, 23, 29) ) for q in it.compress( it.islice(it.count(7), 0, None, 2), it.cycle(MASK)): p = D.pop(q, None) if p is None: D[q*q] = q yield q else: x = q + 2*p while x in D or (x%30) not in MODULOS: x += 2*p D[x] = p
erat3
函数利用了以下事实:所有的素数(除了erat3
除外)都只能得到8个数字:包含在MODULOS
frozenset中的数字。 因此,在产生最初的三个素数之后,我们从7开始, 只和候选人一起工作。
候选过滤使用itertools.compress
function; “魔法”是在MASK
序列中; MASK
有15个元素( itertools.islice
函数select每30个数字有15个奇数),每个候选从1
,每个候选从1
开始。循环重复itertools.cycle
函数指定的。
候选过滤的引入需要另一个修改: or (x%30) not in MODULOS
检查。 erat2
algorithm处理所有的奇数; 现在erat3
algorithm只处理r30个候选者,所以我们需要确保所有的D.keys()
只能是这样的候选者。
基准
结果
在Atom 330 Ubuntu 9.10服务器上,版本2.6.4和3.1.1+:
$ testit up to 8192 ==== python2 erat2 ==== 100 loops, best of 3: 18.6 msec per loop ==== python2 erat2a ==== 100 loops, best of 3: 14.5 msec per loop ==== python2 erat3 ==== Traceback (most recent call last): … AttributeError: 'module' object has no attribute 'compress' ==== python3 erat2 ==== 100 loops, best of 3: 19.2 msec per loop ==== python3 erat2a ==== 100 loops, best of 3: 14.1 msec per loop ==== python3 erat3 ==== 100 loops, best of 3: 11.7 msec per loop
在AMD Geode LX Gentoo家庭服务器上,Python 2.6.5和3.1.2:
$ testit up to 8192 ==== python2 erat2 ==== 10 loops, best of 3: 104 msec per loop ==== python2 erat2a ==== 10 loops, best of 3: 81 msec per loop ==== python2 erat3 ==== Traceback (most recent call last): … AttributeError: 'module' object has no attribute 'compress' ==== python3 erat2 ==== 10 loops, best of 3: 116 msec per loop ==== python3 erat2a ==== 10 loops, best of 3: 82 msec per loop ==== python3 erat3 ==== 10 loops, best of 3: 66 msec per loop
基准代码
primegen.py
模块包含erat2
, erat2a
和erat3
函数。 以下是testing脚本:
#!/bin/sh max_num=${1:-8192} echo up to $max_num for python_version in python2 python3 do for function in erat2 erat2a erat3 do echo "==== $python_version $function ====" $python_version -O -m timeit -c \ -s "import itertools as it, functools as ft, operator as op, primegen; cmp= ft.partial(op.ge, $max_num)" \ "next(it.dropwhile(cmp, primegen.$function()))" done done
由于OP要求高效的实现,下面是David Eppstein / Alex Martelli在2002年的活跃状态代码中的一个重大改进(在这里可以看到他的答案 ): 不要在字典中logging素数的信息,直到它的正方形在候选人 。 对于所产生的n个素数( π(sqrt(n log n)) 〜2 sqrt(n log n)/ log(n log n) 〜2,将空间复杂度降低到低于O(sqrt(n))而不是O(n) 2 sqrt(n / log n) )。 因此,时间复杂性也得到改善,即运行速度更快 。
创build一个“滑动筛”作为每个基本素数的当前倍数(即,低于当前生产点的sqrt)的字典,以及它们的步长值:
from itertools import count # ideone.com/aVndFM def postponed_sieve(): # postponed sieve, by Will Ness yield 2; yield 3; yield 5; yield 7; # original code David Eppstein, sieve = {} # Alex Martelli, ActiveState Recipe 2002 ps = postponed_sieve() # a separate base Primes Supply: p = next(ps) and next(ps) # (3) a Prime to add to dict q = p*p # (9) its sQuare for c in count(9,2): # the Candidate if c in sieve: # c's a multiple of some base prime s = sieve.pop(c) # ie a composite ; or elif c < q: yield c # a prime continue else: # (c==q): # or the next base prime's square: s=count(q+2*p,2*p) # (9+6, by 6 : 15,21,27,33,...) p=next(ps) # (5) q=p*p # (25) for m in s: # the next multiple if m not in sieve: # no duplicates break sieve[m] = s # original test entry: ideone.com/WFv4f
(这里的旧的,原始的代码被编辑成包含Tim Peters在下面的答案中看到的变化)。 有关相关讨论,也请参阅本文 。
类似的2-3-5-7基于轮子的代码运行〜2.15倍更快 (这非常接近3/2 * 5/4 * 7/6 = 2.1875
的理论改进)。
对于后人来说,这里是Will Ness对Python 3美丽algorithm的重写。需要做一些改变(迭代器不再有.next()
方法,但是有一个新的next()
内build函数)。 其他的改变是为了好玩(使用yield from <iterable>
的新的yield from <iterable>
replace原来的四个yield
语句,更多的是为了可读性(我不是过度使用的粉丝;-)单字母variables名)。
它比原来的要快得多,但不是出于algorithm的原因。 加速主要是由于删除原来的add()
函数,而不是内联。
def psieve(): import itertools yield from (2, 3, 5, 7) D = {} ps = psieve() next(ps) p = next(ps) assert p == 3 psq = p*p for i in itertools.count(9, 2): if i in D: # composite step = D.pop(i) elif i < psq: # prime yield i continue else: # composite, = p*p assert i == psq step = 2*p p = next(ps) psq = p*p i += step while i in D: i += step D[i] = step
这不是我的代码,但是,这是值得发布。 原文可以在这里find: http : //code.activestate.com/recipes/117119/
def gen_primes(): D = {} q = 2 # first integer to test for primality. while True: if q not in D: # not marked composite, must be prime yield q #first multiple of q not already marked D[q * q] = [q] else: for p in D[q]: D.setdefault(p + q, []).append(p) # no longer need D[q], free memory del D[q] q += 1
这是一个发电机,所以使用它像任何其他。
primes = gen_primes() for p in primes: print p
我的桌面上需要1.62秒的时间才能生成一百万个素数。
做一个分段筛,其中段的大小由可用内存或bitset的最大大小决定。
对于每个分段表示一些区间[n; n + segment_size)作为一个位集,并筛选所有素数低于上限的平方根。
使用比特集比使用哈希表或树数据结构的内存less,因为您正在使用密集的数字集。
这是相似的,但比erat2a快大约5%。
import itertools as it def erat2b( ): D = { } yield 2 for q in it.islice(it.count(3), 0, None, 2): p = D.pop(q, None) if p is None: D[q*q] = q yield q else: # Only this part is replaced. q += p while q in D: q += p D[q] = p
还有一个答案,比我的erat3
答案更有记忆效率:
import heapq def heapprimegen(): hp= [] yield 2 yield 3 cn= 3 nn, inc= 3, 6 while 1: while cn < nn: yield cn heapq.heappush(hp, (3*cn, 2*cn)) cn+= 2 cn= nn+2 nn, inc= heapq.heappushpop(hp, (nn+inc, inc))
它维护一个主要倍数堆(列表)而不是字典。 它显然失去了一些速度。
这是一个复杂的基于堆的实现,它不比其他基于堆的实现速度快(请参阅我的另一个答案中的速度比较),但它使用的内存less得多。
这个实现使用了两个堆(tu和wv),它们包含相同的数字元素。 每个元素是一个int对。 为了find所有素数达到q**2
(其中q
是一个素数),每个堆将包含至多2*pi(q-1)
元素,其中pi(x)
是不大于正数的素数x
。 所以整数的总数至多是4*pi(floor(sqrt(n)))
。 (我们可以通过把一半的内容推到堆上来获得内存中的2倍,但这会使algorithm变慢。)
其他基于字典和堆的方法(例如,erat2b和heap_prime_gen_squares和heapprimegen)存储大约2 * pi(n)个整数,因为每当他们find一个整数时,它们就会扩展它们的堆或字典。 作为比较:find1_000_000素数,这个实现存储less于4141个整数,其他实现存储超过1_000_000个整数。
import heapq def heap_prime_gen_smallmem(): yield 2 yield 3 f = 5 fmar3 = 2 q = 7 q6 = 7 * 6 qmar3 = 4 tu = [(25, 30), (35, 30)] vw = [(25, 30), (35, 30)] while True: qmar3 += 2 if qmar3 == 6: qb = q + 4 q6b = q6 + 24 qmar3 = 2 else: qb = q + 2 q6b = q6 + 12 if q < tu[0][0]: d = q * q while f < d: a, b = vw[0] if f < a: yield f else: a, b = vw[0] heapq.heapreplace(vw, (a + b, b)) a, b = vw[0] while f >= a: heapq.heapreplace(vw, (a + b, b)) a, b = vw[0] fmar3 += 2 if fmar3 == 6: f += 4 fmar3 = 2 else: f += 2 c = q * qb heapq.heappush(tu, (d, q6)) heapq.heappush(tu, (c, q6)) heapq.heappush(vw, (d, q6)) heapq.heappush(vw, (c, q6)) else: a, b = tu[0] heapq.heapreplace(tu, (a + b, b)) a, b = tu[0] while q >= a: heapq.heapreplace(tu, (a + b, b)) a, b = tu[0] q = qb q6 = q6b
下面是一个对Haskell如何完成的生成器:对已知素数的组合进行过滤,然后将剩余的素数添加到列表中。
def gen_primes(): primes = [] i = 2 while True: prime = True for p in primes: if not (i % p): prime = False break if prime: yield i primes.append(i) i += 1
我之前写了一篇关于无限素数生成器的文章:
http://stacktrace.it/2008/01/progetto-eulero-problema-3/
这是意大利语,但你可能有一个讨厌的翻译使用谷歌: http : //tinyurl.com/yzpyeom
另一种方法来做到这一点:
import itertools def primeseq(): prime = [2] num = 0 yield 2 for i in itertools.count(3, 2): is_prime = True for num in prime: if i % num == 0: is_prime = False break elif num ** 2 > i: break if is_prime: prime.append(i) yield i
这是一个简单但不是非常慢的使用堆而不是字典:
import heapq def heap_prime_gen_squares(): yield 2 yield 3 h = [(9, 6)] n = 5 while True: a, b = h[0] while n < a: yield n heapq.heappush(h, (n * n, n << 1)) n += 2 heapq.heapreplace(h, (a + b, b)) # Replace h[0], which is still (a, b).
我对前100万个素数的用户时间的速度测量(较小的数字更好):
- postponed_sieve(基于字典):8.553s
- erat2b(以字典为基础):9.513s
- erat2a(以字典为基础):10.313s
- heap_prime_gen_smallmem(基于堆):23.935s
- heap_prime_gen_squares(基于堆):27.302s
- heapprimegen(字典为基础):145.029s
所以基于字典的方法似乎是最快的。
这是一个非常快速的无限生成器,用Python2编写,但很容易调整到Python3。 要使用它来添加最多10 ** 9的素数,请使用以下内容:
from itertools import takewhile from functools import partial from operator import gt print (sum(takewhile(partial(gt, 10**9), prime_gen_inf())))
这是一个分段筛,比Will Ness的algorithm更快,但明显不那么优雅。
from operator import mul from functools import reduce def prod(x): return reduce(mul, x, 1) def build_sieve(wheel): w = prod(wheel) w_phi = prod([p-1 for p in wheel]) rems = [a for a in range(w) if all(a % p for p in wheel)] assert len(rems) == w_phi inv = {a:pow(a, w_phi - 1, w) for a in rems} try: known_p = wheel + rems[1 : rems.index(rems[1]*rems[1])] except ValueError: known_p = wheel + rems[1:] return wheel, w, w_phi, rems, inv, known_p #Adjust the chunk variable based on your computer's architecture. # #Adjust the line with #! if you don't need "true" infinite. If you don't need #primes larger than 1<<32, use array('H', []), if 1<<64 use 'L', if 1<<128 (in #Python3) use 'Q', otherwise use empty list []. #To save memory, comment out the lines with #*, and uncomment the commented-out #lines import itertools from itertools import islice, count, compress, izip chain_f = itertools.chain.from_iterable from array import array def prime_gen_inf(chunk=250000, sieve_info = build_sieve([2,3,5,7])): """ Indefinitely yields primes """ wheel, w, w_phi, rems, inv, known_p = sieve_info for p in known_p: yield p new_n = 0; while True: size = min(chunk, (p * p - new_n) / w) sieve = bytearray([1]) * size * w_phi n, new_n = new_n, new_n + size * w if not n: zero = bytearray([0]) seen = len(known_p) - len(wheel) + 1 sieve[:seen:1] = zero * seen p_gen = islice(prime_gen_inf(), len(wheel), None) new_p = next(p_gen) ps = [] #! array('H', []) p_invs = bytearray([]) #* while new_p * new_p < new_n: ps.append(new_p) p_invs.append(inv[new_p % w]) #* new_p = next(p_gen) for p, p_inv, modp in izip(ps, p_invs, [-n % p for p in ps]): #* s = [(modp + p * (p_inv * (r - modp) % w)) / w for r in rems] #* #for p in ps: # s = [(-n%p + p * (inv[p%w] * (r - -n%p) % w)) / w for r in rems] for i, start in enumerate(s): slice_size = ((size - start - 1) / p + 1) sieve[i + start * w_phi :: p * w_phi] = zero * slice_size for p in compress(chain_f(izip(*[count(n+r, w) for r in rems])), sieve): yield p