Hive集群vs vs按sorting

据我所理解;

  • 在减速机中只能sorting

  • 命令全球范围内的东西,但一切推进一个减速机

  • 通过关键哈希智能地将东西分配到reducer中,并进行sorting

所以我的问题是确保一个全球秩序的集群吗​​? 通过将相同的密钥分配到相同的减速器中,但是相邻的密钥呢?

我可以在这里find唯一的文件,从这个例子看来,它似乎在全球订购。 但从定义来看,我觉得这并不总是这样。

一个较短的答案:是的, CLUSTER BY保证全球订购,只要你愿意自己join多个输出文件。

较长的版本:

  • ORDER BY x :保证全局sorting,但是通过只用一个reducer来推送所有的数据。 大数据集基本上是不可接受的。 你最终得到一个sorting的文件作为输出。
  • SORT BY x :在每个减速器上命令数据,但是每个减速器可以接收重叠范围的数据。 您最终有N个或更多的重叠范围的sorting文件。
  • DISTRIBUTE BY x :确保N个reducer中的每一个都获得x非重叠范围,但不sorting每个reducer的输出。 您最终会得到N个或没有重叠范围的未sorting文件。
  • CLUSTER BY x :确保N个reducer中的每一个都得到非重叠范围,然后在reducer中按这些范围进行sorting。 这给你全局sorting,就像做( DISTRIBUTE BY xSORT BY x )一样。 您最终有N个或更多的sorting文件与非重叠的范围。

合理? 所以CLUSTER BY基本上是ORDER BY可扩展版本。

首先让我澄清一下:只clustered by将你的密钥分发到不同的桶中进行clustered by ... sorted by桶分类sorting。

通过一个简单的实验(见下文),您可以看到默认情况下您不会获得全局sorting。 原因是默认分区程序使用哈希代码分割键,而不pipe实际的键顺序如何。

但是,您可以完全订购您的数据。

Tom White(第3版,第8章,第274页,总sorting)的动机是“Hadoop:通用指南”,他讨论了TotalOrderPartitioner。

我将首先回答您的TotalOrdering问题,然后介绍几个与sorting相关的Hive实验。

请记住:我在这里描述的是一个“概念validation”,我能够使用Claudera的CDH3发行版处理一个例子。

最初我希望org.apache.hadoop.mapred.lib.TotalOrderPartitioner可以做到这一点。 不幸的是,它并不是因为它看起来像Hive按值分区,而不是键。 所以我补丁(应该有子类,但我没有时间):

更换

 public int getPartition(K key, V value, int numPartitions) { return partitions.findPartition(key); } 

 public int getPartition(K key, V value, int numPartitions) { return partitions.findPartition(value); } 

现在你可以设置(修补)TotalOrderPartitioner作为你的Hive分区器:

 hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; hive> set total.order.partitioner.natural.order=false hive> set total.order.partitioner.path=/user/yevgen/out_data2 

我也用了

 hive> set hive.enforce.bucketing = true; hive> set mapred.reduce.tasks=4; 

在我的testing中。

