用户
 找回密码
 立即注册

QQ登录

只需一步,快速开始

搜索
查看: 1863|回复: 0

第一篇 Spark SQL源码分析之核心流程

[复制链接]

394

主题

412

帖子

2065

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
2065

活跃会员热心会员推广达人宣传达人灌水之王突出贡献优秀版主荣誉管理论坛元老

发表于 2018-1-3 13:41:49 | 显示全部楼层 |阅读模式
----------------------------------------------------------
Spark SQL源码分析系列文章
    自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点:
    1、整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里。这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql。
    2、效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里。
    前一段时间测试过Shark,并且对Spark SQL也进行了一些测试,但是还是忍不住对Spark SQL一探究竟,就从源代码的角度来看一下Spark SQL的核心执行流程吧。

一、引子先来看一段简单的Spark SQL程序:
[Java] 纯文本查看 复制代码
1. val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
2. import sqlContext._ 
3.case class Person(name: String, age: Int) 
4.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) 
5.people.registerAsTable("people") 
6.val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") 
7.teenagers.map(t => "Name: " + t(0)).collect().foreach(println) 



程序前两句1和2生成SQLContext,导入sqlContext下面的all,也就是运行SparkSQL的上下文环境。
程序3,4两句是加载数据源注册table
第6句是真正的入口,是sql函数,传入一句sql,先会返回一个SchemaRDD。这一步是lazy的,直到第七句的collect这个action执行时,sql才会执行。



二、SQLCOntextSQLContext是执行SQL的上下文对象,首先来看一下它Hold的有哪些成员:
Catalog  
一个存储<tableName,logicalPlan>的map结构,查找关系的目录,注册表,注销表,查询表和逻辑计划关系的类。
1.png
SqlParser
Parse 传入的sql来对语法分词,构建语法树,返回一个logical plan
2.png
Analyzer
  logical plan的语法分析器
3.png
Optimizer
logical Plan的优化器
4.png
LogicalPlan
逻辑计划,由catalyst的TreeNode组成,可以看到有3种语法树
5.png
SparkPlanner
包含不同策略的优化策略来优化物理执行计划
6.png
QueryExecution
sql执行的环境上下文
7.png
就是这些对象组成了Spark SQL的运行时,看起来很酷,有静态的metadata存储,有分析器、优化器、逻辑计划、物理计划、执行运行时。
那这些对象是怎么相互协作来执行sql语句的呢?

三、Spark SQL执行流程话不多说,先上图,这个图我用一个在线作图工具process on话的,画的不好,图能达意就行:
       8.png

核心组件都是绿色的方框,每一步流程的结果都是蓝色的框框,调用的方法是橙色的框框。

先概括一下,大致的执行流程是:
Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD

更具体的执行流程:
     sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 执行sql生成RDD

3.1、Parse SQL 回到开始的程序,我们调用sql函数,其实是SQLContext里的sql函数它的实现是new一个SchemaRDD,在生成的时候就调用parseSql方法了。
[Java] 纯文本查看 复制代码
/** 
* Executes a SQL query using Spark, returning the result as a SchemaRDD. 
* 
* @group userf 
*/ 
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText)) 


[Java] 纯文本查看 复制代码
 @transient 
protected[sql] val parser = new catalyst.SqlParser 


protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql) 
3.2、Analyze to Execution当我们调用SchemaRDD里面的collect方法时,则会初始化QueryExecution,开始启动执行。
[Java] 纯文本查看 复制代码
override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect() 


我们可以很清晰的看到执行步骤:
[Java] 纯文本查看 复制代码
protected abstract class QueryExecution { 
def logical: LogicalPlan 

lazy val analyzed = analyzer(logical) //首先分析器会分析逻辑计划 
lazy val optimizedPlan = optimizer(analyzed) //随后优化器去优化分析后的逻辑计划 
// TODO: Don't just pick the first one... 
lazy val sparkPlan = planner(optimizedPlan).next() //根据策略生成plan物理计划 
// executedPlan should not be used to initialize any SparkPlan. It should be 
// only used for execution. 
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //最后生成已经准备好的Spark Plan 

/** Internal version of the RDD. Avoids copies and has no schema */ 
lazy val toRdd: RDD[Row] = executedPlan.execute() //最后调用toRDD方法执行任务将结果转换为RDD 

protected def stringOrError[A](f: => A): String = 
try f.toString catch { case e: Throwable => e.toString } 

def simpleString: String = stringOrError(executedPlan) 

override def toString: String = 
s"""== Logical Plan == 
|${stringOrError(analyzed)} 
|== Optimized Logical Plan == 
|${stringOrError(optimizedPlan)} 
|== Physical Plan == 
|${stringOrError(executedPlan)} 
""".stripMargin.trim 
} 



至此整个流程结束。

  四、总结:  通过分析SQLContext我们知道了Spark SQL都包含了哪些组件,SqlParser,Parser,Analyzer,Optimizer,LogicalPlan,SparkPlanner(包含Physical Plan),QueryExecution.
  通过调试代码,知道了Spark SQL的执行流程:
sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 执行sql生成RDD
  
  随后还会对里面的每个组件对象进行研究,看看catalyst究竟做了哪些优化。
  
  ——EOF——


原创文章,转载请注明:
本文链接地址:http://blog.csdn.net/oopsoom/article/details/37658021

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐 上一条 /4 下一条

返回顶部