Oozie common commands, killing jobs, deadlocks, and more

1. Common commands

# Validate a workflow/coordinator definition file
oozie validate -oozie http://path/to/oozie:11000/oozie /export/servers/oozie/test/coordinator.xml
# Run a job
oozie job -oozie http://path/to/oozie:11000/oozie -config /export/servers/oozie/test/coordinator.properties -run
# Show job info
oozie job -oozie http://path/to/oozie:11000/oozie -info 00000**-**-oozie-demo-C
# Show job logs
oozie job -oozie http://path/to/oozie:11000/oozie -log 00000**-**-oozie-demo-C
# Kill a job
oozie job -oozie http://path/to/oozie:11000/oozie -kill 00000**-**-oozie-demo-C
# List all workflow jobs (the first form relies on the OOZIE_URL environment variable)
oozie jobs
oozie jobs -oozie http://path/to/oozie:11000/oozie
# List coordinator (scheduled) jobs
oozie jobs -jobtype coordinator -oozie http://path/to/oozie:11000/oozie
oozie jobs -oozie http://path/to/oozie:11000/oozie -jobtype coordinator

2. Starting and stopping Oozie jobs

Start a job (from the command line; the same can be done from Java code with the Oozie client API, see the sketch below)
oozie job -oozie http://path/to/oozie:11000/oozie -config job.properties -run
Stop a job
oozie job -oozie http://path/to/oozie:11000/oozie -kill jobid
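A minimal sketch of the Java-code route mentioned above, using the Oozie client API (org.apache.oozie.client.OozieClient); the application path and user name here are placeholders:

import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;

public class OozieJobControl {
    public static void main(String[] args) throws OozieClientException {
        // point the client at the Oozie server (same URL as in the CLI examples)
        OozieClient client = new OozieClient("http://path/to/oozie:11000/oozie");

        // start: build a configuration equivalent to job.properties
        Properties conf = client.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH, "hdfs:///path/to/app"); // placeholder path
        conf.setProperty("user.name", "hadoop");                      // placeholder user
        String jobId = client.run(conf);
        System.out.println("started job: " + jobId);

        // stop: kill the job by id
        client.kill(jobId);
    }
}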
Note that killing a job can fail with a permission error. In that case adjust the proxy-user settings: the hadoop.proxyuser.* entries live in Hadoop's core-site.xml and the ProxyUserService entries in oozie-site.xml (here "hadoop" stands for the user submitting the kill):
hadoop.proxyuser.oozie.groups *
hadoop.proxyuser.oozie.hosts *
oozie.service.ProxyUserService.proxyuser.hadoop.hosts *
oozie.service.ProxyUserService.proxyuser.hadoop.groups *
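The same settings in configuration-file form (a sketch; the wildcard values are maximally permissive and should be narrowed in production):

core-site.xml:
<property>
    <name>hadoop.proxyuser.oozie.hosts</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.oozie.groups</name>
    <value>*</value>
</property>

oozie-site.xml:
<property>
    <name>oozie.service.ProxyUserService.proxyuser.hadoop.hosts</name>
    <value>*</value>
</property>
<property>
    <name>oozie.service.ProxyUserService.proxyuser.hadoop.groups</name>
    <value>*</value>
</property>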

3. Oozie job deadlocks and how to resolve them

The necessary condition for this deadlock is that a job spawns child jobs, and that is exactly how Oozie works: to start a YARN application, Oozie first launches a MapReduce job (the so-called Oozie launcher), and that MR job then launches the real job (the kinds of jobs mentioned at the start of the article). A concrete example: at some point we submit n Spark jobs through Oozie (via its Spark action or shell action), so Oozie submits n MapReduce launcher jobs to YARN. Suppose m of them (m ≤ n) obtain resources and create their Spark applications, but by then all the resources in the queue are held by those m MR jobs. The Spark applications keep waiting for resources, while the m MR jobs keep waiting for their Spark applications to finish, and the result is a deadlock.
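One common mitigation (a sketch only, assuming Oozie 4.x where the launcher is a MapReduce job, and assuming a dedicated YARN queue named "launcher" exists) is to run the launcher jobs in a queue separate from the one the real jobs use, so launchers can never hold all the resources their child jobs are waiting for. This goes into the action's <configuration> block in workflow.xml:

<property>
    <!-- hypothetical queue name: put the Oozie launcher MR job in its own queue -->
    <name>oozie.launcher.mapred.job.queue.name</name>
    <value>launcher</value>
</property>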

4. Starting and stopping Oozie jobs through Hue

A real-world case: in practice, especially on self-managed big data clusters, day-to-day job scheduling is usually handled with Hue + Oozie. The Schedules page in Hue shows no running job, yet in every scheduling cycle a job keeps running under Workflows. How do we stop one of these jobs?

Hue + Oozie daily job-scheduling UI

Solution: click Workflows, find the job that runs on the fixed schedule, click into it to open its detail page, then click the parent job ID on that page, as shown below

oozie workflows

From the workflow's detail page this jumps straight to the Schedules page for the coordinator, as shown below; kill the job there and the recurring schedule stops.

Hive: converting rows to columns and columns to rows

When processing data with Hive SQL there is often a need to convert rows to columns or columns to rows. Without further ado, here is the code.

hive> select * from test limit 10;
OK
test.user_id    test.order_id
104399    1715131
104399    2105395
104399    1758844
104399    981085
104399    2444143
104399    1458638
104399    968412
104400    1609001
104400    2986088
104400    1795054

Rows to a single column -> concatenate the order_ids that share the same user_id into one comma-separated row

select user_id,
concat_ws(',',collect_list(order_id)) as order_value 
from test
group by user_id
limit 10;

-- result (abbreviated)
user_id    order_value
104399    1715131,2105395,1758844,981085,2444143

Notes:

Functions used: concat_ws(',', collect_set(column)). collect_list keeps duplicates while collect_set removes them; the column must be of type string (see the cast example below).
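If order_id is not already a string, cast it first; a sketch against the same test table:

select user_id,
       concat_ws(',', collect_set(cast(order_id as string))) as order_value
from test
group by user_id;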

One row to many rows -> split a row on a delimiter so that it produces multiple rows

The test data:

hive> select * from test;
OK
test.user_id    test.order_value
104408    2909888,2662805,2922438,674972,2877863,190237
104407    2982655,814964,1484250,2323912,2689723,2034331,1692373,677498,156562,2862492,338128
104406    1463273,2351480,1958037,2606570,3226561,3239512,990271,1436056,2262338,2858678
104405    153023,2076625,1734614,2796812,1633995,2298856,2833641,3286778,2402946,2944051,181577,464232
104404    1815641,108556,3110738,2536910,1977293,424564
104403    253936,2917434,2345879,235401,2268252,2149562,2910478,375109,932923,1989353
104402    3373196,1908678,291757,1603657,1807247,573497,1050134,3402420
104401    814760,213922,2008045,3305934,2130994,1602245,419609,2502539,3040058,2828163,3063469
104400    1609001,2986088,1795054,429550,1812893
104399    1715131,2105395,1758844,981085,2444143,1458638,968412
Time taken: 0.065 seconds, Fetched: 10 row(s)

Split each order_value record into individual elements:

select user_id,order_value,order_id
from test
lateral view explode(split(order_value,',')) num as order_id
limit 10;

-- result
user_id    order_value    order_id
104408    2909888,2662805,2922438,674972,2877863,190237    2909888
104408    2909888,2662805,2922438,674972,2877863,190237    2662805
104408    2909888,2662805,2922438,674972,2877863,190237    2922438
104408    2909888,2662805,2922438,674972,2877863,190237    674972
104408    2909888,2662805,2922438,674972,2877863,190237    2877863
104408    2909888,2662805,2922438,674972,2877863,190237    190237
104407    2982655,814964,1484250,2323912,2689723,2034331,1692373,677498,156562,2862492,338128    2982655
104407    2982655,814964,1484250,2323912,2689723,2034331,1692373,677498,156562,2862492,338128    814964
104407    2982655,814964,1484250,2323912,2689723,2034331,1692373,677498,156562,2862492,338128    1484250
104407    2982655,814964,1484250,2323912,2689723,2034331,1692373,677498,156562,2862492,338128    2323912
Time taken: 0.096 seconds, Fetched: 10 row(s)

Converting a Spark Vector into an Array

Convert a Spark Vector of features into an array

When processing data with Spark, a column often ends up holding a Vector. Many of the components used to persist data cannot store such a column directly, so the vector has to be converted into an Array or a string before it is written out.

Spark 3.0 added a vector_to_array UDF, so there is no need to implement the conversion yourself; just call it. On older Spark versions, however, you have to write the conversion logic yourself.

Reference: https://github.com/apache/spark/pull/26910
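A minimal Spark 3.0+ sketch, assuming a DataFrame dataWithFeatures with a vector column named features (the same names the snippets below use):

import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions.col

// turn the vector column into array<double>
val dataWithFeaturesArr = dataWithFeatures.withColumn("features_arr", vector_to_array(col("features")))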

Two code samples follow for older versions. If you know exactly which vector type your data holds, for example a DenseVector:

import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.functions.udf

// cast each value to DenseVector and expose the underlying Array[Double]
val toArr: Any => Array[Double] = _.asInstanceOf[DenseVector].toArray
val toArrUdf = udf(toArr)
// 'features assumes spark.implicits._ is in scope for the Symbol-to-Column conversion
val dataWithFeaturesArr = dataWithFeatures.withColumn("features_arr", toArrUdf('features))

If you are not sure which vector type you are dealing with, handle both, as below (this mirrors the implementation in the PR linked above):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.linalg.{Vector => OldVector}
import org.apache.spark.sql.functions.udf

// accept both the new ml Vector and the legacy mllib Vector
val vectorToArrayUdf = udf { vec: Any =>
  vec match {
    case v: Vector => v.toArray
    case v: OldVector => v.toArray
    case v => throw new IllegalArgumentException(
      "function vector_to_array requires a non-null input argument and input type must be " +
      "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
      s"but got ${ if (v == null) "null" else v.getClass.getName }.")
  }
}.asNonNullable()
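Usage is the same, again assuming the hypothetical features column:

import org.apache.spark.sql.functions.col

val dataWithFeaturesArr = dataWithFeatures.withColumn("features_arr", vectorToArrayUdf(col("features")))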