文件out_data2告诉TotalOrderPartitioner如何提取值。 您通过对数据进行采样来生成out_data2。 在我的testing中,我使用了4个存储桶和从0到10的密钥。我使用ad-hoc方法生成了out_data2:

 import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.Tool; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.fs.FileSystem; public class TotalPartitioner extends Configured implements Tool{ public static void main(String[] args) throws Exception{ ToolRunner.run(new TotalPartitioner(), args); } @Override public int run(String[] args) throws Exception { Path partFile = new Path("/home/yevgen/out_data2"); FileSystem fs = FileSystem.getLocal(getConf()); HiveKey key = new HiveKey(); NullWritable value = NullWritable.get(); SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class); key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why writer.append(key, value); key.set( new byte[]{1, 6}, 0, 2);//partition at 6 writer.append(key, value); key.set( new byte[]{1, 9}, 0, 2);//partition at 9 writer.append(key, value); writer.close(); return 0; } } 

然后我把结果out_data2复制到HDFS(到/ user / yevgen / out_data2)

有了这些设置,我得到了我的数据bucketed /sorting(见我的实验列表中的最后一项)。

这是我的实验。

  • 创build示例数据

    bash> echo -e“1 \ n3 \ n2 \ n4 \ n5 \ n7 \ n6 \ n8 \ n9 \ n0”> data.txt

  • 创build基本testing表:

    configuration单元>创build表test(x int); configuration单元>加载数据本地inpath'data.txt'到表testing;

基本上这个表格包含从0到9的值,没有顺序。

  • 演示如何表复制工作(真正的mapred.reduce.tasks参数设置MAXIMAL数量的减less任务使用)

    configuration单元>创build表test2(x int);

    hive> set mapred.reduce.tasks = 4;

    configuration单元>插入覆盖表test2selectax从testing一个连接testingb在ax = bx; – 愚蠢的join强制非平凡的地图 – 减less

    bash> hadoop fs -cat / user / hive / warehouse / test2 / 000001_0

    1

    9

  • 展示bucketing。 你可以看到密钥随机分配,没有任何sorting顺序:

    configuration单元>创build表test3(x int)由(x)聚类成4个桶;

    hive> set hive.enforce.bucketing = true;

    configuration单元>插入覆盖表test3 select * from test;

    bash> hadoop fs -cat / user / hive / warehouse / test3 / 000000_0

    4

    8

    0

  • 分类sorting。 结果部分sorting,不完全sorting

    configuration单元>创build表test4(x int)聚类(x)sorting(x desc)为4个桶;

    configuration单元>插入覆盖表test4select*从testing;

    bash> hadoop fs -cat / user / hive / warehouse / test4 / 000001_0

    1

    9

您可以看到值按升序sorting。 看起来像CDH3中的Hive bug?

  • 不通过语句获得部分sorting:

    configuration单元>创build表test5作为selectx从testing分布由xsortingx desc;

    bash> hadoop fs -cat / user / hive / warehouse / test5 / 000001_0

    9

    1

  • 使用我的补丁TotalOrderParitioner:

    hive> set hive.mapred.partitioner = org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    configuration单元>设置total.order.partitioner.natural.order = false

    hive> set total.order.partitioner.path = / user / training / out_data2

    configuration单元>创build表test6(x int)由(x)聚类到(x)到4个桶中;

    configuration单元>插入覆盖表test6select*从testing;

    bash> hadoop fs -cat / user / hive / warehouse / test6 / 000000_0

    1

    2

    0

    bash> hadoop fs -cat / user / hive / warehouse / test6 / 000001_0

    3

    4

    bash> hadoop fs -cat / user / hive / warehouse / test6 / 000002_0

    7

    6

    8

    bash> hadoop fs -cat / user / hive / warehouse / test6 / 000003_0

    9

据我所知,简短的答案是否定的。你会得到重叠的范围。

从SortBy文档 :“Cluster By是分发和sorting的捷径”。 “具有相同”分配依据“列的所有行将转到同一个缩减器。” 但是没有信息保证非重叠范围的分配。

此外,从DDL BucketedTables文档 :“Hive如何在桶中分配行?通常,桶数由hash_function(bucketing_column)mod num_bucketsexpression式确定。 我猜想,Select语句中的Cluster使用相同的原理来在reducer之间分配行,因为它主要用于使用数据填充bucketed表。

我创build了一个int int列“a”,并在那里插入了从0到9的数字。

然后我把减速器的数量set mapred.reduce.tasks = 2;为2 set mapred.reduce.tasks = 2;

并且从这个表中select数据用Cluster by子句select * from my_tab cluster by a;

并收到我期待的结果:

0 2 4 6 8 1 3 5 7 9

所以第一个减速器(编号0)得到偶数(因为他们的模式2给0)

和第二个减速器(1号)得到奇数(因为他们的模式2给1)

所以这就是“分配方式”的作品。

然后“sorting”sorting每个减速器内的结果。

CLUSTER BY不会产生全局sorting。

(Lars Yencken)接受的答案误导了减员会得到不重叠的范围。 正如Anton Zaviriukhin正确地指出了BucketedTables文档,CLUSTER BY基本上是DISTRIBUTE BY(与bucketing相同),加上每个bucket / reducer中的SORT BY。 而DISTRIBUTE BY只是散列和模块到桶中,而散列函数可以保持顺序(如果i> j,则散列i>散列),散列值的模式不会。

这是一个更好的示例,显示重叠范围

http://myitlearnings.com/bucketing-in-hive/

按每个reducersorting不是全局的。 在许多书中也提到了不正确或混淆。 在将各部门分配到特定的缩减器中,然后根据各部门的员工名称进行分类的情况下,特别有用,并且不关心使用集群的顺序,而且由于工作负载在分配器之间分配更加有效。