用户
 找回密码
 立即注册

QQ登录

只需一步,快速开始

搜索
查看: 1531|回复: 0

第七篇 Spark SQL 源码分析之Physical Plan 到 RDD的具体实现

[复制链接]

394

主题

412

帖子

2065

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
2065

活跃会员热心会员推广达人宣传达人灌水之王突出贡献优秀版主荣誉管理论坛元老

发表于 2018-1-6 21:44:16 | 显示全部楼层 |阅读模式
----------------------------------------------------------
Spark SQL源码分析系列文章
  接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节:
  我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD。
[Java] 纯文本查看 复制代码
lazy val toRdd: RDD[Row] = executedPlan.execute()


  Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join、Aggregate和Sort这种稍复杂的。
  如图:
   1.png
一、BasicOperator1.1、Project  Project 的大致含义是:传入一系列表达式Seq[NamedExpression],给定输入的Row,经过Convert(Expression的计算eval)操作,生成一个新的Row。
  Project的实现是调用其child.execute()方法,然后调用mapPartitions对每一个Partition进行操作。
  这个f函数其实是new了一个MutableProjection,然后循环的对每个partition进行Convert。
[Java] 纯文本查看 复制代码
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {  
  override def output = projectList.map(_.toAttribute)  
  override def execute() = child.execute().mapPartitions { iter => //对每个分区进行f映射  
    @transient val reusableProjection = new MutableProjection(projectList)   
    iter.map(reusableProjection)  
  }  
}  


  通过观察MutableProjection的定义,可以发现,就是bind references to a schema 和 eval的过程:
  将一个Row转换为另一个已经定义好schema column的Row。
  如果输入的Row已经有Schema了,则传入的Seq[Expression]也会bound到当前的Schema。
[Java] 纯文本查看 复制代码
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {  
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =  
    this(expressions.map(BindReferences.bindReference(_, inputSchema))) //bound schema  
  
  private[this] val exprArray = expressions.toArray  
  private[this] val mutableRow = new GenericMutableRow(exprArray.size) //新的Row  
  def currentValue: Row = mutableRow  
  def apply(input: Row): Row = {  
    var i = 0  
    while (i < exprArray.length) {  
      mutableRow(i) = exprArray(i).eval(input)  //根据输入的input,即一个Row,计算生成的Row  
      i += 1  
    }  
    mutableRow //返回新的Row  
  }  
}  


1.2、Filter Filter的具体实现是传入的condition进行对input row的eval计算,最后返回的是一个Boolean类型,
如果表达式计算成功,返回true,则这个分区的这条数据就会保存下来,否则会过滤掉。
[Java] 纯文本查看 复制代码
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {  
  override def output = child.output  
  
  override def execute() = child.execute().mapPartitions { iter =>  
    iter.filter(condition.eval(_).asInstanceOf[Boolean]) //计算表达式 eval(input row)  
  }  
}  

1.3、Sample  Sample取样操作其实是调用了child.execute()的结果后,返回的是一个RDD,对这个RDD调用其sample函数,原生方法。
[Java] 纯文本查看 复制代码
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)  
  extends UnaryNode  
{  
  override def output = child.output  
  
  // TODO: How to pick seed?  
  override def execute() = child.execute().sample(withReplacement, fraction, seed)  
}  

1.4、Union
  Union操作支持多个子查询的Union,所以传入的child是一个Seq[SparkPlan]
  execute()方法的实现是对其所有的children,每一个进行execute(),即select查询的结果集合RDD。
  通过调用SparkContext的union方法,将所有子查询的结果合并起来。
[Java] 纯文本查看 复制代码
case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {  
  // TODO: attributes output by union should be distinct for nullability purposes  
  override def output = children.head.output  
  override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) //子查询的结果进行union  
  
  override def otherCopyArgs = sqlContext :: Nil  
}  

1.5、Limit  Limit操作在RDD的原生API里也有,即take().
  但是Limit的实现分2种情况:
  第一种是 limit作为结尾的操作符,即select xxx from yyy limit zzz。 并且是被executeCollect调用,则直接在driver里使用take方法。
  第二种是 limit不是作为结尾的操作符,即limit后面还有查询,那么就在每个分区调用limit,最后repartition到一个分区来计算global limit.
