Spark2.x:dataframe的join方法详解

Spark join

Spark生态里,join分为Spark SQLjoin和基于dataframejoin,这次我们来谈谈最常用的基于dataframejoin方法详解示例。

spark join 基本要素

Spark支持所有类型的Join[1],包括:

下面分别阐述这几种Join的例子。

数据准备

运行环境

首先创建一下运行的类,本文将使用scalatest测试用例来做。[2]

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
import org.scalatest.{FlatSpec, Matchers}

class JoinSpec extends FlatSpec with Matchers {
}

基本变量

然后加载成员变量sparksc

lazy val spark: SparkSession = {
  SparkSession
    .builder()
    .master("local")
    .appName("spark test of join")
    .getOrCreate()
}
val sc = spark.sparkContext

日志级别

如果觉得有需要的话,可以调高spark的默认日志级别:

@transient lazy val logger: Logger = Logger.getLogger(getClass)
Logger.getRootLogger.setLevel(Level.INFO)
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

创建数据

  1. 创建第一张表的数据

cid代表顾客编号(customID)name代表顾客的姓名[2]

1号harbor
2号吴老师(mr.wu)
3号八宝粥(babaozhou)

private val customers = spark.createDataFrame(
  sc.parallelize(Seq(
    Row(1, "harbor"),
    Row(2, "mr.wu"),
    Row(3, "babaozhou")
  )),
  schema = StructType(
    Array[StructField](
      StructField("cid", IntegerType),
      StructField("name", StringType)
    ))
)
  1. 第二张表

    oid代表订单的IDcid代表顾客编号(customID)amount代表此订单的金额[2]

private val orders = spark.createDataFrame(
  sc.parallelize(Seq(
    Row(1, 1, 50.0d),
    Row(2, 2, 10d),
    Row(3, 2, 10d),
    Row(4, 2, 10d),
    Row(5, 1000, 19d)
  )), schema = StructType(
    Array[StructField](
      StructField("oid", IntegerType),
      StructField("cid", IntegerType),
      StructField("amount", DoubleType)
    ))
)

这两张表的数据:

  • 顾客表中,顾客3:babaozhou,没有产生订单;
  • 订单表中,订单5:顾客id1000,在顾客表中没有此人。

示例

inner join

inner join 是我们一般比较熟悉也经常用的一种方式。

inner join

inner join 相当于是要把两个集合中重叠的部分作为结果,要求是此id在左侧出现了,同时也在右侧出现了

"SparkJoin inner" should "collect only matching rows from both sides" in {
  val innerJoinResultDF = orders.join(customers, Seq("cid"), joinType = "inner")
  innerJoinResultDF.show()
  val innerJoinResult = innerJoinResultDF.collect()
  innerJoinResult should (have size 4 and contain allOf(
    // |cid|oid|amount| name|
    Row(1,  1,  50.0d,  "harbor"),
    Row(2,  2,  10d,    "mr.wu"),
    Row(2,  3,  10d,    "mr.wu"),
    Row(2,  4,  10d,    "mr.wu")
  ))
}

cross join

cross join 会生成笛卡尔积,也就是说,左边的每一项和右边的每一项都一一连接,简单来说就是:我们的顾客表有3个人,订单表有5个订单,那么应用 cross join 最终就会有15条数据。

cross join

"SparkJoin cross" should "create a Cartesian Product" in {
  val crossJoinResultDF = orders.crossJoin(customers)
  crossJoinResultDF.show()
  val crossJoinResult = crossJoinResultDF.collect()
  crossJoinResult should (have size 15 and contain allOf(
    // |oid| cid|   amount| cid|  name|
    Row(1,    1,    50.0,   1,    "harbor"),
    Row(1,    1,    50.0,   2,    "mr.wu"),
    Row(1,    1,    50.0,   3,    "babaozhou"),
    Row(2,    2,    10.0,   1,    "harbor"),
    Row(2,    2,    10.0,   2,    "mr.wu"),
    Row(2,    2,    10.0,   3,    "babaozhou"),
    Row(3,    2,    10.0,   1,    "harbor"),
    Row(3,    2,    10.0,   2,    "mr.wu"),
    Row(3,    2,    10.0,   3,    "babaozhou"),
    Row(4,    2,    10.0,   1,    "harbor"),
    Row(4,    2,    10.0,   2,    "mr.wu"),
    Row(4,    2,    10.0,   3,    "babaozhou"),
    Row(5,    1000, 19.0,   1,    "harbor"),
    Row(5,    1000, 19.0,   2,    "mr.wu"),
    Row(5,    1000, 19.0,   3,    "babaozhou")
  ))
}

outer join

spark里面,outer, full, fullouter, full_outer都代表的是outer join。

outer join

outer join 就是两个集合的全集,包括两个集合key对不上的null数据。简单来说就是:相比内连接多了null数据。

