数以百万计的3D点:如何find最接近给定点的10个点?
3-d中的点由(x,y,z)定义。 (X,Y,Z)和(x,y,z)之间的距离d是d = Sqrt [(Xx)^ 2 +(Yy)^ 2 +(Zz)^ 2]。 现在文件中有一百万个条目,每个条目都是空间中的某个点,没有特定的顺序。 给定任何点(a,b,c)find最接近的10点。 你将如何存储百万分,你将如何从这个数据结构中检索这10个点。
百万分是less数。 最直接的方法在这里工作(基于KDTree的代码较慢(仅查询一个点))。
暴力方法(时间〜1秒)
#!/usr/bin/env python import numpy NDIM = 3 # number of dimensions # read points into array a = numpy.fromfile('million_3D_points.txt', sep=' ') a.shape = a.size / NDIM, NDIM point = numpy.random.uniform(0, 100, NDIM) # choose random point print 'point:', point d = ((a-point)**2).sum(axis=1) # compute distances ndx = d.argsort() # indirect sort # print 10 nearest points to the chosen one import pprint pprint.pprint(zip(a[ndx[:10]], d[ndx[:10]])) 运行:
 $ time python nearest.py point: [ 69.06310224 2.23409409 50.41979143] [(array([ 69., 2., 50.]), 0.23500677815852947), (array([ 69., 2., 51.]), 0.39542392750839772), (array([ 69., 3., 50.]), 0.76681859086988302), (array([ 69., 3., 50.]), 0.76681859086988302), (array([ 69., 3., 51.]), 0.9272357402197513), (array([ 70., 2., 50.]), 1.1088022980015722), (array([ 70., 2., 51.]), 1.2692194473514404), (array([ 70., 2., 51.]), 1.2692194473514404), (array([ 70., 3., 51.]), 1.801031260062794), (array([ 69., 1., 51.]), 1.8636121147970444)] real 0m1.122s user 0m1.010s sys 0m0.120s 
以下是可生成百万个3D点的脚本:
 #!/usr/bin/env python import random for _ in xrange(10**6): print ' '.join(str(random.randrange(100)) for _ in range(3)) 
输出:
 $ head million_3D_points.txt 18 56 26 19 35 74 47 43 71 82 63 28 43 82 0 34 40 16 75 85 69 88 58 3 0 63 90 81 78 98 
您可以使用该代码来testing更复杂的数据结构和algorithm(例如,它们实际上是消耗更less的内存,还是比上述最简单的方法更快)。 值得注意的是,目前这是唯一包含工作代码的答案。
基于KDTree的解决scheme(时间〜1.4秒)
 #!/usr/bin/env python import numpy NDIM = 3 # number of dimensions # read points into array a = numpy.fromfile('million_3D_points.txt', sep=' ') a.shape = a.size / NDIM, NDIM point = [ 69.06310224, 2.23409409, 50.41979143] # use the same point as above print 'point:', point from scipy.spatial import KDTree # find 10 nearest points tree = KDTree(a, leafsize=a.shape[0]+1) distances, ndx = tree.query([point], k=10) # print 10 nearest points to the chosen one print a[ndx] 
运行:
 $ time python nearest_kdtree.py point: [69.063102240000006, 2.2340940900000001, 50.419791429999997] [[[ 69. 2. 50.] [ 69. 2. 51.] [ 69. 3. 50.] [ 69. 3. 50.] [ 69. 3. 51.] [ 70. 2. 50.] [ 70. 2. 51.] [ 70. 2. 51.] [ 70. 3. 51.] [ 69. 1. 51.]]] real 0m1.359s user 0m1.280s sys 0m0.080s 
