parsing一个TB的文本,并有效地计算每个单词的出现次数
最近我遇到了一个面试问题,以任何语言创buildalgorithm,应该做以下几点
- 阅读1 TB的内容
- 对该内容中的每个重复单词进行计数
- 列出十个最常出现的单词
你能让我知道最好的方法来创build一个algorithm吗?
编辑:
好的,假设内容是英文的。 我们如何才能find那个内容中最常出现的前10个单词? 我的另一个疑问是,如果故意给予唯一的数据,那么我们的缓冲区将会随着堆栈大小溢出而过期。 我们也需要处理。
面试答案
这个任务很有趣,不需要太复杂,所以很好的开始一个很好的技术讨论。 我计划解决这个任务是:
- 用白色空格和标点符号分隔input数据
- 将发现的每个单词都join到Trie结构中,计数器更新为代表单词最后一个字母的节点
- 遍历完整填充的树来查找计数最高的节点
在采访的背景下,我将通过在纸板或纸上画树来展示Trie的想法。 从空开始,然后根据至less包含一个重复单词的单个句子构build树。 说“猫可以抓老鼠” 。 最后显示如何遍历树,以find最高的计数。 然后,我将certificate这棵树如何提供良好的内存使用,良好的单词查找速度(特别是在自然语言中,许多单词来自彼此),并且适合于并行处理。
在棋盘上画画
演示
下面的C#程序在一个4核心的Xeon W3520上通过75GB的2GB文本,最多只有8个线程。 性能大约每秒4.3百万字 ,但input分析代码不够理想。 用Trie结构存储单词时,内存不是处理自然语言input时的问题。
笔记:
- 从古腾堡项目中获得testing文本
- inputparsing代码假定换行符并且非常不理想
- 去除标点符号和其他非单词不是很好
- 处理一个较大的文件而不是几个较小的文件需要less量的代码来开始读取文件中指定偏移量之间的线程。
using System; using System.Collections.Generic; using System.Collections.Concurrent; using System.IO; using System.Threading; namespace WordCount { class MainClass { public static void Main(string[] args) { Console.WriteLine("Counting words..."); DateTime start_at = DateTime.Now; TrieNode root = new TrieNode(null, '?'); Dictionary<DataReader, Thread> readers = new Dictionary<DataReader, Thread>(); if (args.Length == 0) { args = new string[] { "war-and-peace.txt", "ulysees.txt", "les-miserables.txt", "the-republic.txt", "war-and-peace.txt", "ulysees.txt", "les-miserables.txt", "the-republic.txt" }; } if (args.Length > 0) { foreach (string path in args) { DataReader new_reader = new DataReader(path, ref root); Thread new_thread = new Thread(new_reader.ThreadRun); readers.Add(new_reader, new_thread); new_thread.Start(); } } foreach (Thread t in readers.Values) t.Join(); DateTime stop_at = DateTime.Now; Console.WriteLine("Input data processed in {0} secs", new TimeSpan(stop_at.Ticks - start_at.Ticks).TotalSeconds); Console.WriteLine(); Console.WriteLine("Most commonly found words:"); List<TrieNode> top10_nodes = new List<TrieNode> { root, root, root, root, root, root, root, root, root, root }; int distinct_word_count = 0; int total_word_count = 0; root.GetTopCounts(ref top10_nodes, ref distinct_word_count, ref total_word_count); top10_nodes.Reverse(); foreach (TrieNode node in top10_nodes) { Console.WriteLine("{0} - {1} times", node.ToString(), node.m_word_count); } Console.WriteLine(); Console.WriteLine("{0} words counted", total_word_count); Console.WriteLine("{0} distinct words found", distinct_word_count); Console.WriteLine(); Console.WriteLine("done."); } } #region Input data reader public class DataReader { static int LOOP_COUNT = 1; private TrieNode m_root; private string m_path; public DataReader(string path, ref TrieNode root) { m_root = root; m_path = path; } public void ThreadRun() { for (int i = 0; i < LOOP_COUNT; i++) // fake large data set buy parsing smaller file multiple times { using (FileStream fstream = new FileStream(m_path, FileMode.Open, FileAccess.Read)) { using (StreamReader sreader = new StreamReader(fstream)) { string line; while ((line = sreader.ReadLine()) != null) { string[] chunks = line.Split(null); foreach (string chunk in chunks) { m_root.AddWord(chunk.Trim()); } } } } } } } #endregion #region TRIE implementation public class TrieNode : IComparable<TrieNode> { private char m_char; public int m_word_count; private TrieNode m_parent = null; private ConcurrentDictionary<char, TrieNode> m_children = null; public TrieNode(TrieNode parent, char c) { m_char = c; m_word_count = 0; m_parent = parent; m_children = new ConcurrentDictionary<char, TrieNode>(); } public void AddWord(string word, int index = 0) { if (index < word.Length) { char key = word[index]; if (char.IsLetter(key)) // should do that during parsing but we're just playing here! right? { if (!m_children.ContainsKey(key)) { m_children.TryAdd(key, new TrieNode(this, key)); } m_children[key].AddWord(word, index + 1); } else { // not a letter! retry with next char AddWord(word, index + 1); } } else { if (m_parent != null) // empty words should never be counted { lock (this) { m_word_count++; } } } } public int GetCount(string word, int index = 0) { if (index < word.Length) { char key = word[index]; if (!m_children.ContainsKey(key)) { return -1; } return m_children[key].GetCount(word, index + 1); } else { return m_word_count; } } public void GetTopCounts(ref List<TrieNode> most_counted, ref int distinct_word_count, ref int total_word_count) { if (m_word_count > 0) { distinct_word_count++; total_word_count += m_word_count; } if (m_word_count > most_counted[0].m_word_count) { most_counted[0] = this; most_counted.Sort(); } foreach (char key in m_children.Keys) { m_children[key].GetTopCounts(ref most_counted, ref distinct_word_count, ref total_word_count); } } public override string ToString() { if (m_parent == null) return ""; else return m_parent.ToString() + m_char; } public int CompareTo(TrieNode other) { return this.m_word_count.CompareTo(other.m_word_count); } } #endregion }
这里的输出是从8个线程处理相同的20MB文本100次。
Counting words... Input data processed in 75.2879952 secs Most commonly found words: the - 19364400 times of - 10629600 times and - 10057400 times to - 8121200 times a - 6673600 times in - 5539000 times he - 4113600 times that - 3998000 times was - 3715400 times his - 3623200 times 323618000 words counted 60896 distinct words found
这里的很多东西取决于一些没有被指定的东西。 例如,我们是否试图这样做,或者我们是否正在试图build立一个定期和持续的系统? 我们是否有任何控制input? 我们处理的是全部使用单一语言(例如英语)的文本还是多种语言(如果有的话,还有多less)?
这些事情是因为:
- 如果数据在单个硬盘驱动器上启动,那么并行计数(例如map-reduce)不会有什么实际意义 – 瓶颈将是磁盘的传输速度。 复制到更多的磁盘,所以我们可以更快地计数将比直接从一个磁盘直接计数慢。
- 如果我们正在devise一个系统来定期这样做,我们的重点大部分都是在硬件上 – 特别是,有大量的磁盘并行来增加我们的带宽,并且至less要更紧密地跟上中央处理器。
- 不pipe你阅读的文本有多less,你需要处理的离散词汇的数量是有限的 – 无论你有一个TB的甚至是PB级别的英文文本,你都不会看到像数十亿英语中的不同单词。 做一个快速检查,牛津英语词典列出大约60万字英文。
- 尽pipe语言之间的实际词语明显不同,但每种语言的词语数量大致是不变的,所以我们构build的地图的大小在很大程度上取决于所代表的语言的数量。
这大多留下了可以表示多less种语言的问题。 目前,让我们假设最坏的情况。 ISO 639-2拥有485种人类语言的编码。 我们假设每种语言的平均字数为700,000个字,平均字长为每个字10个字节的UTF-8。
只要存储为简单的线性列表,就意味着我们可以将地球上每种语言的每一个字都存储起来,同时还有一个小于6千兆字节的8字节的频率计数。 如果我们用Patricia trie这样的东西来代替,那么我们或许可以计划至less在某种程度上缩小 – 很可能达到3千兆字节或更less,尽pipe我对所有这些语言都不太了解。
现在,现实情况是,我们几乎可以肯定地高估了那里的一些地方的数字 – 有相当一部分语言分享了相当多的词汇,很多(特别是较老的)语言的词语可能比英语less,并且通过名单,看起来像是包含了一些可能根本没有书面forms。
简介:几乎所有新的桌面/服务器都有足够的内存来完全保存内存 – 而更多的数据不会改变这一点。 对于一个(或几个)并行磁盘,无论如何我们都会受到I / O限制,所以平行计数(或者这样)可能会造成净损失。 在进行其他优化之前,我们可能需要并行数十个磁盘。
你可以尝试一个map-reduce方法来完成这个任务。 map-reduce的优点是可扩展性,所以即使对于1TB,或者10TB或1PB,同样的方法也可以工作,为了修改新规模的algorithm,您不需要做很多工作。 该框架还将负责在群集中的所有机器(和核心)之间分配工作。
首先 – 创build(word,occurances)
对。
这个伪代码就是这样的:
map(document): for each word w: EmitIntermediate(w,"1") reduce(word,list<val>): Emit(word,size(list))
其次,您可以轻松地在top对中findtopK最高的那些, 这个线程解释了这个概念。 主要思想是保留一堆最重要的K元素,同时迭代 – 确保堆总是包含迄今为止所看到的前K个元素。 当你完成 – 堆包含顶部K元素。
更具扩展性(如果机器很less,速度会更慢)另一种方法是使用map-reducesortingfunction,并根据发生的情况对数据进行sorting,只要grep top K.
这件事要注意三件事。
具体来说:文件大到容纳在内存中,单词列表(可能)太大,不能容纳在内存中,对于32位整数,单词计数可能太大。
一旦你通过这些警告,应该是直截了当的。 游戏正在pipe理可能较大的单词列表。
如果它更容易(保持头部转动)。
“你正在运行一个Z-80 8位机,有65K的RAM,有一个1MB的文件…”
同样确切的问题。
这取决于需求,但是如果你能承受一些错误, stream式algorithm和概率数据结构可能是有趣的,因为它们非常节省时间和空间,并且相当简单,例如:
- 如果你只对最常用的n个单词感兴趣,那么重击者(如空间节省)
- 计数 – 分钟草图,以获得任何单词的估计数
这些数据结构只需要很小的恒定空间(确切数量取决于您可以容忍的错误)。
请参阅http://alex.smola.org/teaching/berkeley2012/streams.html ,以获得对这些algorithm的精彩描述。
我很想用DAWG( 维基百科 ,和C#的书面更多的细节 )。 在叶节点上添加一个计数字段是非常简单的,高效的存储器并且在查找方面performance非常好。
编辑:虽然你有没有试过简单地使用一个Dictionary<string, int>
? 其中<string, int
>代表单词和计数? 也许你想尽早优化?
编者按:这篇文章最初链接到这个维基百科文章 ,这似乎是DAWG术语的另一个含义:存储一个单词的所有子string的方式,用于有效的近似string匹配。
一个不同的解决scheme可能是使用SQL表,并让系统尽可能好地处理数据。 首先为集合中的每个单词创build带有单个字段的表格。
然后使用查询(抱歉的语法问题,我的SQL是生锈的 – 这实际上是一个伪代码):
SELECT DISTINCT word, COUNT(*) AS c FROM myTable GROUP BY word ORDER BY c DESC
总的思路是首先生成一个包含所有单词的表(存储在磁盘上),然后使用查询为您计算和sorting(word,occurances)
。 然后你可以从检索的列表中取出最上面的K.
对所有人:如果我确实在SQL语句中有任何语法或其他问题:随意编辑
首先,我只是最近“发现”了Trie的数据结构,ZeFrenchy的回答对我来说是非常好的。
我在评论中看到了几个人如何提高性能提出build议,但这些只是微小的调整,所以我想我会和你分享我发现的真正的瓶颈… ConcurrentDictionary。
我想玩线程本地存储和您的示例给了我一个很好的机会做到这一点,并经过一些小的修改,使用每个线程的字典,然后结合后的Join()字典看到的性能提高〜30% (在8个线程中处理20MB的时间从48秒增加到了33秒)。
代码粘贴在下面,你会注意到从核准的答案没有太多的变化。
PS我没有超过50个声誉点,所以我不能把这个评论。
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; namespace WordCount { class MainClass { public static void Main(string[] args) { Console.WriteLine("Counting words..."); DateTime start_at = DateTime.Now; Dictionary<DataReader, Thread> readers = new Dictionary<DataReader, Thread>(); if (args.Length == 0) { args = new string[] { "war-and-peace.txt", "ulysees.txt", "les-miserables.txt", "the-republic.txt", "war-and-peace.txt", "ulysees.txt", "les-miserables.txt", "the-republic.txt" }; } List<ThreadLocal<TrieNode>> roots; if (args.Length == 0) { roots = new List<ThreadLocal<TrieNode>>(1); } else { roots = new List<ThreadLocal<TrieNode>>(args.Length); foreach (string path in args) { ThreadLocal<TrieNode> root = new ThreadLocal<TrieNode>(() => { return new TrieNode(null, '?'); }); roots.Add(root); DataReader new_reader = new DataReader(path, root); Thread new_thread = new Thread(new_reader.ThreadRun); readers.Add(new_reader, new_thread); new_thread.Start(); } } foreach (Thread t in readers.Values) t.Join(); foreach(ThreadLocal<TrieNode> root in roots.Skip(1)) { roots[0].Value.CombineNode(root.Value); root.Dispose(); } DateTime stop_at = DateTime.Now; Console.WriteLine("Input data processed in {0} secs", new TimeSpan(stop_at.Ticks - start_at.Ticks).TotalSeconds); Console.WriteLine(); Console.WriteLine("Most commonly found words:"); List<TrieNode> top10_nodes = new List<TrieNode> { roots[0].Value, roots[0].Value, roots[0].Value, roots[0].Value, roots[0].Value, roots[0].Value, roots[0].Value, roots[0].Value, roots[0].Value, roots[0].Value }; int distinct_word_count = 0; int total_word_count = 0; roots[0].Value.GetTopCounts(top10_nodes, ref distinct_word_count, ref total_word_count); top10_nodes.Reverse(); foreach (TrieNode node in top10_nodes) { Console.WriteLine("{0} - {1} times", node.ToString(), node.m_word_count); } roots[0].Dispose(); Console.WriteLine(); Console.WriteLine("{0} words counted", total_word_count); Console.WriteLine("{0} distinct words found", distinct_word_count); Console.WriteLine(); Console.WriteLine("done."); Console.ReadLine(); } } #region Input data reader public class DataReader { static int LOOP_COUNT = 100; private TrieNode m_root; private string m_path; public DataReader(string path, ThreadLocal<TrieNode> root) { m_root = root.Value; m_path = path; } public void ThreadRun() { for (int i = 0; i < LOOP_COUNT; i++) // fake large data set buy parsing smaller file multiple times { using (FileStream fstream = new FileStream(m_path, FileMode.Open, FileAccess.Read)) using (StreamReader sreader = new StreamReader(fstream)) { string line; while ((line = sreader.ReadLine()) != null) { string[] chunks = line.Split(null); foreach (string chunk in chunks) { m_root.AddWord(chunk.Trim()); } } } } } } #endregion #region TRIE implementation public class TrieNode : IComparable<TrieNode> { private char m_char; public int m_word_count; private TrieNode m_parent = null; private Dictionary<char, TrieNode> m_children = null; public TrieNode(TrieNode parent, char c) { m_char = c; m_word_count = 0; m_parent = parent; m_children = new Dictionary<char, TrieNode>(); } public void CombineNode(TrieNode from) { foreach(KeyValuePair<char, TrieNode> fromChild in from.m_children) { char keyChar = fromChild.Key; if (!m_children.ContainsKey(keyChar)) { m_children.Add(keyChar, new TrieNode(this, keyChar)); } m_children[keyChar].m_word_count += fromChild.Value.m_word_count; m_children[keyChar].CombineNode(fromChild.Value); } } public void AddWord(string word, int index = 0) { if (index < word.Length) { char key = word[index]; if (char.IsLetter(key)) // should do that during parsing but we're just playing here! right? { if (!m_children.ContainsKey(key)) { m_children.Add(key, new TrieNode(this, key)); } m_children[key].AddWord(word, index + 1); } else { // not a letter! retry with next char AddWord(word, index + 1); } } else { if (m_parent != null) // empty words should never be counted { m_word_count++; } } } public int GetCount(string word, int index = 0) { if (index < word.Length) { char key = word[index]; if (!m_children.ContainsKey(key)) { return -1; } return m_children[key].GetCount(word, index + 1); } else { return m_word_count; } } public void GetTopCounts(List<TrieNode> most_counted, ref int distinct_word_count, ref int total_word_count) { if (m_word_count > 0) { distinct_word_count++; total_word_count += m_word_count; } if (m_word_count > most_counted[0].m_word_count) { most_counted[0] = this; most_counted.Sort(); } foreach (char key in m_children.Keys) { m_children[key].GetTopCounts(most_counted, ref distinct_word_count, ref total_word_count); } } public override string ToString() { return BuildString(new StringBuilder()).ToString(); } private StringBuilder BuildString(StringBuilder builder) { if (m_parent == null) { return builder; } else { return m_parent.BuildString(builder).Append(m_char); } } public int CompareTo(TrieNode other) { return this.m_word_count.CompareTo(other.m_word_count); } } #endregion }
作为一个快速的通用algorithm,我会这样做。
Create a map with entries being the count for a specific word and the key being the actual string. for each string in content: if string is a valid key for the map: increment the value associated with that key else add a new key/value pair to the map with the key being the word and the count being one done
那么你可以在地图上find最大的值
create an array size 10 with data pairs of (word, count) for each value in the map if current pair has a count larger than the smallest count in the array replace that pair with the current one print all pairs in array
那么,我个人将文件拆分成不同的大小,比如说128mb,在scannng的时候总是保留两个内存,任何发现的单词都会被添加到一个哈希列表中,然后List列表count,然后迭代列表的名单在最后find前10名
那么第一个想法是以hashtable / Array的formspipe理一个dtabase或者其他什么来保存每个单词的出现,但是根据数据大小,我宁愿:
- 得到发现的第一个10个词,出现> = 2
- 获取这些单词在整个string中出现的次数,并在计数时删除它们
- 重新开始,一旦你有两套10个单词,每个单词都会出现10个单词中最多的单词
- 对于string的其余部分(dosnt包含这些单词)做相同的事情。
你甚至可以尝试更有效率,从1开始find10个单词,如果发现> = 5例如或更多,如果没有发现减less这个值,直到find10个单词。 通过这种方式,你有一个很好的机会避免使用记忆强化保存所有的数据是巨量的话,你可以保存扫描周期(在一个很好的情况下)
但是在最坏的情况下,你可能比传统的algorithm有更多的轮次。
顺便说一下,我将尝试用函数式编程语言而不是面向对象来解决这个问题。
下面的方法将只读取您的数据一次,可以调整内存大小。
- 以1GB的块读取文件
- 对于每个大块制作一个清单,说出5000个最常出现的词与他们的频率
- 根据频率合并列表(1000个列表,每个5000个字)
- 返回合并列表的前10名
理论上你可能会错过单词,我认为这个机会非常小。
风暴是要看的技术。 它将处理器(螺栓)的数据input(喷口)的作用分开。 风暴书的第2章解决你的确切问题,并很好地描述了系统架构 – http://www.amazon.com/Getting-Started-Storm-Jonathan-Leibiusky/dp/1449324010
与Hadoop的批处理相比,Storm更加实时。 如果你的数据是现有的,那么你可以分配负载到不同的喷口,并分散处理到不同的螺栓。
该algorithm还将支持数据增长到TB级以上,因为date将在实时到达时进行分析。
非常有趣的问题。 它比编码更多地涉及逻辑分析。 随着英语和有效句子的假设,它变得更容易。
你不必计算所有的单词,只是长度小于或等于给定语言的平均单词长度(英文是5.1)。 所以你不会有记忆问题。
至于阅读文件,你应该select一个并行模式,阅读块(你select的大小)通过操纵文件位置的空白。 如果你决定读取1MB的数据块,例如除了第一个数据块之外的所有数据块都应该宽一些(左起22个字节,右起22个字节,22表示最长的英文单词 – 如果我是对的)。 对于并行处理,您将需要并行字典或您将合并的本地集合。
请记住,通常情况下,您将以有效停用词列表的一部分作为前十名(这可能是另一种相反的方法,只要该文件的内容是普通的,也是有效的)。
试着想想特殊的数据结构来处理这类问题。 在这种情况下,特殊types的树像特里一样存储string,非常有效。 或者第二种方式来build立自己的解决scheme,如计算单词。 我猜这个TB的数据是英文的,所以我们一般有大约60万字,所以只能存储那些字和计算哪些string将被重复,这个解决scheme将需要正则expression式来消除一些特殊字符。 第一个解决scheme会更快,我很确定。
http://en.wikipedia.org/wiki/Trie
这里是在java中执行轮胎
http://algs4.cs.princeton.edu/52trie/TrieST.java.html
MapReduce的
WordCount可以通过使用hadoop的mapreduce有效地获得。 https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Example%3A+WordCount+v1.0大文件可以通过它parsing。它使用集群中的多个节点来执行这个操作。;
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }