Goal: flume11 monitors a log file (hive.log in the original description; the config below tails /opt/Shalltest), flume22 monitors the data stream on a network port, both send their data to flume33, and flume33 writes the merged data to HDFS.
Step-by-step implementation:
1) Create flume11.conf, which monitors the file /opt/Shalltest with an exec source and sinks the data to flume33:
# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Shalltest
a1.sources.r1.shell = /bin/bash -c

# 3 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata166
a1.sinks.k1.port = 4141

# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 5 Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
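The exec source simply runs tail -F, so it helps to make sure the monitored file exists before a1 starts (tail -F will also wait for the file to appear, but creating it up front avoids startup noise). A minimal preparation step, assuming write access to /opt:
$ touch /opt/Shalltest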
2) Create flume22.conf, which monitors the data stream on port 44444 with a netcat source and sinks the data to flume33:
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 2 source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata166
a2.sources.r1.port = 44444

# 3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata166
a2.sinks.k1.port = 4141

# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
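Note that a1 and a2 both point their avro sinks at bigdata166:4141, the single avro source of flume33 defined next; this shared endpoint is what merges the two streams. Once flume33 is running, you can confirm the port is listening with a standard socket listing (assuming netstat is available on the host):
$ netstat -tln | grep 4141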
3) Create flume33.conf, which receives the streams sent by flume11 and flume22 and sinks the merged data to HDFS:
# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata166
a3.sources.r1.port = 4141

# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata166:9000/flume3/%H
# Prefix for uploaded files
a3.sinks.k1.hdfs.filePrefix = flume3-
# Whether to roll directories based on time
a3.sinks.k1.hdfs.round = true
# Number of time units per new directory
a3.sinks.k1.hdfs.roundValue = 1
# Time unit for rounding
a3.sinks.k1.hdfs.roundUnit = hour
# Use the local timestamp (required for the %H escape above)
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k1.hdfs.batchSize = 100
# File type; DataStream writes plain text (compression is also supported)
a3.sinks.k1.hdfs.fileType = DataStream
# Roll to a new file every 600 seconds
a3.sinks.k1.hdfs.rollInterval = 600
# Roll when a file reaches roughly 128 MB
a3.sinks.k1.hdfs.rollSize = 134217700
# Never roll based on event count
a3.sinks.k1.hdfs.rollCount = 0
# Minimum block replication
a3.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
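Before starting the agents, it is worth confirming that HDFS is reachable at the address used in hdfs.path. A quick sanity check, assuming the Hadoop client is on the PATH of this host:
$ hdfs dfs -ls hdfs://bigdata166:9000/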
Running the test:
Start the three Flume jobs in order (flume33 first, then flume22, then flume11), then generate some data and watch the results:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf
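To watch an agent's activity directly in the terminal while testing, each of these commands can also be started with Flume's standard console-logger override, for example for a3:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf -Dflume.root.logger=INFO,console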
Open telnet bigdata166 44444 (the netcat source in flume22 binds to bigdata166) and, once connected, send 5555555.
Append 666666 to /opt/Shalltest (see the sketch below).
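A simple way to produce that file change and then confirm the merged output reached HDFS (the %H escape in hdfs.path resolves to the current hour; note that with rollInterval = 600 the in-progress file may still carry a .tmp suffix for up to ten minutes):
$ echo 666666 >> /opt/Shalltest
$ hdfs dfs -ls /flume3
$ hdfs dfs -cat /flume3/*/flume3-*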
Debugging notes:
Because Flume delivery is transaction-based, events produced while the startup order is wrong will sit in the channel for a while, and after too many failed retries the connection is dropped. If data does not arrive, first re-check the startup order (the avro source in flume33 must be up before the avro sinks in flume11 and flume22 try to connect), then look at the agent logs and the hostname/port settings.
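With the stock log4j.properties that ships with the Flume distribution, each agent writes its log to logs/flume.log under the Flume home directory, so connection retries and avro errors can be followed there:
$ tail -F logs/flume.log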