DataFrame连接优化 – 广播散列连接

我试图有效地join两个dataframe,其中一个是大的,第二个是小一点。

有没有办法避免所有这些洗牌? 我不能设置autoBroadCastJoinThreshold ,因为它只支持整数 – 而我试图广播的表略大于整数字节数。

有没有办法强制广播忽略这个variables?

广播哈希联接(类似MapReduce中的地图边连接或地图边组合):

在SparkSQL中,您可以通过调用queryExecution.executedPlan来查看正在执行的连接的types。 与核心Spark一样,如果其中一个表比另一个小得多,则可能需要广播散列连接。 您可以向Spark SQL提示:给定的DF应该在join之前通过在DataFrame上调用方法broadcast来join

例如: largedataframe.join(broadcast(smalldataframe), "key")

在DWH术语中,大数据框可能与实际情况相似,小数据框可能与维度相似

正如我的collections书(HPS)所述,请参阅下面有更好的理解.. 在这里输入图像描述

注意:上面的broadcast是从import org.apache.spark.sql.functions.broadcast而不是从SparkContext

Spark也会自动使用spark.sql.conf.autoBroadcastJoinThreshold来确定是否应该广播一个表。

提示:请参阅DataFrame.explain()方法

 def explain(): Unit Prints the physical plan to the console for debugging purposes. 

有没有办法强制广播忽略这个variables?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

您可以使用left.join(broadcast(right), ...)来提示要广播的dataframeleft.join(broadcast(right), ...)

这是火花的当前局限性,请参阅SPARK-6235 。 2GB限制也适用于广播variables。

你确定没有其他的好办法,例如不同的分区?

否则,您可以通过手动创build多个广播variables(每个<2GB)来解决这个问题。

设置spark.sql.autoBroadcastJoinThreshold = -1将完全禁用广播。 请参阅“ Spark SQL,数据框和数据集指南”中的其他configuration选项 。

我发现这个代码适用于Spark 2.11版本2.0.0中的广播连接。

 import org.apache.spark.sql.functions.broadcast val employeesDF = employeesRDD.toDF val departmentsDF = departmentsRDD.toDF // materializing the department data val tmpDepartments = broadcast(departmentsDF.as("departments")) import context.implicits._ employeesDF.join(broadcast(tmpDepartments), $"depId" === $"id", // join by employees.depID == departments.id "inner").show() 

这里是上述代码的参考Henning Kropp博客,与Sparkjoin广播