
Real-Time Step Counting System: Kafka + Spark + Redis

This article designs and implements a real-time step counting system on a microservice architecture. It follows the producer-consumer pattern: Kafka serves as the message queue, Spark Streaming processes the real-time data stream, and Redis provides high-performance storage. The result is a high-concurrency, low-latency data processing system that supports real-time collection, transport, processing, and statistical analysis of step data from multiple users.
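The end-to-end data flow is: StepDataProducer -> Kafka topic user-steps -> Spark Streaming (StepCounterApp) -> Redis.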

1. Introduction

1. Data collection and producer (StepDataProducer)
  • Role: generates user step data and sends it to a Kafka topic.
  • How it works: the producer randomly generates user step records (user ID, location, step count, and timestamp), serializes each one to JSON, and sends it to the configured Kafka topic via the Kafka Producer API. A sample message is shown after this list.
2. Message queue (Kafka)
  • Role: message middleware that decouples the data producer from the consumer.
  • How it works: Kafka is a distributed streaming platform that supports high-throughput message publish and subscribe. The producer writes records to a Kafka topic and the consumer reads them back. Kafka persists messages durably, preserves their order within a partition, and scales horizontally.
3. Real-time processing (Spark Streaming)
  • Role: processes the data stream consumed from Kafka in real time.
  • How it works: Spark Streaming is a framework for processing real-time data streams. It consumes records from Kafka, parses and processes them (here, step counting), and writes the results to downstream systems. Its micro-batch model lets it process large data volumes at low latency.
4. Data storage (Redis)
  • Role: stores the processed user step data and supports fast reads and writes.
  • How it works: Redis is a high-performance in-memory database supporting multiple data structures. Processed step data is stored in Redis, whose fast reads and writes keep the system real-time and responsive.
5. Consumer (StepCounterApp)
  • Role: consumes records from Kafka, processes them, and updates Redis.
  • How it works: the consumer reads records from the Kafka topic, processes them in real time with Spark Streaming, and stores the results (each user's cumulative step count) in Redis. It drives the entire data processing pipeline.
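For illustration, each message on the user-steps topic is a UserStep serialized to JSON by Gson; a hypothetical record (field values made up) looks like this:

{"userId":"user1","location":"Park","steps":42,"timestamp":1700000000000}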

2. File Structure

microservices/
├── pom.xml
├── dependency-reduced-pom.xml
├── 教程.txt  (tutorial notes)
├── query
├── target/
└── src/
    └── main/
        ├── resources/
        │   └── application.properties  (Kafka, Redis, and Spark configuration)
        └── java/
            └── com/
                └── example/
                    └── stepcounter/
                        ├── StepCounterApp.java
                        ├── StepDataProducer.java
                        ├── service/
                        ├── config/
                        └── model/

3. Code

AppConfig.java
package com.example.stepcounter.config;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class AppConfig {

    private static final Properties properties = new Properties();

    // Load application.properties from the classpath once, at class-load time
    static {
        try (InputStream input = AppConfig.class.getClassLoader()
                .getResourceAsStream("application.properties")) {
            if (input == null) {
                throw new RuntimeException("Unable to find application.properties");
            }
            properties.load(input);
        } catch (IOException e) {
            throw new RuntimeException("Error loading application.properties", e);
        }
    }

    public static String getProperty(String key) {
        String value = properties.getProperty(key);
        return value != null ? value.trim() : null;
    }

    // Consumer-side Kafka settings assembled from application.properties
    public static Properties getKafkaProperties() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", getProperty("kafka.bootstrap.servers"));
        kafkaProps.put("group.id", getProperty("kafka.group.id"));
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return kafkaProps;
    }
}
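The static initializer loads application.properties exactly once and throws immediately if the file is missing, so a misconfigured build fails at startup instead of partway through a stream.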
UserStep.java
package com.example.stepcounter.model;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserStep {
    private String userId;
    private String location;
    private Integer steps;
    private Long timestamp;
}
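Lombok's @Data generates the getters StepCounterApp relies on (getUserId(), getSteps(), and so on), @NoArgsConstructor supplies the no-argument constructor Gson prefers for deserialization, and @AllArgsConstructor supplies the four-argument constructor used in StepDataProducer.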
RedisService.java
package com.example.stepcounter.service;

import com.example.stepcounter.config.AppConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisService {

    private static final JedisPool jedisPool;

    static {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(10);
        poolConfig.setMaxIdle(5);
        poolConfig.setMinIdle(1);
        String host = AppConfig.getProperty("redis.host");
        int port = Integer.parseInt(AppConfig.getProperty("redis.port"));
        String password = AppConfig.getProperty("redis.password");
        int database = Integer.parseInt(AppConfig.getProperty("redis.database"));
        if (password != null && !password.trim().isEmpty()) {
            jedisPool = new JedisPool(poolConfig, host, port, 2000, password, database);
        } else {
            jedisPool = new JedisPool(poolConfig, host, port, 2000);
        }
    }

    // Atomically add steps to the running total stored under user:{userId}:steps
    public static void incrementUserSteps(String userId, int steps) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "user:" + userId + ":steps";
            jedis.incrBy(key, steps);
        }
    }

    public static Long getUserTotalSteps(String userId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "user:" + userId + ":steps";
            String steps = jedis.get(key);
            return steps != null ? Long.parseLong(steps) : 0L;
        }
    }
}
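As a quick sanity check of the key schema (user:{userId}:steps), here is a minimal usage sketch; RedisServiceDemo is a hypothetical class added for illustration and assumes a reachable Redis instance:

package com.example.stepcounter.service;

public class RedisServiceDemo {
    public static void main(String[] args) {
        // Two increments for the same user accumulate under the same Redis key
        RedisService.incrementUserSteps("user1", 30);
        RedisService.incrementUserSteps("user1", 12);
        // Prints 42 if the key did not exist beforehand
        System.out.println(RedisService.getUserTotalSteps("user1"));
    }
}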
StepCounterApp.java
package com.example.stepcounter;

import com.example.stepcounter.config.AppConfig;
import com.example.stepcounter.model.UserStep;
import com.example.stepcounter.service.RedisService;
import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StepCounterApp {

    private static final Gson gson = new Gson();

    public static void main(String[] args) throws InterruptedException {
        // Create the Spark configuration
        SparkConf sparkConf = new SparkConf()
                .setAppName(AppConfig.getProperty("spark.app.name"))
                .setMaster(AppConfig.getProperty("spark.master"));

        // Create the StreamingContext; the batch duration is read in seconds
        JavaStreamingContext streamingContext = new JavaStreamingContext(
                sparkConf,
                Durations.seconds(Integer.parseInt(AppConfig.getProperty("spark.streaming.batch.duration"))));

        // Configure the Kafka consumer
        Properties kafkaProps = AppConfig.getKafkaProperties();
        String topic = AppConfig.getProperty("kafka.topic");

        // Convert Properties to the Map expected by the consumer strategy
        Map<String, Object> kafkaParams = new HashMap<>();
        for (String key : kafkaProps.stringPropertyNames()) {
            kafkaParams.put(key, kafkaProps.get(key));
        }

        // Create the Kafka direct input stream
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Collections.singletonList(topic), kafkaParams));

        // Process the stream: parse each JSON record and update Redis
        stream.foreachRDD(rdd -> {
            rdd.foreach(record -> {
                String json = record.value();
                UserStep userStep = gson.fromJson(json, UserStep.class);

                // Update the user's step count in Redis
                RedisService.incrementUserSteps(userStep.getUserId(), userStep.getSteps());

                // Fetch and print the user's running total
                Long totalSteps = RedisService.getUserTotalSteps(userStep.getUserId());
                System.out.printf("User %s at %s walked %d steps, total steps: %d%n",
                        userStep.getUserId(),
                        userStep.getLocation(),
                        userStep.getSteps(),
                        totalSteps);
            });
        });

        // Start streaming and block until termination
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
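Note that the lambda passed to rdd.foreach runs on Spark executors, not on the driver. With spark.master=local[2] everything shares a single JVM, but on a real cluster each executor would initialize its own RedisService connection pool, and the printf output would appear in the executor logs rather than the driver console.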
StepDataProducer.java
package com.example.stepcounter;

import com.example.stepcounter.config.AppConfig;
import com.example.stepcounter.model.UserStep;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

public class StepDataProducer {

    private static final Gson gson = new Gson();
    private static final Random random = new Random();
    private static final String[] LOCATIONS = {"Home", "Park", "Office", "Gym", "Mall"};
    private static final String[] USER_IDS = {"user1", "user2", "user3", "user4", "user5"};

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.getProperty("kafka.bootstrap.servers"));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String topic = AppConfig.getProperty("kafka.topic");

        try {
            while (true) {
                UserStep userStep = generateRandomUserStep();
                String json = gson.toJson(userStep);
                // Key by userId so each user's records land in the same partition, preserving their order
                producer.send(new ProducerRecord<>(topic, userStep.getUserId(), json));
                System.out.println("Sent: " + json);
                Thread.sleep(1000); // send one record per second
            }
        } finally {
            producer.close();
        }
    }

    private static UserStep generateRandomUserStep() {
        String userId = USER_IDS[random.nextInt(USER_IDS.length)];
        String location = LOCATIONS[random.nextInt(LOCATIONS.length)];
        int steps = random.nextInt(100) + 1; // 1-100 steps
        long timestamp = System.currentTimeMillis();
        return new UserStep(userId, location, steps, timestamp);
    }
}
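KafkaProducer.send() is asynchronous: records are batched and flushed in the background, and producer.close() in the finally block flushes anything still buffered before the process exits.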
application.properties
# Kafka Configuration
kafka.bootstrap.servers=localhost:9092
kafka.topic=user-steps
kafka.group.id=step-counter-group

# Redis Configuration
redis.host=localhost
redis.port=6379
redis.password=
redis.database=0

# Spark Configuration
spark.app.name=StepCounter
spark.master=local[2]
spark.streaming.batch.duration=5
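Note that spark.streaming.batch.duration is a plain number of seconds: StepCounterApp wraps it in Durations.seconds(...), so the stream above is processed in 5-second micro-batches.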
dependency-reduced-pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>step-counter</artifactId>
  <version>1.0-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${maven.compiler.source}</source>
          <target>${maven.compiler.target}</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.stepcounter.StepCounterApp</mainClass>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.26</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <spark.version>3.3.0</spark.version>
    <maven.compiler.target>1.8</maven.compiler.target>
    <kafka.version>3.5.0</kafka.version>
  </properties>
</project>
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>step-counter</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.3.0</spark.version>
    <kafka.version>3.5.0</kafka.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Spark Core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming Kafka -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Kafka Clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <!-- Redis -->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>4.3.1</version>
    </dependency>
    <!-- Lombok -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.26</version>
      <scope>provided</scope>
    </dependency>
    <!-- Gson -->
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.10.1</version>
    </dependency>
    <!-- SLF4J -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.36</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.36</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${maven.compiler.source}</source>
          <target>${maven.compiler.target}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.stepcounter.StepCounterApp</mainClass>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

4. Running

On Windows, open six cmd windows:

REM Start ZooKeeper
C:\kafka\kafka_2.12-3.6.1\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties

REM Start Redis
C:\Users\86182>redis-server.exe

REM Start Kafka
C:\kafka\kafka_2.12-3.6.1\bin\windows>kafka-server-start.bat ..\..\config\server.properties

REM Create the Kafka topic
C:\kafka\kafka_2.12-3.6.1\bin\windows>kafka-topics.bat --create --topic user-steps --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic user-steps.

REM Consumer
C:\microservices>java -cp target/step-counter-1.0-SNAPSHOT.jar com.example.stepcounter.StepCounterApp

REM Producer
C:\microservices>java -cp target/step-counter-1.0-SNAPSHOT.jar com.example.stepcounter.StepDataProducer
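Once the producer and consumer are both running, the totals can be spot-checked from an extra window with redis-cli (assuming a default local Redis on port 6379):

C:\Users\86182>redis-cli
127.0.0.1:6379> GET user:user1:steps

The returned value should match the "total steps" figure printed by StepCounterApp for that user.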
