apache Flume 写入HDFS Sink配置参数说明

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

apache flume

1、简介

Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。

Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写出数据由不同的工作线程处理(称为 Runner)。 在 Flume-og 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。

2、组件

Agent主要由:source,channel,sink三个组件组成.

Source:从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channel,Flume提供多种数据接收的方式,比如Avro,Thrift,twitter1%等Channel:channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source和sink链接. 支持的类型有: JDBC channel , File System channel , Memory channel等.

sink:sink将数据存储到集中存储器比如Hbase和HDFS,它从channels消费数据(events)并将其传递给目标地. 目标地可能是另一个sink,也可能HDFS,HBase.

3、HDFS Sink 配置

Flume HDFS Sink应该是非常常用的,其中的配置参数也比较多,

  • channel
  • type : hdfs
  • path : 写入hdfs的路径,需要包含文件系统标识,比如:hdfs://namenode/flume/webdata/可以使用flume提供的日期及%{host}表达式。
  • filePrefix : 默认值:FlumeData写入hdfs的文件名前缀,可以使用flume提供的日期及%{host}表达式。
  • fileSuffix : 写入hdfs的文件名后缀,比如:.lzo .log等。
  • inUsePrefix : 临时文件的文件名前缀,hdfs sink会先往目标目录中写临时文件,再根据相关规则重命名成最终目标文件;
  • inUseSuffix : 默认值:.tmp临时文件的文件名后缀。
  • rollInterval : 默认值:30hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;如果设置成0,则表示不根据时间来滚动文件;注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;
  • rollSize: 默认值:1024当临时文件达到该大小(单位:bytes)时,滚动成目标文件;如果设置成0,则表示不根据临时文件大小来滚动文件;
  • rollCount : 默认值:10当events数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件;
  • idleTimeout: 默认值:0当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件;
  • batchSize: 默认值:100每个批次刷新到HDFS上的events数量;
  • codeC: 文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy
  • fileType: 默认值:SequenceFile文件格式,包括:SequenceFile, DataStream,CompressedStream当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;
  • maxOpenFiles : 默认值:5000最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭;
  • minBlockReplicas : 默认值:HDFS副本数写入HDFS文件块的最小副本数。该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件。待研究。
  • writeFormat : 写sequence文件的格式。包含:Text, Writable(默认)
  • callTimeout : 默认值:10000       执行HDFS操作的超时时间(单位:毫秒);
  • threadsPoolSize : 默认值:10hdfs sink启动的操作HDFS的线程数。
  • rollTimerPoolSize : 默认值:1hdfs sink启动的根据时间滚动文件的线程数。
  • kerberosPrincipal : HDFS安全认证kerberos配置;
  • kerberosKeytab : HDFS安全认证kerberos配置;
  • proxyUser : 代理用户
  • roundm :默认值:false是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式;
  • roundValue : 默认值:1时间上进行“舍弃”的值;
  • roundUnit : 默认值:seconds时间上进行”舍弃”的单位,包含:second,minute,hour 示例:a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%Sa1.sinks.k1.hdfs.round = truea1.sinks.k1.hdfs.roundValue = 10a1.sinks.k1.hdfs.roundUnit = minute当时间为2015-10-16 17:38:59时候,hdfs.path依然会被解析为:/flume/events/20151016/17:30/00因为设置的是舍弃10分钟内的时间,因此,该目录每10分钟新生成一个。
  • timeZone : 默认值:Local Time时区。
  • useLocalTimeStamp : 默认值:flase是否使用当地时间。
  • closeTries : 默认值:0hdfs sink关闭文件的尝试次数;如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,这个未关闭的文件将会一直留在那,并且是打开状态。设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功。
  • retryInterval : 默认值:180(秒)hdfs sink尝试关闭文件的时间间隔,如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1.
  • serializer : 默认值:TEXT序列化类型。其他还有:avro_event或者是实现了EventSerializer.Builder的类名。

apache hive 函数row_number(),rank(),dense_rank()的应用介绍

场景:hsql(hive sql)处理数据的时候会有取分组后topN的需求,这个时候们可能会用到同一检索条件下的组内排序,涉及到的函数有row_number(),rank(),dense_rank()

  • row_number:不管排名是否有相同的,都按照顺序1,2,3…..n
  • rank:排名相同的名次一样,同一排名有几个,后面排名就会跳过几次
  • dense_rank:排名相同的名次一样,且后面名次不跳跃

数据准备

# province, city, id
浙江,杭州,300
浙江,宁波,150
浙江,温州,200
浙江,嘉兴,100
江苏,南京,270
江苏,苏州,299
江苏,某市,200
江苏,某某市,100
top n 样例数据

普通排序

hive > select * from hive_table order by id desc;
浙江    杭州    300
浙江    宁波    150
浙江    温州    200
浙江    嘉兴    100
江苏    南京    270
江苏    苏州    299
江苏    某市    200
江苏    某某市    100

