Apache Pig:FLATTEN和并行执行的reducer

我已经实现了一个Apache Pig脚本。 当我执行脚本时,会导致许多映射器执行特定步骤,但是该步骤只有一个缩减器。 由于这种情况(许多映射器,一个reducer),Hadoop集群在单个reducer执行时几乎空闲。 为了更好地使用集群的资源,我希望还有许多并行运行的减速器。

即使我使用SET DEFAULT_PARALLEL命令在Pig脚本中设置了并行性,我仍然导致只有一个reducer。

发出问题的代码部分如下:

SET DEFAULT_PARALLEL 5; inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int); inputDataGrouped = GROUP inputData BY (group_name); -- The GeneratePairsUDF generates a bag containing pairs of integers, eg {(1, 5), (1, 8), ..., (8, 5)} pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag; pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int); 

'inputData'和'inputDataGrouped'别名在映射器中计算。

减速机中的“双”和“双平”。

如果我通过使用FLATTEN命令(pairsFlat = FOREACH对生成FLATTEN(pairs_bag)AS(item1:int,item2:int);)删除行来更改脚本,则执行结果为5个reducer(并行执行) 。

FLATTEN命令似乎是问题,并且避免了许多减速器的创build。

有谁知道我能达到与FLATTEN相同的结果,但脚本并行执行(与许多reducer)?

非常感谢您的支持!

最好的问候,基督徒

编辑:

有两个FOREACH时解释计划(如上):

 Map Plan inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32 | | | Project[chararray][0] - scope-33 | |---inputData: New For Each(false,false)[bag] - scope-29 | | | Cast[chararray] - scope-24 | | | |---Project[bytearray][0] - scope-23 | | | Cast[int] - scope-27 | | | |---Project[bytearray][1] - scope-26 | |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22-------- Reduce Plan pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42 | |---pairsFlat: New For Each(true)[bag] - scope-41 | | | Project[bag][0] - scope-39 | |---pairs: New For Each(false)[bag] - scope-38 | | | POUserFunc(GeneratePairsUDF)[bag] - scope-36 | | | |---Project[bag][1] - scope-35 | | | |---Project[bag][1] - scope-34 | |---inputDataGrouped: Package[tuple]{chararray} - scope-31-------- Global sort: false 

EXPLAIN计划何时只有一个包含UDF的FLATTEN FOREACH:

 Map Plan inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29 | | | Project[chararray][0] - scope-30 | |---inputData: New For Each(false,false)[bag] - scope-26 | | | Cast[chararray] - scope-21 | | | |---Project[bytearray][0] - scope-20 | | | Cast[int] - scope-24 | | | |---Project[bytearray][1] - scope-23 | |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19-------- Reduce Plan pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36 | |---pairs: New For Each(true)[bag] - scope-35 | | | POUserFunc(GeneratePairsUDF)[bag] - scope-33 | | | |---Project[bag][1] - scope-32 | | | |---Project[bag][1] - scope-31 | |---inputDataGrouped: Package[tuple]{chararray} - scope-28-------- Global sort: false 

如果猪在猪脚本的每个步骤中使用configurationDEFAULT_PARALLEL值,那么没有保证。 尝试PARALLEL与您感觉需要时间的特定join/组步骤(在您的情况GROUP步骤)。

  inputDataGrouped = GROUP inputData BY (group_name) PARALLEL 67; 

如果仍然无法正常工作,那么您可能需要查看您的数据是否存在偏斜问题。

我认为数据存在偏差。 只有less数的制图者正在产生指数级的大规模产出。 查看数据中密钥的分布情况。 类似的数据包含less数logging数量较多的组。

我试过“设置默认并行”和“并行100”,但没有运气。 猪仍然使用1减速机。

事实certificate,我必须为每个logging生成一个从1到100的随机数,并用这个随机数对这些logging进行分组。

我们在分组上浪费时间,但对我来说要快得多,因为现在我可以使用更多的减速器。

这里是代码(提交者是我自己的UDF):

 tmpRecord = FOREACH record GENERATE (int)(RANDOM()*100.0) as rnd, data; groupTmpRecord = GROUP tmpRecord BY rnd; result = FOREACH groupTmpRecord GENERATE FLATTEN(SUBMITTER(tmpRecord)); 

为了回答你的问题,我们必须首先知道有多less个减速器猪执行全球重新布置过程。 因为根据我的理解,生成/投影不应该只需要一个简化器。 我不能说Flatten同样的事情。 但是我们从常识中知道,在拼合期间,目的是从包中去掉元组,反之亦然。 要做到这一点,属于一个包的所有元组都应该在同一个reducer中可用。 我可能错了。 但任何人都可以在这里添加一些东西给这个用户一个答案吗?