"SparkJoin outer" should "full outer join 相比内连接多了null数据" in {
  // "outer", "full", "fullouter", "full_outer"
  val outerJoinResultDF = orders.join(customers, Seq("cid"), joinType = "outer")
  outerJoinResultDF.show()
  val outerJoinResult = outerJoinResultDF.collect()
  outerJoinResult should (have size 6 and contain allOf(
    // | cid|   oid|  amount| name|
    Row(  1,    1,    50.0d,  "harbor"),
    Row(  2,    2,    10d,    "mr.wu"),
    Row(  2,    3,    10d,    "mr.wu"),
    Row(  2,    4,    10d,    "mr.wu"),
    Row(  3,    null, null,   "babaozhou"),
    Row(  1000, 5,    19d,    null)
  ))
}

left join

spark里面,left, leftouter, left_outer都代表的是left join。

left join

left outer join 相比内连接多了左边key不为nullnull数据。

下面代码是 customer join order 的例子:

有一个在订单表中找不到对应订单的人也出现了。

"SparkJoin left customer join order" should "left outer join 相比内连接多了左边key不为null的null数据" in {
  // "leftouter", "left", "left_outer"
  val leftJoinResultDF = customers.join(orders, Seq("cid"), joinType = "left")
  leftJoinResultDF.show()
  val leftJoinResult = leftJoinResultDF.collect()
  leftJoinResult should (have size 5 and contain allOf(
    // |cid| name|        oid|  amount|
    Row(1,   "harbor",    1,    50.0d),
    Row(2,   "mr.wu",     2,    10d),
    Row(2,   "mr.wu",     3,    10d),
    Row(2,   "mr.wu",     4,    10d),
    Row(3,   "babaozhou", null, null)
  ))
}

下面代码是 order join customer 的例子:

有一个在顾客表中找不到对应人的订单也出现了。

"SparkJoin left order join customer" should "left outer join 相比内连接多了左边key不为null的null数据" in {
  // "leftouter", "left", "left_outer"
  val leftJoinResultDF = orders.join(customers, Seq("cid"), joinType = "left")
  leftJoinResultDF.show()
  val leftJoinResult = leftJoinResultDF.collect()
  leftJoinResult should (have size 5 and contain allOf(
    // | cid|   oid| amount|  name|
    Row(  1,    1,   50.0d,   "harbor"),
    Row(  2,    2,   10d,     "mr.wu"),
    Row(  2,    3,   10d,     "mr.wu"),
    Row(  2,    4,   10d,     "mr.wu"),
    Row(  1000, 5,   19d,     null)
  ))
}

right join

rightouter, right, right_outer都指的是同一种join方式。left join 很像,right join 只是顺序颠倒过来,很好理解。

right join

下面代码是 customer join order 的例子:

有一个在顾客表中找不到对应人的订单也出现了。

"SparkJoin right customer join order" should "和left order join customer除了列数据顺序不一样之外其他都一样" in {
  // "rightouter", "right", "right_outer"
  val rightJoinResultDF = customers.join(orders, Seq("cid"), joinType = "right")
  rightJoinResultDF.show()
  val rightJoinResult = rightJoinResultDF.collect()
  rightJoinResult should (have size 5 and contain allOf(
    // | cid|   name|      oid|  amount|
    Row(  1,    "harbor",  1,    50.0d),
    Row(  2,    "mr.wu",   2,    10d),
    Row(  2,    "mr.wu",   3,    10d),
    Row(  2,    "mr.wu",   4,    10d),
    Row(  1000, null,      5,    19d)
  ))
}

下面代码是 order join customer 的例子:

有一个在订单表中找不到对应订单的人也出现了。

"SparkJoin right order join customer" should "和left customer join order除了列数据顺序不一样之外其他都一样" in {
  // "rightouter", "right", "right_outer"
  val rightJoinResultDF = orders.join(customers, Seq("cid"), joinType = "right")
  rightJoinResultDF.show()
  val rightJoinResult = rightJoinResultDF.collect()
  rightJoinResult should (have size 5 and contain allOf(
    // |cid| oid|   amount| name|
    Row(1,    1,    50.0d,  "harbor"),
    Row(2,    2,    10d,    "mr.wu"),
    Row(2,    3,    10d,    "mr.wu"),
    Row(2,    4,    10d,    "mr.wu"),
    Row(3,    null, null,   "babaozhou")
  ))
}

lefi semi join

左半连接,又可以叫leftsemi, left_semi。正如其名字暗示的一样,他只会按照条件和顺序去 join ,然后凭 join 结果只去左边表的列,并不会把两张表合并到一起,相当于右边的表只是作为判定使用,并不会掺和进结果中。

下面代码是 order join customer 的例子:

最终结果不会出现两边的空数据,顾客ID为1000的那个订单5就不会出现在最终的结果里。

