您现在的位置 >> Hadoop教程 >> Hadoop实战 >> 专题  
 

值得一看:Oozie Coordinator使用及详解

【作者:Hadoop实战专家】【关键词:工作流 执行 】 【点击:30766次】【2013-09-0】
一个Coordinator Job包含了在Job外部设置执行周期和频率的语义,类似于在工作流外部增加了一个协调器来管理这些工作流的工作流Job的运行。  

相关热门搜索:

大数据标签:hadoop hdfs hive bigdata

问题导读:
1、什么是Oozie Coordinator ?
2、它与HDFS有什么不同 ?

Oozie所支持工作流,工作流定义通过将多个Hadoop Job的定义按照一定的顺序组织起来,然后作为一个整体按照既定的路径运行。一个工作流已经定义了,通过启动该工作流Job,就会执行该工作流中包含的多个Hadoop Job,直到完成,这就是工作流Job的生命周期。
那么,现在我们有一个工作流Job,希望每天半夜00:00启动运行,我们能够想到的就是通过写一个定时脚本来调度程序运行。如果我们有多个工作流Job,使用crontab的方式调用可能需要编写大量的脚本,还要通过脚本来控制好各个工作流Job的执行时序问题,不但脚本不好维护,而且监控也不方便。基于这样的背景,Oozie提出了Coordinator的概念,他们能够将每个工作流Job作为一个动作(Action)来运行,相当于工作流定义中的一个执行节点(我们可以理解为工作流的工作流),这样就能够将多个工作流Job组织起来,称为Coordinator Job,并指定触发时间和频率,还可以配置数据集、并发数等。一个Coordinator Job包含了在Job外部设置执行周期和频率的语义,类似于在工作流外部增加了一个协调器来管理这些工作流的工作流Job的运行。

运行Coordinator Job

我们先看一下官方发行包自带的一个简单的例子oozie-3.3.2\examples\src\main\apps\cron,它能够实现定时调度一个工作流Job运行,这个例子中给出的一个空的工作流Job,也是为了演示能够使用Coordinator系统给调度起来。这个例子有3个配置文件,我们不修改workflow.xml配置内容。修改后分别如下所示:

* job.properties配置
*

1. nameNode=hdfs://m1:9000

2. jobTracker=m1:19830

3. queueName=default

4. examplesRoot=examples

5. oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron

6. start=2014-03-04T19:00Z

7. end=2014-03-06T01:00Z

8. workflowAppUri=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron

复制代码

修改了Hadoop集群的配置,以及调度起止时间范围。

*
workflow.xml配置

*

1. 

2.     

3.     

4. 

5.

复制代码

是一个空Job,没做任何修改。

* coordinator.xml配置
*

1. 

2.      

3.           

4.                ${workflowAppUri}

5.                

6.                     

7.                          jobTracker

8.                          ${jobTracker}

9.                     

10.                     

11.                          nameNode

12.                          ${nameNode}

13.                     

14.                     

15.                          queueName

16.                          ${queueName}

17.                     

18.                

19.           

20.      

21. 

22.

复制代码

修改上述coordinator.xml配置文件,将定时调度频率改为2分钟,然后需要将他们上传到HDFS上:
*

1. hadoop fs -rm /user/shirdrn/examples/apps/cron/coordinator.xml

2. hadoop fs -put /home/shirdrn/cloud/programs/oozie-3.3.2/examples/target/oozie-examples-3.3.2-examples/examples/apps/cron/coordinator.xml /user/shirdrn/examples/apps/cron/

复制代码
因为我之前已经上传过一次,所以修改了coordinator.xml文件配置内容后,一定要上传到HDFS中,而job.properties配置可以通过指定config选项来执行。启动一个Coordinator Job和启动一个Oozie工作流Job类似,执行如下命令即可:
*

1. bin/oozie job -oozie http://oozie-server:11000/oozie -config /home/shirdrn/cloud/programs/oozie-3.3.2/examples/target/oozie-examples-3.3.2-examples/examples/apps/cron/job.properties -run

复制代码

运行上面命令,在控制台上会返回这个Job的ID,我们也可以通过Oozie的Web控制台来查看:

* Coordinator Job状态
*

oozie-coordinator-1024x528.png (164.3 KB, 下载次数: 1)

  

2014-5-14 10:47 上传

* Coordinator Job详情
*

oozie-coordinator-details.png (97.15 KB, 下载次数: 1)

  