部分sorting在C ++(时间约1.1秒)
 // $ g++ nearest.cc && (time ./a.out < million_3D_points.txt ) #include <algorithm> #include <iostream> #include <vector> #include <boost/lambda/lambda.hpp> // _1 #include <boost/lambda/bind.hpp> // bind() #include <boost/tuple/tuple_io.hpp> namespace { typedef double coord_t; typedef boost::tuple<coord_t,coord_t,coord_t> point_t; coord_t distance_sq(const point_t& a, const point_t& b) { // or boost::geometry::distance coord_t x = a.get<0>() - b.get<0>(); coord_t y = a.get<1>() - b.get<1>(); coord_t z = a.get<2>() - b.get<2>(); return x*x + y*y + z*z; } } int main() { using namespace std; using namespace boost::lambda; // _1, _2, bind() // read array from stdin vector<point_t> points; cin.exceptions(ios::badbit); // throw exception on bad input while(cin) { coord_t x,y,z; cin >> x >> y >> z; points.push_back(boost::make_tuple(x,y,z)); } // use point value from previous examples point_t point(69.06310224, 2.23409409, 50.41979143); cout << "point: " << point << endl; // 1.14s // find 10 nearest points using partial_sort() // Complexity: O(N)*log(m) comparisons (O(N)*log(N) worst case for the implementation) const size_t m = 10; partial_sort(points.begin(), points.begin() + m, points.end(), bind(less<coord_t>(), // compare by distance to the point bind(distance_sq, _1, point), bind(distance_sq, _2, point))); for_each(points.begin(), points.begin() + m, cout << _1 << "\n"); // 1.16s } 
运行:
 g++ -O3 nearest.cc && (time ./a.out < million_3D_points.txt ) point: (69.0631 2.23409 50.4198) (69 2 50) (69 2 51) (69 3 50) (69 3 50) (69 3 51) (70 2 50) (70 2 51) (70 2 51) (70 3 51) (69 1 51) real 0m1.152s user 0m1.140s sys 0m0.010s 
C ++中的优先级队列(时间约1.2秒)
 #include <algorithm> // make_heap #include <functional> // binary_function<> #include <iostream> #include <boost/range.hpp> // boost::begin(), boost::end() #include <boost/tr1/tuple.hpp> // get<>, tuple<>, cout << namespace { typedef double coord_t; typedef std::tr1::tuple<coord_t,coord_t,coord_t> point_t; // calculate distance (squared) between points `a` & `b` coord_t distance_sq(const point_t& a, const point_t& b) { // boost::geometry::distance() squared using std::tr1::get; coord_t x = get<0>(a) - get<0>(b); coord_t y = get<1>(a) - get<1>(b); coord_t z = get<2>(a) - get<2>(b); return x*x + y*y + z*z; } // read from input stream `in` to the point `point_out` std::istream& getpoint(std::istream& in, point_t& point_out) { using std::tr1::get; return (in >> get<0>(point_out) >> get<1>(point_out) >> get<2>(point_out)); } // Adaptable binary predicate that defines whether the first // argument is nearer than the second one to given reference point template<class T> class less_distance : public std::binary_function<T, T, bool> { const T& point; public: less_distance(const T& reference_point) : point(reference_point) {} bool operator () (const T& a, const T& b) const { return distance_sq(a, point) < distance_sq(b, point); } }; } int main() { using namespace std; // use point value from previous examples point_t point(69.06310224, 2.23409409, 50.41979143); cout << "point: " << point << endl; const size_t nneighbours = 10; // number of nearest neighbours to find point_t points[nneighbours+1]; // populate `points` for (size_t i = 0; getpoint(cin, points[i]) && i < nneighbours; ++i) ; less_distance<point_t> less_distance_point(point); make_heap (boost::begin(points), boost::end(points), less_distance_point); // Complexity: O(N*log(m)) while(getpoint(cin, points[nneighbours])) { // add points[-1] to the heap; O(log(m)) push_heap(boost::begin(points), boost::end(points), less_distance_point); // remove (move to last position) the most distant from the // `point` point; O(log(m)) pop_heap (boost::begin(points), boost::end(points), less_distance_point); } // print results push_heap (boost::begin(points), boost::end(points), less_distance_point); // O(m*log(m)) sort_heap (boost::begin(points), boost::end(points), less_distance_point); for (size_t i = 0; i < nneighbours; ++i) { cout << points[i] << ' ' << distance_sq(points[i], point) << '\n'; } } 
