Kafka和flume整合
需求1:利用flume监控某目录中新生成的文件,将监控到的变更数据发送给kafka,kafka将收到的数据打印到控制台:
在flume/conf下添加.conf文件,
vi flume-kafka.conf
# 定义 Agent 组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# 配置 Source(监控目录)
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/flume-kafka/
a1.sources.r1.inputCharset=utf-8
# 配置 Sink(写入 Kafka)
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#指定写入数据到哪一个topic
a1.sinks.k1.kafka.topic=testTopic
#指定写入数据到哪一个集群
a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
#指定写入批次
a1.sinks.k1.kafka.flumeBatchSize=20
#指定acks机制
a1.sinks.k1.kafka.producer.acks=1
# 配置 Channel(内存缓冲)
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
# 最大存储 1000 个 Event
a1.channels.c1.transactionCapacity=100
# 每次事务处理 100 个 Event
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
在指定目录之下创建文件夹:
kafka中创建topic:
kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 3
启动flume:
flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console
启动kafka消费者,验证数据写入成功
kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node02:9029,node03:9092 --from-beginning
新增测试数据:
echo "hello flume,hello kafka" >> /root/flume-kafka/1.txt
flume:
Kafka消费者
需求2:Kafka生产者生成的数据利用Flume进行采集,将采集到的数据打印到Flume的控制台上。
vi kafka-flume.conf
# 定义 Agent 组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# 将 Flume Source 设置为 Kafka 消费者,从指定 Kafka 主题拉取数据。
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
#指定zookeeper集群地址
a1.sources.r1.zookeepers=node01:2181,node02:2181,node03:2181
#指定kafka集群地址
a1.sources.r1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
#指定生成消息的topic
a1.sources.r1.kafka.topics=testTopic
# 将 Flume 传输的数据内容直接打印到日志中,
a1.sinks.k1.type=logger
# 配置 Channel(内存缓冲)
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transcationCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动Kafka生产者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic testTopic
启动Flume
flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console
在生产者中写入数据
Flume中采集到数据