2014-5-14 10:46 上传

* 如果想要杀掉一个Job,需要指定Oozie的Job ID,可以执行如下命令:
*

1. bin/oozie job -oozie http://oozie-server:11000/oozie -kill 0000065-140302210847342-oozie-shir-C

复制代码

Coordinator应用(Coordinator Application)

Coordinator应用是指当满足一定条件时,会触发Oozie工作流Job(在Coordinator中将工作流Job定义为一个动作(Action))。其中,触发条件可以是一个时间频率、一个dataset实例是否可用,或者可能是外部的其他事件。
Coordinator Job是一个Coordinator应用的运行实例,这个Coordinator Job是在Oozie提供的Coordinator引擎上运行的,并且这个实例从指定的时间开始,直到运行结束。一个Coordinator Job具有以上几个状态:

* PREP
* RUNNING
* RUNNINGWITHERROR
* PREPSUSPENDED
* SUSPENDED
* SUSPENDEDWITHERROR
* PREPPAUSED
* PAUSED
* PAUSEDWITHERROR
* SUCCEEDED
* DONEWITHERROR
* KILLED
* FAILED
*
*
从状态字符串的含义,我们大概就能知道它的含义,这里不做过多解释,可以查阅官方文档。现在,我们关注一下这些状态之间是怎样转移的,从一个状态变成哪些状态是合法的,如下表所示:

转移前状态

转以后状态集合

PREP PREPSUSPENDED | PREPPAUSED | RUNNING | KILLED
RUNNING RUNNINGWITHERROR | SUSPENDED | PAUSED | SUCCEEDED | KILLED
RUNNINGWITHERROR RUNNING | SUSPENDEDWITHERROR | PAUSEDWITHERROR | DONEWITHERROR | KILLED | FAILED
PREPSUSPENDED PREP | KILLED
SUSPENDED RUNNING | KILLED
SUSPENDEDWITHERROR RUNNINGWITHERROR | KILLED
PREPPAUSED PREP | KILLED
PAUSED SUSPENDED | RUNNING | KILLED
PAUSEDWITHERROR SUSPENDEDWITHERROR | RUNNINGWITHERROR | KILLED

我们可以看到,Coordinator Job的状态比一个基本的Oozie工作流Job的状态要复杂的多,因为Coordinator Job的基本执行单元可能是一个基本Oozie Job,而且外加了一些调度信息,必然要增加额外的状态来描述。

Coordinator动作(Coordinator Action)

*

一个Coordinator Job会创建并执行Coordinator 动作(Coordinator Action)。通常一个Coordinator 动作是一个工作流Job,这个工作流Job会生成一个dataset实例并处理这个数据集。当一个一个Coordinator 动作被创建以后,它会一直等待满足执行条件的所有输入事件的完成然后执行,或者发生超时。
每个Coordinator Job都有一个驱动事件,来决定它所包含的Coordinator动作的初始化(创建)。对于同步Coordinator Job(synchronous coordinator job)来说,触发执行频率(frequency)就是一个驱动事件。
同样,组成Coordinator Job的基本单元是Coordinator 动作(Coordinator Action),它不像Oozie工作流Job只有OK和Error两个执行结果,一个Coordinator 动作的状态集合,如下所示:

* WAITING
* READY
* SUBMITTED
* TIMEDOUT
* RUNNING
* KILLED
* SUCCEEDED
* FAILED

一个Coordinator 动作的状态变迁情况,如下表所示:

*

转移前状态

转以后状态集合

WAITING READY | TIMEDOUT | KILLED
READY SUBMITTED | KILLED
SUBMITTED RUNNING | KILLED | FAILED
RUNNING SUCCEEDED | KILLED | FAILED

Coordinator应用定义(Coordinator Application Definition)

一个同步的Coordinator应用定义的语法格式,如下所示:

*

1. 

2.      

3.           [TIME_PERIOD]

4.           [CONCURRENCY]

5.           [EXECUTION_STRATEGY]

6.      

7.      

8.           [SHARED_DATASETS]

9.           ...

10.

11.           

12.           

13.                [URI_TEMPLATE]

14.           

15.           ...

16.      

17.      

18.           

19.                [INSTANCE]

20.                ...

21.           

22.           ...

23.           

24.                [INSTANCE]

25.                [INSTANCE]

26.           

27.           ...

28.      

29.      

30.           

