Flume+Kafka+Storm模拟应用日志的实时处理
模拟应用需求
- 采集订单系统应用打印的日志文件。
日志文件使用log4j生成,滚动生成。使用tail -F xxx.log
来监控文件名称,理解tail -f和tail -F的区别。
- 将采集的日志文件保存到Kafka中。
(source)输入:tail -F xxx.log
(channel)存储:内存
(sink)输出:Kafka
config样例,
a1.source = s1
a1.channel = c1
a1.sink = k1
source exec tail -F xxx.log
channel RAM
sink xxxx.xxxx.xxxx.KafkaSink // 该类必须存放lib目录
sink.topic = orderMq
sink.itcast = itcast
map = getConfig();
value = map.get("itcast")
- 通过Storm程序消费Kafka中数据。
KafkaSpout
Bolt1()
Bolt2()
业务模拟:统计双十一当前的订单金额、订单数量、订单人数。订单金额/数量/人数(整个网站、各个业务线、各个品类、各个店铺、各个品牌、每个商品)。
环境配置
应用安装的一般流程:下载、解压、配置、分发。
在Flume官方网站下载Flume,解压Flume安装包,
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers/
cd /export/servers/
ln -s apache-flume-1.6.0-bin flume
配置Flume环境变量,
vi /etc/profile
export FLUME_HOME=/export/servers/apache-flume-1.6.0-bin
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin:$STORM_HOME/bin:$KAFKA_HOME/bin:$FLUME_HOME/bin
source /etc/profile
创建Flume配置文件,
cd /export/servers/flume/conf/
mkdir myconf
cd myconf/
vi exec.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/1.log
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = orderMq
a1.sinks.k1.brokerList = kafka01:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
准备模拟应用日志数据的目录,
mkdir -p /export/data/flume_sources/click_log
通过脚本模拟生产应用日志数据,
# click_log_out.sh
for((i=0;i<=50000;i++));
do echo "message-"+$i >> /export/data/flume_sources/click_log/1.log;
done
chomd +x click_log_out.sh
打通所有流程,第一步,启动zk集群,
zkServer.sh start
zkServer.sh status
第二步,封装Kafka集群启动和停止的脚本,启动Kafka集群,
start-kafka.sh
第三步,启动Flume客户端,监控日志数据生成,
./bin/flume-ng agent -n a1 -c conf -f conf/myconf/exec.conf -Dflume.root.logger=INFO,console
第四步,创建一个topic并开启consumer,在客户端模拟消费,
kafka-console-consumer.sh --zookeeper zk01:2181 --topic orderMq
第五步,执行应用日志数据生产脚本,
sh click_log_out.sh
整合Storm程序的bug解决
服务端没有启动ZooKeeper,
ERROR org.apache.curator.ConnectionState - Connection timed out for connection string (zk01:2181,zk02:2181,zk03:2181) and timeout (15000) / elapsed (15071)
本地调试Storm程序,本机没有配置kafka的解析,
kafka.consumer.SimpleConsumer - Reconnect due to error:
java.nio.channels.ClosedChannelException: null
放到Storm集群运行,相关环境和jar包冲突,把Storm相关的依赖去掉。
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Exception in thread "main" java.lang.ExceptionInInitializerError
at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106)
at cn.itcast.storm.kafkaAndStorm.KafkaAndStormTopologyMain.main(KafkaAndStormTopologyMain.java:27)
Caused by: java.lang.RuntimeException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/hadoop/kafka2storm.jar!/defaults.yaml, jar:file:/export/servers/apache-storm-0.9.5/lib/storm-core-0.9.5.jar!/defaults.yaml]
at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:133)
at backtype.storm.utils.Utils.readDefaultConfig(Utils.java:160)
at backtype.storm.utils.Utils.readStormConfig(Utils.java:184)
at backtype.storm.utils.Utils.<clinit>(Utils.ja
本文首发于steem,感谢阅读,转载请注明。
微信公众号「数据分析」,分享数据科学家的自我修养,既然遇见,不如一起成长。
读者交流电报群
知识星球交流群
@padluo, 就知道你写文很有潜力!
多谢捧场支持呀
感谢分享!不过为什么大家都喜欢建个小密圈...
多谢大佬支持哈
学习了,有空来踩博。。
谢谢