运行:
 $ g++ -O3 nearest.cc && (time ./a.out < million_3D_points.txt ) point: (69.0631 2.23409 50.4198) (69 2 50) 0.235007 (69 2 51) 0.395424 (69 3 50) 0.766819 (69 3 50) 0.766819 (69 3 51) 0.927236 (70 2 50) 1.1088 (70 2 51) 1.26922 (70 2 51) 1.26922 (70 3 51) 1.80103 (69 1 51) 1.86361 real 0m1.174s user 0m1.180s sys 0m0.000s 
基于线性search的方法(时间〜1.15秒)
 // $ g++ -O3 nearest.cc && (time ./a.out < million_3D_points.txt ) #include <algorithm> // sort #include <functional> // binary_function<> #include <iostream> #include <boost/foreach.hpp> #include <boost/range.hpp> // begin(), end() #include <boost/tr1/tuple.hpp> // get<>, tuple<>, cout << #define foreach BOOST_FOREACH namespace { typedef double coord_t; typedef std::tr1::tuple<coord_t,coord_t,coord_t> point_t; // calculate distance (squared) between points `a` & `b` coord_t distance_sq(const point_t& a, const point_t& b); // read from input stream `in` to the point `point_out` std::istream& getpoint(std::istream& in, point_t& point_out); // Adaptable binary predicate that defines whether the first // argument is nearer than the second one to given reference point class less_distance : public std::binary_function<point_t, point_t, bool> { const point_t& point; public: explicit less_distance(const point_t& reference_point) : point(reference_point) {} bool operator () (const point_t& a, const point_t& b) const { return distance_sq(a, point) < distance_sq(b, point); } }; } int main() { using namespace std; // use point value from previous examples point_t point(69.06310224, 2.23409409, 50.41979143); cout << "point: " << point << endl; less_distance nearer(point); const size_t nneighbours = 10; // number of nearest neighbours to find point_t points[nneighbours]; // populate `points` foreach (point_t& p, points) if (! getpoint(cin, p)) break; // Complexity: O(N*m) point_t current_point; while(cin) { getpoint(cin, current_point); //NOTE: `cin` fails after the last //point, so one can't lift it up to //the while condition // move to the last position the most distant from the // `point` point; O(m) foreach (point_t& p, points) if (nearer(current_point, p)) // found point that is nearer to the `point` //NOTE: could use insert (on sorted sequence) & break instead //of swap but in that case it might be better to use //heap-based algorithm altogether std::swap(current_point, p); } // print results; O(m*log(m)) sort(boost::begin(points), boost::end(points), nearer); foreach (point_t p, points) cout << p << ' ' << distance_sq(p, point) << '\n'; } namespace { coord_t distance_sq(const point_t& a, const point_t& b) { // boost::geometry::distance() squared using std::tr1::get; coord_t x = get<0>(a) - get<0>(b); coord_t y = get<1>(a) - get<1>(b); coord_t z = get<2>(a) - get<2>(b); return x*x + y*y + z*z; } std::istream& getpoint(std::istream& in, point_t& point_out) { using std::tr1::get; return (in >> get<0>(point_out) >> get<1>(point_out) >> get<2>(point_out)); } } 