31.                [INSTANCE]

32.           

33.           ...

34.      

35.      

36.           

复制代码

基于上述定义语法格式,我们分别说明对应元素的含义,如下所示:

* control元素

control元素定义了一个Coordinator Job的控制信息,主要包括如下三个配置元素:

元素名称

含义说明

timeout 超时时间,单位为分钟。当一个Coordinator Job启动的时候,会初始化多个Coordinator动作,timeout用来限制这个初始化过程。默认值为-1,表示永远不超时,如果为0 则总是超时。
concurrency 并发数,指多个Coordinator Job并发执行,默认值为1。
execution 配置多个Coordinator Job并发执行的策略:默认是FIFO。另外还有两种:LIFO(最新的先执行)、LAST_ONLY(只执行最新的Coordinator Job,其它的全部丢弃)。
throttle 一个Coordinator Job初始化时,允许Coordinator动作处于WAITING状态的最大数量。

* Dataset元素

Coordinator Job中有一个Dataset的概念,它可以为实际计算提供计算的数据,主要是指HDFS上的数据目录或文件,能够配置数据集生成的频率(Frequency)、URI模板、时间等信息,下面看一下dataset的语法格式:

*

1. 

2.      [URI TEMPLATE]

3.      [FILE NAME]

4. 

5.

6.

复制代码
举例如下:
*

1. 

2.      

3.           hdfs://m1:9000/hive/warehouse/user_events/${YEAR}${MONTH}/${DAY}/data

4.      

5.      donefile.flag

6. 

7.

8.

复制代码

上面会每天都会生成一个用户事件表,可以供Hive查询分析,这里指定了这个数据集的位置,后续计算会使用这部分数据。其中,uri-template指定了一个匹配的模板,满足这个模板的路径都会被作为计算的基础数据。
另外,还有一种定义dataset集合的方式,将多个dataset合并成一个组来定义,语法格式如下所示:
*
*

1. 

2.      [SHARED_DATASETS]

3.      ...

4.      

5.           [URI TEMPLATE]

6.      

7.      ...

8. 

9.

10.

复制代码

* input-events和output-events元素

一个Coordinator应用的输入事件指定了要执行一个Coordinator动作必须满足的输入条件,在Oozie当前版本,只支持使用dataset实例。
一个Coordinator动作可能会生成一个或多个dataset实例,在Oozie当前版本,输出事件只支持输出dataset实例。

EL常量

常量表示形式

含义说明

${coord:minutes(int n)} 返回日期时间:从一开始,周期执行n分钟
${coord:hours(int n)} 返回日期时间:从一开始,周期执行n * 60分钟
${coord:days(int n)} 返回日期时间:从一开始,周期执行n * 24 * 60分钟
${coord:months(int n)} 返回日期时间:从一开始,周期执行n * M * 24 * 60分钟(M表示一个月的天数)
${coord:endOfDays(int n)} 返回日期时间:从当天的最晚时间(即下一天)开始,周期执行n * 24 * 60分钟
${coord:endOfMonths(1)} 返回日期时间:从当月的最晚时间开始(即下个月初),周期执行n * 24 * 60分钟
${coord:current(int n)} 返回日期时间:从一个Coordinator动作(Action)创建时开始计算,第n个dataset实例执行时间
${coord:dataIn(String name)} 在输入事件(input-events)中,解析dataset实例包含的所有的URI
${coord:dataOut(String name)} 在输出事件(output-events)中,解析dataset实例包含的所有的URI
${coord:offset(int n, String timeUnit)} 表示时间偏移,如果一个Coordinator动作创建时间为T,n为正数表示向时刻T之后偏移,n为负数向向时刻T之前偏移,timeUnit表示时间单位(选项有MINUTE、HOUR、DAY、MONTH、YEAR)
${coord:hoursInDay(int n)} 指定的第n天的小时数,n>0表示向后数第n天的小时数,n=0表示当天小时数,n<0表示向前数第n天的小时数
${coord:daysInMonth(int n)} 指定的第n个月的天数,n>0表示向后数第n个月的天数,n=0表示当月的天数,n<0表示向前数第n个月的天数
${coord:tzOffset()} ataset对应的时区与Coordinator Job的时区所差的分钟数
${coord:latest(int n)} 最近以来,当前可以用的第n个dataset实例
${coord:future(int n, int limit)} 当前时间之后的dataset实例,n>=0,当n=0时表示立即可用的dataset实例,limit表示dataset实例的个数

${coord:nominalTime()} nominal时间等于Coordinator Job启动时间,加上多个Coordinator Job的频率所得到的日期时间。例如:start=”2009-01-01T24:00Z”,end=”2009-12-31T24:00Z”,frequency=”${coord:days(1)}”,frequency=”${coord:days(1)},则nominal时间为:2009-01-02T00:00Z、2009-01-03T00:00Z、2009-01-04T00:00Z、…、2010-01-01T00:00Z
${coord:actualTime()} Coordinator动作的实际创建时间。例如:start=”2011-05-01T24:00Z”,end=”2011-12-31T24:00Z”,frequency=”${coord:days(1)}”,则实际时间为:2011-05-01,2011-05-02,2011-05-03,…,2011-12-31
${coord:user()} 启动当前Coordinator Job的用户名称
${coord:dateOffset(String baseDate, int instance, String timeUnit)} 计算新的日期时间的公式:newDate = baseDate + instance * timeUnit,如:baseDate=’2009-01-01T00:00Z’,instance=’2′,timeUnit=’MONTH’,则计算得到的新的日期时间为’2009-03-01T00:00Z’。
${coord:formatTime(String timeStamp, String format)} 格式化时间字符串,format指定模式

配置举例

下面,根据官网上给出的例子,进行说明,配置例子如下所示:

1. 

4.      

5.           

7.                hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}

8.                

9.           

10.           

12.                hdfs://bar:8020/app/weeklystats/${YEAR}/${MONTH}/${DAY}

13.                

14.           

15.      

16.      

17.           

18.                ${coord:current(-6)}

19.                ${coord:current(0)}

20.           

21.      

22.      

23.           

24.                ${coord:current(0)}

25.           

26.      

27.      

28.           

29.                hdfs://bar:8020/usr/joe/logsprocessor-wf

30.                

31.                     

32. wfInput

33.                          ${coord:dataIn('input')}

34.                     

35.                     

36.                          wfOutput

37.                          ${coord:dataOut('output')}

38.                     

39.                

40.           

41.      

42. 

43.

44.

复制代码

名称为logs的dataset实例频率为1天,它配置的初始实例时间为2009-01-07T24:00Z,则在input-events输入事件中开始实例(start-instance)时间为6天前,即2009-01-01T24:00Z,结束实例(end-instance)时间为当天时间。
后半部分中定义了action,其中${coord:dataIn(‘input’)}表示解析名称为input的输入事件所关联的URI(即HDFS上的文件或目录)。

大数据系列相关文章:

最新评论
江湖小虾米2014-09-10 02:57:57
视频:【从技术角度思考Hadoop到底是什么】大数据是个铺天盖地的词,而谈论大数据又不可避免地要提到Hadoop,遗憾的是今天大多数大数据鼓吹者,甚至专业人士其实并不能说清楚Hadoop到底是什么玩意,王家林老师从技术角度思考Hadoop到底是什么。@王家林_Android_HTML5 http://t.cn/8souHnB
小猫2014-09-09 09:41:28
Hadoop集群环境下网络架构的设计与优化-CSDN.NET http://t.cn/8s9Ff2j
shen19882014-09-08 11:41:58
[图片]这个图是我的Hadoop集群的规模。可是我在启动zkServer之后,发现[图片]模式为:standalone,但是教案上要求的是follower,这是怎么一回事啊?
夜下的流星2014-09-07 09:19:26
[图片]
临高2014-09-07 12:14:17
[图片]
unknown_4042014-09-06 12:13:02
分享了一篇文章:《看hadoop视频学习,税后12K,真牛》 http://t.cn/8s6EHyg
钟先生2014-09-05 11:00:02
在本机虚拟机上
罗sir2014-09-04 10:26:25
./hdfs dfs -ls /
真心真意2014-09-04 02:13:35
【大数据巨头Cloudera或9月入华】面对中国市场对大数据解决方案的强劲需求,大数据巨头Cloudera也计划进入这一市场。Cloudera将与英特尔合作向中国客户推出融合版本Hadoop产品。 http://t.cn/8sgnv9o
龙冰伦2014-09-03 09:33:42
发现一篇说话超直接的技术日志,很有意思。从技术上讲,确实同no future on GP. 《update: hadoop vs GP》 http://t.cn/Rv31xBp
 
  • Hadoop生态系统资料推荐