java kafka
安装
安装下载
导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module descriptor for the Kafka producer application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Parent module; NOTE(review): spring-boot-starter-web below has no version,
         so the parent is presumably expected to manage it - confirm. -->
    <parent>
        <artifactId>kafka</artifactId>
        <groupId>com.tt</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafkaProducer</artifactId>

    <dependencies>
        <!-- Web starter: provides the REST endpoint used to trigger a send. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring for Apache Kafka: KafkaTemplate and @KafkaListener support. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.0</version> <!-- choose an appropriate version as needed -->
        </dependency>
    </dependencies>
</project>
创建producer项目
配置文件
# application.yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: myGroup
      # Meaning of auto.offset.reset = latest:
      #   if a topic already holds historical data but the consumer group is starting
      #   for the first time and has never committed an offset, the historical messages
      #   are NOT consumed; only data produced after startup is received.
      # earliest: when no offset exists, consume from the beginning; suited to
      #   scenarios that must process the full data set.
      # none: when no offset exists, throw an exception that must be handled manually;
      #   suited to business logic that is sensitive to duplicate consumption.
      auto-offset-reset: earliest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  application:
    name: producer
server:
  port: 9124
发送
package com.tt.control;

import com.tt.common.Rs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * REST controller exposing a single endpoint that publishes a fixed test
 * message to Kafka.
 */
@RestController
public class Controller {

    /** Template used to publish String-keyed, String-valued records. */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Handles {@code GET /test} by sending the value {@code "6"} to the topic
     * {@code "tetete"}.
     *
     * <p>The value returned by {@code send} is not inspected, so the response
     * does not reflect whether the record was actually delivered.
     *
     * @return the result of {@code Rs.success()}
     */
    @GetMapping("/test")
    public Rs tt() {
        kafkaTemplate.send("tetete", "6");
        return Rs.success();
    }
}
创建consumer
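依赖与 producer 相同;配置文件基本相同,但若与 producer 在同一台机器上运行,需要把 spring.application.name 和 server.port 改成不同的值(例如 consumer、9125),否则会与 producer 的 9124 端口冲突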
消费
package com.tt.control;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Component holding a Kafka listener method that prints every received
 * message value.
 */
@Component
public class HelloKafka {

    /** Name of the topic the listener method is subscribed to. */
    private static final String TOPIC = "tetete";

    /**
     * Writes the received message to standard output, followed by a line break.
     *
     * @param data the message value passed to the listener
     */
    @KafkaListener(topics = TOPIC)
    public void onMessage(String data) {
        System.out.println(data);
    }
}
可视化工具
Kafka Tool(现已更名为 Offset Explorer)