How can we JOIN two Spark SQL dataframes using a SQL-esque "LIKE" criterion?

We are using the PySpark libraries interfacing with Spark 1.3.1.

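We have two dataframes: documents_df := {document_id, document_text} and keywords_df := {keyword}. We would like to JOIN the two dataframes and return a resulting dataframe of {document_id, keyword} pairs, using the criterion that keywords_df.keyword appears in the documents_df.document_text string.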

In PostgreSQL, for example, we could achieve this with an ON clause of the following form:

    documents_df.document_text ilike '%' || keywords_df.keyword || '%'

In PySpark, however, I cannot get any form of join syntax to work. Has anyone achieved something like this before?

Kind regards,

It is possible in two different ways, but generally speaking it is not recommended. First, let's create some dummy data:

    from pyspark.sql import Row

    document_row = Row("document_id", "document_text")
    keyword_row = Row("keyword")

    # 1L is a Python 2 long literal; on Python 3 write 1
    documents_df = sc.parallelize([
        document_row(1L, "apache spark is the best"),
        document_row(2L, "erlang rocks"),
        document_row(3L, "but haskell is better")
    ]).toDF()

    keywords_df = sc.parallelize([
        keyword_row("erlang"),
        keyword_row("haskell"),
        keyword_row("spark")
    ]).toDF()

  1. Hive UDFs

     documents_df.registerTempTable("documents")
     keywords_df.registerTempTable("keywords")

     query = """SELECT document_id, keyword
         FROM documents JOIN keywords
         ON document_text LIKE CONCAT('%', keyword, '%')"""

     like_with_hive_udf = sqlContext.sql(query)
     like_with_hive_udf.show()

     ## +-----------+-------+
     ## |document_id|keyword|
     ## +-----------+-------+
     ## |          1|  spark|
     ## |          2| erlang|
     ## |          3|haskell|
     ## +-----------+-------+

  2. Python UDF

     from pyspark.sql.functions import udf, col
     from pyspark.sql.types import BooleanType

     # Or you can replace `in` with a regular expression
     contains = udf(lambda s, q: q in s, BooleanType())

     like_with_python_udf = (documents_df.join(keywords_df)
         .where(contains(col("document_text"), col("keyword")))
         .select(col("document_id"), col("keyword")))
     like_with_python_udf.show()

     ## +-----------+-------+
     ## |document_id|keyword|
     ## +-----------+-------+
     ## |          1|  spark|
     ## |          2| erlang|
     ## |          3|haskell|
     ## +-----------+-------+
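
The comment in the Python UDF hints that `in` could be replaced with a regular expression. Here is a minimal plain-Python sketch of such a predicate, assuming a case-insensitive match is wanted (closer to the `ilike` in the question). The name `icontains` is hypothetical, and the function would still have to be wrapped with `udf(..., BooleanType())` as in the snippet above:

```python
import re

def icontains(text, keyword):
    """Return True if keyword occurs anywhere in text, ignoring case."""
    # Null column values arrive in a Python UDF as None
    if text is None or keyword is None:
        return False
    # re.escape keeps characters such as '.' or '+' in the keyword literal
    return re.search(re.escape(keyword), text, flags=re.IGNORECASE) is not None

print(icontains("Apache Spark is the best", "spark"))  # True
print(icontains("erlang rocks", "haskell"))            # False
```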

Why is it not recommended? Because in both cases it requires a Cartesian product:

    like_with_hive_udf.explain()

    ## TungstenProject [document_id#2L,keyword#4]
    ##  Filter document_text#3 LIKE concat(%,keyword#4,%)
    ##   CartesianProduct
    ##    Scan PhysicalRDD[document_id#2L,document_text#3]
    ##    Scan PhysicalRDD[keyword#4]

    like_with_python_udf.explain()

    ## TungstenProject [document_id#2L,keyword#4]
    ##  Filter pythonUDF#13
    ##   !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3,keyword#4), ...
    ##    CartesianProduct
    ##     Scan PhysicalRDD[document_id#2L,document_text#3]
    ##     Scan PhysicalRDD[keyword#4]
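
To make the cost concrete, the plans above can be modelled in plain Python. This illustrates the semantics only, not how Spark executes the query. Every document is paired with every keyword before the filter runs, so the number of predicate evaluations is the product of the two row counts:

```python
from itertools import product

documents = [
    (1, "apache spark is the best"),
    (2, "erlang rocks"),
    (3, "but haskell is better"),
]
keywords = ["erlang", "haskell", "spark"]

comparisons = 0
matches = []
# CartesianProduct followed by Filter: every (document, keyword) pair is tested
for (document_id, text), keyword in product(documents, keywords):
    comparisons += 1
    if keyword in text:
        matches.append((document_id, keyword))

print(matches)      # [(1, 'spark'), (2, 'erlang'), (3, 'haskell')]
print(comparisons)  # 9, i.e. len(documents) * len(keywords)
```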

There are other ways to achieve a similar effect without a full Cartesian product.

  1. Join on the tokenized documents - useful if the keywords list is too large to be handled in the memory of a single machine

     from pyspark.ml.feature import Tokenizer
     from pyspark.sql.functions import explode

     tokenizer = Tokenizer(inputCol="document_text", outputCol="words")

     tokenized = (tokenizer.transform(documents_df)
         .select(col("document_id"), explode(col("words")).alias("token")))

     like_with_tokenizer = (tokenized
         .join(keywords_df, col("token") == col("keyword"))
         .drop("token"))

     like_with_tokenizer.show()

     ## +-----------+-------+
     ## |document_id|keyword|
     ## +-----------+-------+
     ## |          3|haskell|
     ## |          1|  spark|
     ## |          2| erlang|
     ## +-----------+-------+

    This requires a shuffle but not a Cartesian product:

     like_with_tokenizer.explain()

     ## TungstenProject [document_id#2L,keyword#4]
     ##  SortMergeJoin [token#29], [keyword#4]
     ##   TungstenSort [token#29 ASC], false, 0
     ##    TungstenExchange hashpartitioning(token#29)
     ##     TungstenProject [document_id#2L,token#29]
     ##      !Generate explode(words#27), true, false, [document_id#2L, ...
     ##       ConvertToSafe
     ##        TungstenProject [document_id#2L,UDF(document_text#3) AS words#27]
     ##         Scan PhysicalRDD[document_id#2L,document_text#3]
     ##   TungstenSort [keyword#4 ASC], false, 0
     ##    TungstenExchange hashpartitioning(keyword#4)
     ##     ConvertToUnsafe
     ##      Scan PhysicalRDD[keyword#4]

  2. Python UDF and a broadcast variable - if the keywords list is relatively small

     from pyspark.sql.types import ArrayType, StringType

     # Collect the keywords to the driver and broadcast them as a set
     keywords = sc.broadcast(set(
         keywords_df.rdd.map(lambda row: row[0]).collect()))

     # Return the keywords that occur as whitespace-separated tokens
     bd_contains = udf(
         lambda s: list(set(s.split()) & keywords.value),
         ArrayType(StringType()))

     like_with_bd = (documents_df.select(
         col("document_id"),
         explode(bd_contains(col("document_text"))).alias("keyword")))

     like_with_bd.show()

     ## +-----------+-------+
     ## |document_id|keyword|
     ## +-----------+-------+
     ## |          1|  spark|
     ## |          2| erlang|
     ## |          3|haskell|
     ## +-----------+-------+

    It requires neither a shuffle nor a Cartesian product, but you still have to transfer the broadcast variable to each worker node.

     like_with_bd.explain()

     ## TungstenProject [document_id#2L,keyword#46]
     ##  !Generate explode(pythonUDF#47), true, false, ...
     ##   ConvertToSafe
     ##    TungstenProject [document_id#2L,pythonUDF#47]
     ##     !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3), ...
     ##      Scan PhysicalRDD[document_id#2L,document_text#3]

  3. Since Spark 1.6.0 you can mark a small data frame with sql.functions.broadcast to get a similar effect as above without using UDFs and explicit broadcast variables. Reusing the tokenized data:

     from pyspark.sql.functions import broadcast

     # broadcast() marks its argument (here the left side) for broadcasting,
     # which shows up as BuildLeft in the plan
     like_with_tokenizer_and_bd = (broadcast(tokenized)
         .join(keywords_df, col("token") == col("keyword"))
         .drop("token"))

     like_with_tokenizer_and_bd.explain()

     ## TungstenProject [document_id#3L,keyword#5]
     ##  BroadcastHashJoin [token#10], [keyword#5], BuildLeft
     ##   TungstenProject [document_id#3L,token#10]
     ##    !Generate explode(words#8), true, false, ...
     ##     ConvertToSafe
     ##      TungstenProject [document_id#3L,UDF(document_text#4) AS words#8]
     ##       Scan PhysicalRDD[document_id#3L,document_text#4]
     ##   ConvertToUnsafe
     ##    Scan PhysicalRDD[keyword#5]
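
One caveat worth checking before switching to the token-based variants: they test whole-token equality rather than substring containment, so they only approximate `LIKE '%keyword%'`. The plain-Python sketch below models the three predicates on a single string. The sample text, the keywords, and the function names are made up for illustration. The lower-casing in `tokenizer_match` reflects the documented behaviour of `pyspark.ml.feature.Tokenizer`, which lowercases its input and then splits on whitespace.

```python
def like_match(text, keyword):
    # LIKE '%keyword%': case-sensitive substring containment
    return keyword in text

def tokenizer_match(text, keyword):
    # Tokenizer + equi-join: lowercase the text, split on whitespace,
    # then require exact equality between a token and the keyword
    return keyword in text.lower().split()

def split_match(text, keyword):
    # str.split() + set intersection, as in the broadcast UDF:
    # case-sensitive whole tokens
    return keyword in set(text.split())

text = "Apache Spark is the best"
for keyword in ["spark", "Spark", "park", "the best"]:
    print(keyword, like_match(text, keyword),
          tokenizer_match(text, keyword), split_match(text, keyword))

# spark False True False
# Spark True False True
# park True False False
# the best True False False
```

If the keywords may contain spaces or must match inside words, the Cartesian variants are the ones that preserve `LIKE` semantics. Otherwise, normalising the case of both sides before the join brings the token-based variants closer to `ilike`.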

Related

  • For approximate matching, see Efficient string matching in Apache Spark.