"SparkJoin left_semi order join customer" should "和inner除了少了右边独有的列之外其他都一样" in {
  // "leftsemi", "left_semi"
  val leftSemiJoinResultDF = orders.join(customers, Seq("cid"), joinType = "left_semi")
  leftSemiJoinResultDF.show()
  val leftSemiJoinResult = leftSemiJoinResultDF.collect()
  leftSemiJoinResult should (have size 4 and contain allOf(
    // |cid|oid|amount|
    Row(1,  1,  50.0d),
    Row(2,  2,  10d),
    Row(2,  3,  10d),
    Row(2,  4,  10d)
  ))
}

下面代码是 customer join order 的例子:

最终结果不会出现两边的空数据,八宝粥(babaozhou)同学没有产生过订单,那么就不会出现在最终的结果里。

"SparkJoin left_semi custom join order" should "和inner除了少了右边独有的列还有去重了之外其他都一样" in {
  // "leftsemi", "left_semi"
  val leftSemiJoinResultDF = customers.join(orders, Seq("cid"), joinType = "left_semi")
  leftSemiJoinResultDF.show()
  val leftSemiJoinResult = leftSemiJoinResultDF.collect()
  leftSemiJoinResult should (have size 2 and contain allOf(
    // |cid |name  |
    Row(1,  "harbor"),
    Row(2,  "mr.wu")
  ))
}

left anti join

leftantileft_anti是同样的意思,相当于两个集合相减

下面代码是 customer join order 的例子:

最终结果只出现两边的空数据,八宝粥(babaozhou)同学没有产生过订单,那么就会出现在最终的结果里。也就是说:

set(customers) - set(orders)

最终的结果只剩下八宝粥(babaozhou)同学。

"SparkJoin left_anti custom join order" should "和 left_semi custom join order 组合在一起就是完整的 customs 表" in {
  // "leftanti", "left_anti"
  val leftAntiJoinResultDF = customers.join(orders, Seq("cid"), joinType = "left_anti")
  leftAntiJoinResultDF.show()
  val leftAntiJoinResult = leftAntiJoinResultDF.collect()
  leftAntiJoinResult should (have size 1 and contain (
    // |cid|name|
    Row(3,  "babaozhou")
  ))
}

下面代码是 orders join customers 的例子:

最终结果只出现两边的空数据,顾客ID为1000的那个订单5就会出现在最终的结果里。也就是说:

set(orders) - set(customers)

最终的结果只剩下顾客ID为1000的那个订单5

"SparkJoin left_anti order join customer" should "和 left_semi order join customer 组合在一起就是完整的 orders 表" in {
  // "leftanti", "left_anti"
  val leftAntiJoinResultDF = orders.join(customers, Seq("cid"), joinType = "left_anti")
  leftAntiJoinResultDF.show()
  val leftAntiJoinResult = leftAntiJoinResultDF.collect()
  leftAntiJoinResult should (have size 1 and contain (
    // |cid|oid|amount|
    Row(1000,5, 19d)
  ))
}

总结

Spark里面join方法,种类丰富,能满足日常遇到的几乎一切问题。

但是由于其是shuffle操作,是巨大的计算开销,所以在使用的时候也需要关注性能优化,比如:

  1. 设置参数spark.sql.autoBroadcastJoinThreshold的值(默认10M)[1],让小表被广播到每个节点,以降低大表的 I/O 开销,提升性能。
  2. spark默认sort merge join,可以看情况创造条件使用hash join提升性能。

完整的代码可以再这里找到:https://gist.github.com/c0f8a928dfe75aaf85cf9aab3708593a

参考文献


  1. Spark SQL 之 Join 实现[EB/OL]. 守护之鲨, [2019-11-04]. http://sharkdtu.com/posts/spark-sql-join.html.
  2. types in Spark SQL[EB/OL]. [2019-11-04]. https://www.waitingforcode.com/apache-spark-sql/join-types-spark-sql/read.
  3. SQL Left Join Example | Java Tutorial Network[J].

   转载规则


《Spark2.x:dataframe的join方法详解》 Harbor Zeng 采用 知识共享署名 4.0 国际许可协议 进行许可。
 本篇
Spark2.x:dataframe的join方法详解 Spark2.x:dataframe的join方法详解
Spark join在Spark生态里,join分为Spark SQL的join和基于dataframe的join,这次我们来谈谈最常用的基于dataframe的join方法详解示例。 Spark支持所有类型的Join[1],包括: inner join left outer join right outer join full outer join left semi join left a
下一篇 
机器学习基础算法:线性回归,从案例到公式推演深度解析 机器学习基础算法:线性回归,从案例到公式推演深度解析
前言什么是线性回归?线性回归是一种预测模型,基于现有的数据构建参数模型,对未来的数据做出预测。 之所以叫线性,是因为他专门为符合一定线性规则模式的数据设计的,回归(regression)二字,可以理解为拟合的过程,通过一些数学技巧,构建参数模型。回归在英文里本意里有“衰退”的意思,高尔顿当时拟合了父母平均身高 x 和 子女平均身高 y 的经验方程,发现即使父母的身高都极端高,其子女不见得会比父母高
  目录