测量显示,大部分时间都是从文件中读取数组,实际计算的时间更less。
如果一百万个条目已经在一个文件中,则不需要将它们全部加载到内存中的数据结构中。 只要保持一个arrays中的十大点,find目前为止,并扫描超过百万点,更新您的前十名单。
这是点数O(n)。
您可以将点存储在k维树 (kd-tree)中。 Kd树被优化用于最近邻居search(find最接近给定点的n个点)。
我认为这是一个棘手的问题,testing如果你不尝试过分的事情。
考虑一下上面已经给出的最简单的algorithm:保留一个十个最好的候选人的表格,并逐一浏览所有的点。 如果find比十个最好的任何一个更近的点,请将其replace。 什么是复杂性? 那么,我们必须从文件中查看每个点,计算它的距离(或实际距离的平方),并与第10个最近点进行比较。 如果更好的话,把它插入到10个最好的表中的适当位置。
那么复杂性是什么? 我们看一下每一个点,所以它是n的计算和n比较。 如果这一点更好,我们需要把它插入正确的位置,这需要更多的比较,但这是一个不变的因素,因为最佳候选人的表格是一个恒定的大小10。
我们最终得到一个以线性时间运行的algorithm,O(n)中的点数。
但现在考虑一下这种algorithm的下界是什么? 如果input数据中没有顺序,我们必须看看每个点,看它是不是最接近的一个。 所以就我所知,下界是Omega(n),所以上面的algorithm是最优的。
这不是一个功课问题,是吗? 😉
我的想法:遍历所有点,并把它们放到一个最小堆或有界优先级队列中,与目标距离。
 这个问题本质上是testing你的空间分割algorithm的知识和/或直觉。 我认为将数据存储在八叉树中是最好的select。 它常用于处理这类问题的3D引擎(存储数百万个顶点,光线追踪,查找碰撞等)。 在最坏的情况下(我相信),查找时间将按log(n)的顺序排列。 
 不需要计算距离。 距离的正方形应该满足您的需求。 我认为应该更快。 换句话说,你可以跳过sqrt位。 
直截了当的algorithm:
将点存储为元组列表,扫描点,计算距离,并保持“最接近”的列表。
更有创意:
组指向区域(例如“0,0,0”到“50,50,50”或“0,0,0”到“-20,-20,-20”所描述的立方体)可以从目标点“索引”到它们中。 检查目标所在的立方体,只search该立方体中的点。 如果该立方体中的点数less于10个,请检查“邻近”立方体,依此类推。
进一步的思考,这不是一个很好的algorithm:如果你的目标点比一个立方体的墙更接近10点,那么你将不得不search到相邻的立方体。
我会用kd-tree方法去find最近的,然后删除(或标记)最近的节点,然后重新search新的最近的节点。 冲洗并重复。
对于任意两点P1(x1,y1,z1)和P2(x2,y2,z2),如果点之间的距离为d,则以下所有条件必须为真:
 |x1 - x2| <= d |y1 - y2| <= d |z1 - z2| <= d 
在迭代整个集合的同时,保持距离最近的10个距离,但也要保持距离最近的距离。 在计算每个点的距离之前,先使用这三个条件来为自己节省很多复杂的工作。
基本上是前面两个答案的组合。 因为这些点是在一个文件中,所以不需要将它们保存在内存中。 因为您只想检查小于第10个最近点的距离,所以我会使用最大堆,而不是数组或最小堆。 对于数组,您需要将每个新计算的距离与您保存的全部10个距离进行比较。 对于一个分钟的堆,你必须执行3次比较与每个新计算的距离。 使用最大堆时,只有当新计算的距离大于根节点时才执行1次比较。
这个问题需要进一步定义。
1)关于预索引数据的algorithm的决定会有很大的变化,这取决于你是否可以将整个数据保存在内存中。
使用kdtrees和octrees,您不必将数据保存在内存中,并且性能可以从这一事实中获益,不仅因为内存占用更低,而且仅仅是因为您不必读取整个文件。
用暴力,你将不得不阅读整个文件,并重新计算你将要search的每个新点的距离。
不过,这对你来说可能并不重要。
2)另一个因素是你需要多less次search一个点。
正如JF Sebastian所说,即使在大型数据集上,暴力破解的速度也会更快,但要小心考虑到他的基准测量是从磁盘读取整个数据集(一旦kd-tree或八叉树被构build并写入某处,这是不必要的)而且他们只测量一次search。
计算它们中的每一个的距离,并在O(n)时间内做Select(1..10,n)。 这将是我猜想的天真algorithm。