分布式消息队列Kafka学习笔记

in #cn7 years ago (edited)

Kafka概述

a distributed streaming platform

Kafka架构和核心概念

producer, 生产者,生产馒头。

consumer, 消费者,吃馒头。

broker, 篮子。

topic, 主题,给馒头带一个标签,topica的馒头是给你吃的,topicb的馒头是给你弟弟吃。

Zookeeper集群部署

安装包解压

tar -xzvf zookeeper-3.4.5.tar.gz -C /export/servers

zookeeper配置文件修改

cp zoo_sample.cfg zoo.cfg
vi zoo.cfg

#数据目录. 可以是任意目录,其中的dataDir目录和dataLogDir需要提前建立好
#注意 应该谨慎地选择日志存放的位置,使用专用的日志存储设备能够大大地提高系统的性能,如果将日志存储在比较繁忙的存储设备上,那么将会在很大程度上影响系统的性能。
dataDir=/export/servers/data/zookeeper

#log目录, 同样可以是任意目录. 如果没有设置该参数, 将使用和dataDir相同的设置,其中的dataDir目录和dataLogDir需要提前建立好
#注意 应该谨慎地选择日志存放的位置,使用专用的日志存储设备能够大大地提高系统的性能,如果将日志存储在比较繁忙的存储设备上,那么将会在很大程度上影响系统的性能。
dataLogDir=/export/servers/logs/zookeeper

# 主机名:心跳端口:数据端口
server.1=zk01:2888:3888
server.2=zk02:2888:3888
server.3=zk03:2888:3888

myid记录到数据文件夹

# zk01
mkdir -p /export/servers/data/zookeeper
echo 1 > myid
cat myid

zookeeper分发到其他节点

sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk02:/export/servers/
# zk02
mkdir -p /export/servers/data/zookeeper
echo 2 > myid

sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk03:/export/servers/
# zk03
mkdir -p /export/servers/data/zookeeper
echo 3 > myid

配置环境变量

vi /etc/profile

export ZK_HOME=/export/servers/zookeeper-3.4.5
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin

source /etc/profile

启动

# 启动
zkServer.sh start
# 查看集群状态和主从信息
zkServer.sh status
# 查看进程
jps -m

export变量作用域解析

export A=1,定义的变量,会对自己所在的shell进程及子进程生效。

B=1,定义的变量,只对自己所在的shell进程生效。

在script.sh中定义的变量,在当前登陆的shell进程中,source script.sh时,脚本中定义的变量也会进入当前登陆的进程。

要在父进程shell可见,可source一下定义export变量的脚本文件,让当前shell可见。

Zookeeper集群启动和停止脚本,可先配置集群机器间的免密登录。

#!/bin/sh
# start-zkServer-cluster.sh

zkServers="zk01 zk02 zk03"
echo "start zkServer..."
for i in $zkServers
do
    echo "start zkServer on ${i} ..."
    ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh start > /dev/null 2>&1 &"
done
#!/bin/sh
# stop-zkServer-cluster.sh

zkServers="zk01 zk02 zk03"
echo "stop zkServer..."
for i in $zkServers
do
    echo "stop zkServer on ${i} ..."
    ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh stop > /dev/null 2>&1 &"
done

Kafka集群部署及使用

安装包解压

tar -xzvf kafka_2.11-0.9.0.1.tar.gz -C /export/servers

$KAFKA_HOME/config/server.properties修改,集群的每个节点的broker.id和host.name都需要修改。

#broker的全局唯一编号,不能重复
broker.id=0
#用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092
#kafka运行日志存放的路径
log.dirs=/export/servers/logs/kafka
#broker需要使用zookeeper保存meta数据
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
host.name=kafka01

Kafka集群启动和停止脚本

#!/bin/bash
#start-kafka-cluster.sh
brokers="kafka01 kafka02 kafka03"
kafka_home="/export/servers/kafka_2.11-0.9.0.1"
for i in $brokers
do
    echo "Starting kafka on ${i} ... "
    ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &"
    if [[ $? -eq 0 ]];  then
        echo "Start kafka on ${i} is OK !"
    fi
done
echo all kafka are started !
exit 0
#!/bin/bash
#stop-kafka-cluster.sh
brokers="kafka01 kafka02 kafka03"
kafka_home="/export/servers/kafka_2.11-0.9.0.1"
for i in $brokers
do
    echo "stop kafka on ${i} ... "
    ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-stop.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &"
    if [[ $? -eq 0 ]];  then
        echo "stop kafka on ${i} is OK !"
    fi
done
echo all kafka are stoped !
exit 0

启动Kafkastart-kafka-cluster.sh

创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic hello_topic

查看所有topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

kafka-topics.sh --list --zookeeper localhost:2181

发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

kafka-console-producer.sh --broker-list zk01:9092 --topic hello_topic

消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic --from-beginning

查看topic详细信息

kafka-topics.sh -describe --zookeeper zk01:2181 --topic hello_topic 

整合Flume和Kafka完成实时数据的采集

分布式日志收集框架Flume学习笔记的应用需求3中,将A服务器上的日志实时采集到B服务器,打印到控制台,通过整合Flume和Kafka,把logger sink改为kafka sink,这里的kafka sink是作为producer的角色,通过控制台起一个consumer进行消费来验证。

技术选型:

exec-memory-avro.conf: exec source + memory channel + avro sink

avro-memory-logger.conf: avro source + memory channel + kafka sink

整合Flume和Kafka完成实时数据的采集

# avro-memory-kafka.conf
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 192.168.169.100
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = kafka01:9092,kafka02:9092,kafka03:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

验证,先启动avro-memory-kafka.conf,因为它监听192.168.169.100的44444端口,

flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf/myconf \
--conf-file $FLUME_HOME/conf/myconf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf/myconf \
--conf-file $FLUME_HOME/conf/myconf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console

在控制台启动消费者验证,

kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic
echo hellokafka1 >> data.log
echo hellokafka2 >> data.log

本文首发于steem,感谢阅读,转载请注明。

https://steemit.com/@padluo


微信公众号「数据分析」,分享数据科学家的自我修养,既然遇见,不如一起成长。

数据分析


读者交流电报群

https://t.me/sspadluo


知识星球交流群

知识星球读者交流群

Sort:  

@padluo, 这是小可可我在steemit最好的邂逅,好喜欢你的贴(^∀^)哇~~~ img

Coin Marketplace

STEEM 0.21
TRX 0.20
JST 0.033
BTC 92098.56
ETH 3097.80
USDT 1.00
SBD 3.03