综合查询排序

hive > select province,city,
rank() over (order by id desc) rank,
dense_rank() over (order by id desc) dense_rank,
row_number() over(order by id desc) row_number
from hive_table
group by province,city,people;
浙江    杭州    300    1    1    1
江苏    苏州    299    2    2    2
江苏    南京    270    3    3    3
江苏    某市    200    4    4    4
浙江    温州    200    4    4    5
浙江    宁波    150    6    5    6
江苏    某某市    100    7    6    7
浙江    嘉兴    100    7    6    8

主要说明讲解如下:

  • row_number顺序下来, 顺序下来的序号不会有重复的
  • rank:在遇到数据相同项时,会留下空位,(红框内第一列,4,4,6) ,相同的会占一个位置,接下来的数字变成 +2
  • dense_rank:在遇到数据相同项时,不会留下空位,(红框内第一列,4,4,5) 相同的是同一个序号,但是接下来的还是 +1

分区统计查询排序

hive > select province,city,
rank() over (partition by province order by id desc) rank,
dense_rank() over (partition by province order by id desc) dense_rank,
row_number() over(partition by province order by id desc) row_number
from hive_table
group by province,city,people;
江苏    苏州    299    1    1    1
江苏    南京    270    2    2    2
江苏    某市    200    3    3    3
江苏    某某市    100    4    4    4
浙江    杭州    300    1    1    1
浙江    温州    200    2    2    2
浙江    宁波    150    3    3    3
浙江    嘉兴    100    4    4    4

apache hive中四种排序order by,sort by, distribute by, cluster by

1. order by

Hive中的order by跟传统的sql语言中的order by作用是一样的,会对查询的结果做一次全局排序,所以说,只有hive的sql中制定了order by所有的数据都会到同一个reducer进行处理(不管有多少map,也不管文件有多少的block只会启动一个reducer)。但是对于大量数据这将会消耗很长的时间去执行。

    这里跟传统的sql还有一点区别:如果指定了hive.mapred.mode=strict(默认值是nonstrict),这时就必须指定limit来限制输出条数,原因是:所有的数据都会在同一个reducer端进行,数据量大的情况下可能不能出结果,那么在这样的严格模式下,必须指定输出的条数。

set hive.mapred.mode=nonstrict; (default value / 默认值)
set hive.mapred.mode=strict;
hive> select * from test order by id;     
FAILED: Error in semantic analysis: 1:28 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'id'

原因: 在order by 状态下所有数据会到一台服务器进行reduce操作也即只有一个reduce,如果在数据量大的情况下会出现无法输出结果的情况,如果进行 limit n ,那只有  n * map number 条记录而已。只有一个reduce也可以处理过来。

2、sort by

Hive中指定了sort by,那么在每个reducer端都会做排序,也就是说保证了局部有序(每个reducer出来的数据是有序的,但是不能保证所有的数据是有序的,除非只有一个reducer),好处是:执行了局部排序之后可以为接下去的全局排序提高不少的效率(其实就是做一次归并排序就可以做到全局排序了)。

  • sort by 不受 hive.mapred.mode 是否为strict ,nostrict 的影响
  •  sort by 的数据只能保证在同一reduce中的数据可以按指定字段排序。 
  • 使用sort by 你可以指定执行的reduce 个数 (set mapred.reduce.tasks=<number>) 这样可以输出更多的数据。
  • 对输出的数据再执行归并排序,即可以得到全部结果。

注意:可以用limit子句大大减少数据量。使用limit n后,传输到reduce端(单机)的数据记录数就减少到n* (map个数)。否则由于数据过大可能出不了结果。

3、ditribute by

ditribute by是控制map的输出在reducer是如何划分的。按照指定的字段对数据进行划分到不同的输出reduce。

默认情况下,MapReduce计算框架会依据map输入的键计算相应的哈希值,然后按照得到的哈希值将键-值对均匀分发到多个reducer中去,不过不幸的是,这也是意味着当我们使用sort by 时,不同reducer的输出内容会有明显的重叠,至少对于排序顺序而已只这样,即使每个reducer的输出的数据都有序的。如果我们想让同一年的数据一起处理,那么就可以使用distribute by 来保证具有相同年份的数据分发到同一个reducer中进行处理,然后使用sort by 来安装我们的期望对数据进行排序:

 hive> insert overwrite local directory '/home/hadoop/out' select * from test order by name distribute by length(name);  

 上边例子中的此方法会根据name的长度划分到不同的reduce中,最终输出到不同的文件中。  length 是内建函数,也可以指定其他的函数或这使用自定义函数。

4、cluster by

cluster by的功能就是distribute by和sort by相结合;

注意被cluster by指定的列只能是降序,不能指定asc和desc。