[Java] 纯文本查看 复制代码
case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext)  
  extends UnaryNode {  
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:  
  // partition local limit -> exchange into one partition -> partition local limit again  
  
  override def otherCopyArgs = sqlContext :: Nil  
  
  override def output = child.output  
  
  override def executeCollect() = child.execute().map(_.copy()).take(limit) //直接在driver调用take  
  
  override def execute() = {  
    val rdd = child.execute().mapPartitions { iter =>  
      val mutablePair = new MutablePair[Boolean, Row]()  
      iter.take(limit).map(row => mutablePair.update(false, row)) //每个分区先计算limit  
    }  
    val part = new HashPartitioner(1)  
    val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) //需要shuffle,来repartition  
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))  
    shuffled.mapPartitions(_.take(limit).map(_._2)) //最后单独一个partition来take limit  
  }  
}  


1.6、TakeOrdered
  TakeOrdered是经过排序后的limit N,一般是用在sort by 操作符后的limit。
  可以简单理解为TopN操作符。
[Java] 纯文本查看 复制代码
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)  
                      (@transient sqlContext: SQLContext) extends UnaryNode {  
  override def otherCopyArgs = sqlContext :: Nil  
  
  override def output = child.output  
  
  @transient  
  lazy val ordering = new RowOrdering(sortOrder) //这里是通过RowOrdering来实现排序的  
  
  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)  
  
  // TODO: Terminal split should be implemented differently from non-terminal split.  
  // TODO: Pick num splits based on |limit|.  
  override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)  
}  

1.7、Sort
  Sort也是通过RowOrdering这个类来实现排序的,child.execute()对每个分区进行map,每个分区根据RowOrdering的order来进行排序,生成一个新的有序集合。
  也是通过调用Spark RDD的sorted方法来实现的。
[Java] 纯文本查看 复制代码
case class Sort(  
    sortOrder: Seq[SortOrder],  
    global: Boolean,  
    child: SparkPlan)  
  extends UnaryNode {  
  override def requiredChildDistribution =  
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil  
  
  @transient  
  lazy val ordering = new RowOrdering(sortOrder) //排序顺序  
  
  override def execute() = attachTree(this, "sort") {  
    // TODO: Optimize sorting operation?  
    child.execute()  
      .mapPartitions(  
        iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator, //每个分区调用sorted方法,传入<span style="font-family: Arial, Helvetica, sans-serif;">ordering排序规则,进行排序</span>  
        preservesPartitioning = true)  
  }  
  
  override def output = child.output  
}  

1.8、ExistingRddExistingRdd是

[Java] 纯文本查看 复制代码
object ExistingRdd {  
  def convertToCatalyst(a: Any): Any = a match {  
    case o: Option[_] => o.orNull  
    case s: Seq[Any] => s.map(convertToCatalyst)  
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)  
    case other => other  
  }  
  
  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {  
    data.mapPartitions { iterator =>  
      if (iterator.isEmpty) {  
        Iterator.empty  
      } else {  
        val bufferedIterator = iterator.buffered  
        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)  
  
        bufferedIterator.map { r =>  
          var i = 0  
          while (i < mutableRow.length) {  
            mutableRow(i) = convertToCatalyst(r.productElement(i))  
            i += 1  
          }  
  
          mutableRow  
        }  
      }  
    }  
  }  
  
  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {  
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))  
  }  
}  


二、 Join Related Operators  HashJoin:  在讲解Join Related Operator之前,有必要了解一下HashJoin这个位于execution包下的joins.scala文件里的trait。
  Join操作主要包含BroadcastHashJoin、LeftSemiJoinHash、ShuffledHashJoin均实现了HashJoin这个trait.
  主要类图如下:
   2.jpeg

  HashJoin这个trait的主要成员有:
  buildSide是左连接还是右连接,有一种基准的意思。
  leftKeys是左孩子的expressions, rightKeys是右孩子的expressions。
  left是左孩子物理计划,right是右孩子物理计划。
  buildSideKeyGenerator是一个Projection是根据传入的Row对象来计算buildSide的Expression的。
  streamSideKeyGenerator是一个MutableProjection是根据传入的Row对象来计算streamSide的Expression的。
  这里buildSide如果是left的话,可以理解为buildSide是左表,那么去连接这个左表的右表就是streamSide。
   3.jpeg
  HashJoin关键的操作是joinIterators,简单来说就是join两个表,把每个表看着Iterators[Row].
  方式:
  1、首先遍历buildSide,计算buildKeys然后利用一个HashMap,形成 (buildKeys, Iterators[Row])的格式。
  2、遍历StreamedSide,计算streamedKey,去HashMap里面去匹配key,来进行join
  3、最后生成一个joinRow,这个将2个row对接。
  见代码注释:
[Java] 纯文本查看 复制代码
trait HashJoin {  
  val leftKeys: Seq[Expression]  
  val rightKeys: Seq[Expression]  
  val buildSide: BuildSide  
  val left: SparkPlan  
  val right: SparkPlan  
  
  lazy val (buildPlan, streamedPlan) = buildSide match {  //模式匹配,将physical plan封装形成Tuple2,如果是buildLeft,那么就是(left,right),否则是(right,left)  
    case BuildLeft => (left, right)  
    case BuildRight => (right, left)  
  }  
  
  lazy val (buildKeys, streamedKeys) = buildSide match { //模式匹配,将expression进行封装<span style="font-family: Arial, Helvetica, sans-serif;">Tuple2</span>  
  
    case BuildLeft => (leftKeys, rightKeys)  
    case BuildRight => (rightKeys, leftKeys)  
  }  
  
  def output = left.output ++ right.output  
  
  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) //生成buildSideKey来根据Expression来计算Row返回结果  
  @transient lazy val streamSideKeyGenerator = //<span style="font-family: Arial, Helvetica, sans-serif;">生成</span><span style="font-family: Arial, Helvetica, sans-serif;">streamSideKeyGenerator</span><span style="font-family: Arial, Helvetica, sans-serif;">来根据Expression来计算Row返回结果</span>  
    () => new MutableProjection(streamedKeys, streamedPlan.output)  
  
  def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = { //把build表的Iterator[Row]和streamIterator[Row]进行join操作返回Join后的Iterator[Row]  
    // TODO: Use Spark's HashMap implementation.  
  
    val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() //匹配主要使用HashMap实现  
    var currentRow: Row = null  
  
    // Create a mapping of buildKeys -> rows   
    while (buildIter.hasNext) { //目前只对build Iterator进行迭代,形成rowKey,Rows,类似wordCount,但是这里不是累加Value,而是Row的集合。  
      currentRow = buildIter.next()  
      val rowKey = buildSideKeyGenerator(currentRow) //计算rowKey作为HashMap的key  
      if(!rowKey.anyNull) {  
        val existingMatchList = hashTable.get(rowKey)  
        val matchList = if (existingMatchList == null) {  
          val newMatchList = new ArrayBuffer[Row]()  
          hashTable.put(rowKey, newMatchList) //(rowKey, matchedRowList)  
          newMatchList  
        } else {  
          existingMatchList  
        }  
        matchList += currentRow.copy() //返回matchList  
      }  
    }  
  
    new Iterator[Row] { //最后用streamedRow的Key来匹配buildSide端的HashMap  
      private[this] var currentStreamedRow: Row = _  
      private[this] var currentHashMatches: ArrayBuffer[Row] = _  
      private[this] var currentMatchPosition: Int = -1  
  
      // Mutable per row objects.  
      private[this] val joinRow = new JoinedRow  
  
      private[this] val joinKeys = streamSideKeyGenerator()  
  
      override final def hasNext: Boolean =  
        (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||  
          (streamIter.hasNext && fetchNext())  
  
      override final def next() = {  
        val ret = buildSide match {  
          case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) //右连接的话,streamedRow放左边,匹配到的key的Row放到右表  
          case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) //左连接的话,相反。  
        }  
        currentMatchPosition += 1  
        ret  
      }  
  
      /** 
       * Searches the streamed iterator for the next row that has at least one match in hashtable. 
       * 
       * @return true if the search is successful, and false if the streamed iterator runs out of 
       *         tuples. 
       */  
      private final def fetchNext(): Boolean = {  
        currentHashMatches = null  
        currentMatchPosition = -1  
  
        while (currentHashMatches == null && streamIter.hasNext) {  
          currentStreamedRow = streamIter.next()  
          if (!joinKeys(currentStreamedRow).anyNull) {  
            currentHashMatches = hashTable.get(joinKeys.currentValue) //streamedRow从buildSide里的HashTable里面匹配rowKey  
          }  
        }  
  
        if (currentHashMatches == null) {  
          false  
        } else {  
          currentMatchPosition = 0  
          true  
        }  
      }  
    }  
  }  
}  


joinRow的实现,实现2个Row对接:
实际上就是生成一个新的Array,将2个Array合并。
[Java] 纯文本查看 复制代码
class JoinedRow extends Row {  
  private[this] var row1: Row = _  
  private[this] var row2: Row = _  
  .........  
   def copy() = {  
    val totalSize = row1.size + row2.size   
    val copiedValues = new Array[Any](totalSize)  
    var i = 0  
    while(i < totalSize) {  
      copiedValues(i) = apply(i)  
      i += 1  
    }  
    new GenericRow(copiedValues) //返回一个新的合并后的Row  
  }  

2.1、LeftSemiJoinHash left semi join,不多说了,hive早期版本里替代IN和EXISTS 的版本。
将右表的join keys放到HashSet里,然后遍历左表,查找左表的join key是否能匹配。
[Java] 纯文本查看 复制代码
case class LeftSemiJoinHash(  
    leftKeys: Seq[Expression],  
    rightKeys: Seq[Expression],  
    left: SparkPlan,  
    right: SparkPlan) extends BinaryNode with HashJoin {  
  
  val buildSide = BuildRight //buildSide是以右表为基准  
  
  override def requiredChildDistribution =  
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil  
  
  override def output = left.output  
  
  def execute() = {  
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => //右表的物理计划执行后生成RDD,利用zipPartitions对Partition进行合并。然后用上述方法实现。  
      val hashSet = new java.util.HashSet[Row]()  
      var currentRow: Row = null  
  
      // Create a Hash set of buildKeys  
      while (buildIter.hasNext) {  
        currentRow = buildIter.next()  
        val rowKey = buildSideKeyGenerator(currentRow)  
        if(!rowKey.anyNull) {  
          val keyExists = hashSet.contains(rowKey)  
          if (!keyExists) {  
            hashSet.add(rowKey)  
          }  
        }  
      }  
  
      val joinKeys = streamSideKeyGenerator()  
      streamIter.filter(current => {  
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)  
      })  
    }  
  }  
}  

2.2、BroadcastHashJoin 名约: 广播HashJoin,呵呵。
  是InnerHashJoin的实现。这里用到了concurrent并发里的future,异步的广播buildPlan的表执行后的的RDD。
  如果接收到了广播后的表,那么就用streamedPlan来匹配这个广播的表。
  实现是RDD的mapPartitions和HashJoin里的joinIterators最后生成join的结果。
[Java] 纯文本查看 复制代码
case class BroadcastHashJoin(  
     leftKeys: Seq[Expression],  
     rightKeys: Seq[Expression],  
     buildSide: BuildSide,  
     left: SparkPlan,  
     right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {  
  
  override def otherCopyArgs = sqlContext :: Nil  
  
  override def outputPartitioning: Partitioning = left.outputPartitioning  
  
  override def requiredChildDistribution =  
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil  
  
  @transient  
  lazy val broadcastFuture = future {  //利用SparkContext广播表  
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())  
  }  
  
  def execute() = {  
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)  
  
    streamedPlan.execute().mapPartitions { streamedIter =>  
      joinIterators(broadcastRelation.value.iterator, streamedIter) //调用joinIterators对每个分区map  
    }  
  }  
}  

2.3、ShuffleHashJoin
ShuffleHashJoin顾名思义就是需要shuffle数据,outputPartitioning是左孩子的的Partitioning。
会根据这个Partitioning进行shuffle。然后利用SparkContext里的zipPartitions方法对每个分区进行zip。
这里的requiredChildDistribution,的是ClusteredDistribution,这个会在HashPartitioning里面进行匹配。
关于这里面的分区这里不赘述,可以去org.apache.spark.sql.catalyst.plans.physical下的partitioning里面去查看。
[Java] 纯文本查看 复制代码
case class ShuffledHashJoin(  
    leftKeys: Seq[Expression],  
    rightKeys: Seq[Expression],  
    buildSide: BuildSide,  
    left: SparkPlan,  
    right: SparkPlan) extends BinaryNode with HashJoin {  
  
  override def outputPartitioning: Partitioning = left.outputPartitioning  
  
  override def requiredChildDistribution =  
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil  
  
  def execute() = {  
    buildPlan.execute().zipPartitions(streamedPlan.execute()) {  
      (buildIter, streamIter) => joinIterators(buildIter, streamIter)  
    }  
  }  
}  


未完待续

原创文章,转载请注明:



回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐 上一条 /4 下一条

返回顶部