`
扬州老鬼
  • 浏览: 301761 次
  • 性别: Icon_minigender_1
  • 来自: 苏州
社区版块
存档分类
最新评论

Apach Dril的计算流程说明

 
阅读更多
原创,转载请注明出处

花了两天时间,学习了Drill的工作流程,这要感谢Timothy Chen提供Drill关于工作流的文章。

工作流的图(注:该工作流是针对alpha版本的说明):

原文地址:
http://tnachen.wordpress.com/2013/11/05/lifetime-of-a-query-in-drill-alpha-release/

如上图所知,本文所述的drill query需要Zookeeper以及Hazelcast ,同时是通过sqlline命令行来进行查询,而DrillBit 是一个运行在集群中某个节点上面的一个Drill 过程。

Client
上图中的SQL query:
SELET _MAP[‘R_NAME’] as name,
_MAP[‘R_REGIONKEY’] as region_key FROM
“hdfs://data/region,parquet” WHERE
Cast(_MAP[‘R_NAME’] as varchar) > ‘M’ LIMIT 3;
该查询语句要查询HDFS上面一份数据文件,WHRER条件中按照表达式过滤数据。

从Client端,查询语句被提交给sqlline,该sqlline只是一个简单用java编写的console,他能够和jdbc driver进行沟通,将SELET语句传递给Optiq。

Drill利用Optiq来解析query并且进行plan。Optiq提供可插入式的转换规则,利用这些规则来讲SQL语句的各个部分映射成为你想要的对象。Optiq内置一个查询优化器,Drill的开发者们利用这个优化器挑选出SQL语句执行的最佳顺序,而这个挑选过程不需要任何查询的统计数据。Drill 开发者们自定义很多Optiq 规则完成对SQL 操作符的对象映射,这里面包括(WHRER LIMIT等等),每一条规则都将SQL query中查询的某个操作符转换为drill能够识别的logical operator。

其中Drill logical plan中logical operator 可参考链接:
https://docs.google.com/document/d/1QTL8warUYS2KjldQrGUse7zp8eA72VKtLOHwfXy6c7I/edit) 我自己也已经对这篇文档进行了翻译。Drill logical plan的唯一的目标就是Drill的数据流的工作流程,而没有做任何的优化,和分布式计算的分发等工作。

一旦client产生了logical plan,那么他会查询其中一个已经配置好的DrillBit的host/port的信息,然后将logical plan传递给DrillBit。

Running Logical Plan

在集群中任何一个DrillBit都能运行一个查询,而执行查询的DrillBit要负责将查询结果返回给client。

当正在监听用户提交请求的UserServer得到logical plan的时候,他会将该logical plan 传递个Forman,该Forman的作用是调优该plan,并且转换为实际执行的计划,并提交该计划的信息。

在Foreman内部,第一个操作就是通过Drill Optimizer将logical plan转换为一个physical plan,在Drill当前的版本(指Alpha版),优化器相当基础,作用仅限于将logical plan转化为一个或者多个physical operator,而并不进行太多的优化。

Physical plan是一个DAG(有向无环图),每一子节点或者父节点之间的关系都指明了数据如何在图中流动。Google’s Dremel paper论文为Drill讲述了如何实现一个MPP(大规模并行计算)的执行树。在这个树中,每一个node都代表一个DrillBit计算过程,他们相互依赖彼此的计算结果。关于这方便的讨论还可以参考:
http://www.quora.com/Cloudera-Impala/How-exactly-does-a-multi-level-execution-tree-improve-Impala-Query-performance

当我们想要将多层级的执行树划分为一个个DrillBit的时候,我我们首先需要搜集每一个physical plan的信息。每一个physical operator 都会根据预先给定的operator的配置信息返回包括Network/Cpu/Memory/Disk and Row size and count的信息,同样physical operator也能够返回一个将要执行该operator的DrilBit列表,该列表称之为endpoint affinities。
举例来说,一个Parquet Scan opreator会在离Parquet 最近的DrillBit上面发起查询,他可以查询Parquet file的元数据信息,该元数据信息保存在HDFS上面,他保存了HDFS的datanode信息,并返回一个最优的endpoint,这个endpoint的首要一个条件是需要已经运行了DrillBit。(类似于计算节点)。

有了physical plan,所有的统计信息(Network/Cpu/Memory/Disk等),以及endpoint affinities,这个时候,Foreman 中的Parallellizer 就会讲physical plan转换为多个fragment。每一个Fragment自身也是一个physical plan,这些个与Fragment对应的physical plan同样会被分配到DrillBit node上面。在任意的一个physical plan中,只有一个root fragment(运行在最初始的DrillBit上面),另外还有多个Leaf Fragment,以及多个中间Fragment,这些中间Fragment的顾名思义就是处理这些Leaf Fragment以及root Fragment的中间结果的运算。需要知道的时候,上面提到了Hazelcast,他的作用就是为了保存这些中间Frament。



Running Fragments

  Root fragment 会被提交给DrillBit上面的Worker manager 。中间fragment 保存在Hazelcast分布式缓存,所有的leaf fragment会直接通过BitCom(RPC层次的东西,协议是Protobuf )发送给其他DrillBits。
   Worker Manager一旦接受到Root Fragment ,就会运行这个plan,并且包含一个Screen Operator ,用来阻塞,并且等待返回的数据。如果该plan需要另外多个DrillBit,这些DrillBit组成一个wire,Worker Manager也同时会包含一个exchange operator,该exchange operator启动了一个Receiver用以等待wire中的数据。
  在wire中,leaf fragment被发送给其他DrillBit并且执行。这些leaf fragment也会被转换成为由physical operator 组成的DAG。每一个Physical operator都会利用一个Pull 类型的消息机制,从树的底部开始,operator会从他的parent operator中pull 记录信息,而他的parent operator 则返回一个Outcome status消息。Operator被设计成能够处理每一个可能outcome status(STOP,OK,OK_WITH_NEW_SCHEMA,NONE),因为Drill支持动态schema,也就是说Drill允许在同一个数据集中schema发生变化,所以Drill要能够处理当schema发生变化时的情况,可以参考columnnar storage(http://the-paper-trail.org/blog/columnar-storage/),Drill同时实现了他自己的内存数据结构,我们称之为ValueVector,ValueVector是一组byte集合,代表了一个column内的数据。在每一个Physical operator pull的消息中会返回一个RecordBatch,一个RecordBatch中包含一个或者多个ValueVector。(一个column会包含一个或者多个ValueVector,同时还有schema信息)。
  在文章的例子中(图中),leaf fragment的顶端是这个Scan operator,该Scan operator被设置成为查询Parquet file,并且通过Parquet storage engine运行。这个Storage engine的作用就是从数据源中拉取数据,把数据转换为ValueVector,然后将这些ValueVector作为RecordBatch传递回他的child 。
  最终,所有的Leaf fragment将会接管这些batch数据,通过Sender operator 发送给中间DrillBit。
  中间fragment 一旦第一次接受到一个RecordBatch,会从HazleCast中通过RecordBatch中保留的fragment id查询相应的fragment,并且设置Receiver以及必要的physical operator来继续在DrillBit中进行处理计算。
  中间Fragment包含一个Filtering operator,在这个Filtering operator内部,一旦他接收到一个RecordBatch,他就会查找新的schema,并且将schema传递给CodeGeneration,同时还会传递一个特殊定义的filter expression,type information,借此产生一段特殊的code来完成filter 操作。通过设计成避免casting,运行轻量级的loop,以及进行prefetching,来减少方法的调用,这种方式在Hive的新vectoried query engine(通过Stinger initiative)以及impala中很普遍。
  中间fragment最终会议batch为单元,一次发送一个batch给Root DrillBit,在Root DrillBit中会由Screen operator 来接收相关数据,并且返回给client。
  DrillClient接收RecordBatch,简单讲ValueVector转换成Rows并且显示给client。
  以上说明过程,只是在Drill alpha版本中初步定义的工作流过程,Drill开发者们会在后续的开发测试中持续验证改进这个过程,并且会提供更多operator,以及storage engine,以便方便用户进行实时数据分析查询。
  • 大小: 154.7 KB
1
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics