Kafka和Spark-Streaming
Kafka和Spark-Streaming
一、Kafka
1、Kafka和Flume的整合
① 需求1:利用flume监控某目录中新生成的文件,将监控到的变更数据发送给kafka,kafka将收到的数据打印到控制台:
在flume/conf下添加.conf文件,
vi flume-kafka.conf
# 定义 Agent 组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# 配置 Source(监控目录)
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/flume-kafka/
a1.sources.r1.inputCharset=utf-8
# 配置 Sink(写入 Kafka)
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#指定写入数据到哪一个topic
a1.sinks.k1.kafka.topic=testTopic
#指定写入数据到哪一个集群
a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
#指定写入批次
a1.sinks.k1.kafka.flumeBatchSize=20
#指定acks机制
a1.sinks.k1.kafka.producer.acks=1
# 配置 Channel(内存缓冲)
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
# 最大存储 1000 个 Event
a1.channels.c1.transactionCapacity=100
# 每次事务处理 100 个 Event
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
在指定目录之下创建文件夹:
kafka中创建topic:
kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 3
启动flume:
flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console
启动kafka消费者,验证数据写入成功
kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node02:9029,node03:9092 --from-beginning
新增测试数据:
echo "hello flume,hello kafka" >> /root/flume-kafka/1.txt
flume:
Kafka消费者:
② 需求2:Kafka生产者生成的数据利用Flume进行采集,将采集到的数据打印到Flume的控制台上。
vi kafka-flume.conf
# 定义 Agent 组件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# 将 Flume Source 设置为 Kafka 消费者,从指定 Kafka 主题拉取数据。
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
#指定zookeeper集群地址
a1.sources.r1.zookeepers=node01:2181,node02:2181,node03:2181
#指定kafka集群地址
a1.sources.r1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
#指定生成消息的topic
a1.sources.r1.kafka.topics=testTopic
# 将 Flume 传输的数据内容直接打印到日志中,
a1.sinks.k1.type=logger
# 配置 Channel(内存缓冲)
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transcationCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动Kafka生产者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic testTopic
启动Flume
flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console
在生产者中写入数据
Flume中采集到数据
2、Kafka和SparkStreaming的整合
① 导包。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.2</version>
</dependency>
② 代码实现。
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setNode01("local[*]")
.setAppName(this.getClass.getSimpleName)
val ssc= new StreamingContext(conf,Seconds(2))
// kafka的参数配置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "hello_topic_group",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("helloTopic3")
//指定泛型的约定[String, String] key value
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD(rdd=>{
rdd.foreach(println)
})
ssc.start()
ssc.awaitTermination()
}
③ 利用Redis维护偏移量。使用Spark消费Kafka中的数据。
val config = ConfigFactory.load()
val conf = new SparkConf()
.setNode01("local[*]")
.setAppName(this.getClass.getSimpleName)
val ssc = new StreamingContext(conf, Seconds(3))
val groupId = "hello_topic_group"
val topic = "helloTopic7"
val topicArr = Array(topic)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest",
// 是否可以自动提交偏移量 自定义
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 需要设置偏移量的值
val offsets = mutable.HashMap[TopicPartition, Long]()
// 从redis中获取到值
val jedis1 = JedisPoolUtils.getJedis()
val allPO: util.Map[String, String] = jedis1.hgetAll(groupId + "-" + topic)
// 导入转换
import scala.collection.JavaConversions._
for(i<- allPO){
// 主题 和分区 -> offset
offsets += (new TopicPartition(topic,i._1.toInt) -> i._2.toLong)
}
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
Subscribe[String, String](topicArr, kafkaParams, offsets)
)
stream.foreachRDD(rdd => {
// rdd ConsumerRecord[String, String]
val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val result = rdd.map(_.value()).map((_, 1)).reduceByKey(_ + _)
result.foreachPartition(it => {
val jedis = JedisPoolUtils.getJedis()
it.foreach(tp => jedis.hincrBy("streamkfkwc", tp._1, tp._2))
// 等迭代器中的数据,全部完成之后,再关
jedis.close()
})
// 把偏移量的Array 写入到redis中
val jedis = JedisPoolUtils.getJedis()
ranges.foreach(t => {
jedis.hset(groupId + "-" + t.topic, t.partition.toString, t.untilOffset + "")
})
jedis.close()
})
ssc.start()
ssc.awaitTermination()
二、Spark-Streaming核心编程(三)
DStream转换
DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。
1、无状态转化操作
无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。
注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加
import StreamingContext._才能在 Scala 中使用。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。
例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
1.1、Transform
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
val ssc = new StreamingContext(sparkConf,Seconds(3))
val lineDStream :ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
val wordAndCountDStream :DStream[(String,Int)] = lineDStream.transform(rdd => {
val words :RDD[String] = rdd.flatMap(_.split(" "))
val wordAndOne :RDD[(String,Int)] = words.map((_,1))
val value :RDD[(String,Int)] = wordAndOne.reduceByKey(_+_)
value
})
wordAndCountDStream.print()
ssc.start()
ssc.awaitTermination()
1.2、join
两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("join")
val ssc = new StreamingContext(sparkConf,Seconds(3))
val lineDStream1 :ReceiverInputDStream[String] = ssc.
socketTextStream("node01",9999)
val lineDStream2 :ReceiverInputDStream[String] = ssc.
socketTextStream("node02",8888)
val wordToOneDStream :DStream[(String,Int)] = lineDStream1
.flatMap(_.split(" ")).map((_,1))
val wordToADstream :DStream[(String,String)] = lineDStream2
.flatMap(_.split(" ")).map((_,"a"))
val joinDStream :DStream[(String,(Int,String))]=wordToOneDStream
.join(wordToADstream)
joinDStream.print()
ssc.start()
ssc.awaitTermination()