当前位置: 首页 > news >正文

Kafka和flume整合

需求1:利用flume监控某目录中新生成的文件,将监控到的变更数据发送给kafka,kafka将收到的数据打印到控制台:

在flume/conf下添加.conf文件,

vi flume-kafka.conf

# 定义 Agent 组件

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# 配置 Source(监控目录)

a1.sources.r1.type=spooldir

a1.sources.r1.spoolDir=/root/flume-kafka/

a1.sources.r1.inputCharset=utf-8

# 配置 Sink(写入 Kafka)

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

#指定写入数据到哪一个topic

a1.sinks.k1.kafka.topic=testTopic

#指定写入数据到哪一个集群

a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

#指定写入批次

a1.sinks.k1.kafka.flumeBatchSize=20

#指定acks机制

a1.sinks.k1.kafka.producer.acks=1

# 配置 Channel(内存缓冲)

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

# 最大存储 1000 个 Event

a1.channels.c1.transactionCapacity=100

# 每次事务处理 100 个 Event

 

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

 

在指定目录之下创建文件夹:

kafka中创建topic:

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 3

启动flume:

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

启动kafka消费者,验证数据写入成功

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node02:9029,node03:9092 --from-beginning

新增测试数据:

echo "hello flume,hello kafka" >> /root/flume-kafka/1.txt

flume:

Kafka消费者

 

需求2:Kafka生产者生成的数据利用Flume进行采集,将采集到的数据打印到Flume的控制台上。

vi kafka-flume.conf

# 定义 Agent 组件

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# 将 Flume Source 设置为 Kafka 消费者,从指定 Kafka 主题拉取数据。

a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource

#指定zookeeper集群地址

a1.sources.r1.zookeepers=node01:2181,node02:2181,node03:2181

#指定kafka集群地址

a1.sources.r1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

#指定生成消息的topic

a1.sources.r1.kafka.topics=testTopic

# 将 Flume 传输的数据内容直接打印到日志中,

a1.sinks.k1.type=logger

# 配置 Channel(内存缓冲)

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transcationCapacity=100

 

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

启动Kafka生产者

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic testTopic

启动Flume

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console

在生产者中写入数据

Flume中采集到数据 

 

 

 

 

相关文章:

  • HOW - 如何模拟实现 gpt 展示答案的交互效果
  • Python判断语句-语法:if,if else,if elif else,嵌套,if else语句扁平式写法,案例
  • android jatpack Compose 多数据源依赖处理:从状态管理到精准更新的架构设计
  • kafka整合flume与DStream转换
  • #苍穹外卖# day 10-11
  • Move Registry 发布,实现 Sui 的超级互操作性
  • ubuntu22.04部署Snipe-IT
  • MYSQL 常用字符串函数 和 时间函数详解
  • 信息学奥赛一本通 1509:【例 1】Intervals | OpenJudge 百练 1201:Intervals
  • 云服务器centos 安装hadoop集群
  • CS001-7-hbao
  • 海之淀攻略
  • 【视频时刻检索】Text-Video Retrieval via Multi-Modal Hypergraph Networks 论文阅读
  • 驱动开发硬核特训 · Day 21(上篇) 抽象理解 Linux 子系统:内核工程师的视角
  • Spring的xxxAware接口工作原理-笔记
  • 高等数学第三章---微分中值定理与导数的应用(3.1微分中值定理3.2洛必达法则)
  • 如何设置极狐GitLab 议题截止日?
  • 050_基于springboot的音乐网站
  • 图解YOLO(You Only Look Once)目标检测(v1-v5)
  • 零基础上手Python数据分析 (23):NumPy 数值计算基础 - 数据分析的加速“引擎”
  • 经济日报:美离间国际关系注定徒劳无功
  • 第六次“太空会师”,神舟二十号3名航天员顺利进驻中国空间站
  • 长三角与粤港澳大湾区融合发展,无锡何以成为窗口?
  • 被电诈100万元又要被骗71万元,女子经民警近8小时劝阻幡然醒悟
  • 杨国荣丨《儒耶对话与中国现代思想的生成和发展》序
  • “动漫短剧”值不值得做?