当前位置: 首页 > news >正文

SpringBoot3集成ES8.15实现余额监控

1. gradle依赖新增

implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
implementation 'co.elastic.clients:elasticsearch-java:8.15.0'

2. application.yml配置

spring:elasticsearch:uris: http://localhost:9200username: elasticpassword: elasticsearch
#    ssl:
#      certificate-authorities: /path/to/http_ca.crt

3. 配置相关bean

@Configuration
public class ElasticsearchConfig {@Value("${spring.elasticsearch.uris}")private String elasticsearchUrl;@Beanpublic RestClient restClient() {// 创建不验证SSL的HttpClientSSLContext sslContext;try {sslContext = SSLContexts.custom().loadTrustMaterial(null, (chain, authType) -> true).build();} catch (Exception e) {throw new RuntimeException("Failed to create SSL context", e);}return RestClient.builder(HttpHost.create(elasticsearchUrl)).setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)).build();}@Beanpublic ElasticsearchTransport elasticsearchTransport(RestClient restClient) {return new RestClientTransport(restClient, new JacksonJsonpMapper());}@Beanpublic ElasticsearchClient elasticsearchClient(ElasticsearchTransport transport) {return new ElasticsearchClient(transport);}
}

4. 查询示例

@Service
@RequiredArgsConstructor
@Slf4j
public class DogeServiceImpl implements DogeService {// 存储每个地址的最近余额private static final Map<String, Long> balanceHistory = new HashMap<>();private static final long THRESHOLD = 1_000_000; // 变化阈值:1000万 Dogeprivate final ElasticsearchClient elasticsearchClient;private static final int TOP_ADDRESS_LIMIT = 100;private static final int HISTORY_QUERY_LIMIT = 10000;@Overridepublic List<DogeDto> getTop100DogeBalanceHistory(DogeQueryDto dogeQueryDto) {try {// 1. 获取当前排名前100的地址(保持rank顺序)List<Doge> topAddresses = getCurrentTop100Addresses();// 2. 查询这些地址的历史记录(保持rank顺序)return getHistoryForAddresses(topAddresses, dogeQueryDto);} catch (IOException e) {throw new RuntimeException("查询Doge历史余额失败", e);}}@Override@Scheduled(cron = "0 33 18 * * ?")public void checkDogeBalance() {List<DogeDto> doges = getTop100DogeBalanceHistory(new DogeQueryDto());if (doges == null || doges.isEmpty()) {System.out.println("未获取到 Dogecoin 数据");return;}for (DogeDto dto : doges) {String address = dto.getAddress();long currentBalance = dto.getHistory().get(0).getBalance();String wallet = dto.getWallet();// 初始化或获取上一次余额Long previousBalance = balanceHistory.getOrDefault(address, null);// 检查余额变化if (previousBalance == null) {
//                log.info("[" + java.time.LocalDate.now() + "] 初次记录地址 " + address +
//                        " 的余额: " + currentBalance);} else {long difference = currentBalance - previousBalance;if (Math.abs(difference) >= THRESHOLD) {String action = difference > 0 ? "增加" : "减少";String message = "[" + java.time.LocalDate.now() + "] 警告: 地址 " + address +" 的余额" + action + "了 " + Math.abs(difference) +" Dogecoin (之前: " + previousBalance + ", 现在: " + currentBalance + ")";if (StringUtils.isNotBlank(wallet)) {message += " [重点监控钱包: " + wallet + "]";}log.info(message);} else {
//                    log.info("[" + java.time.LocalDate.now() + "] 地址 " + address +
//                            " 的余额变化: " + difference + " (未达提醒阈值)");}}// 更新历史余额balanceHistory.put(address, currentBalance);// 对于有 wallet 标记的地址,检查历史记录中的变化if (StringUtils.isNotBlank(wallet)) {List<DogeDto.History> histories = dto.getHistory();if (histories != null && !histories.isEmpty()) {long lastHistoryBalance = histories.get(0).getBalance(); // 假设第一个是最近的历史记录for (int i = 1; i < histories.size(); i++) {long olderBalance = histories.get(i).getBalance();long historyDifference = lastHistoryBalance - olderBalance;if (Math.abs(historyDifference) >= THRESHOLD) {String action = historyDifference > 0 ? "减少" : "增加";log.info("[" + histories.get(i).getDate() + "] 历史警告: 地址 " + address +" 的余额" + action + "了 " + Math.abs(historyDifference) +" Dogecoin [重点监控钱包: " + wallet + "]");}lastHistoryBalance = olderBalance;}}}}}/*** 获取当前排名前100的地址(按rank升序)*/private List<Doge> getCurrentTop100Addresses() {SearchResponse<Doge> response = null;try {response = elasticsearchClient.search(s -> s.index("doge").size(TOP_ADDRESS_LIMIT).sort(so -> so.field(f -> f.field("rank").order(SortOrder.Asc))).collapse(c -> c.field("address")) // 按地址去重.source(sc -> sc.filter(f -> f.includes("address", "wallet", "rank"))) // 只获取必要字段, Doge.class);} catch (IOException e) {throw new RuntimeException(e);}return response.hits().hits().stream().map(Hit::source).filter(Objects::nonNull).sorted(Comparator.comparing(Doge::getRank)) // 确保排序.collect(Collectors.toList());}/*** 根据地址列表查询历史记录*/private List<DogeDto> getHistoryForAddresses(List<Doge> topAddresses, DogeQueryDto queryDto) throws IOException {// 提取地址列表(保持rank顺序)List<String> addresses = topAddresses.stream().map(Doge::getAddress).collect(Collectors.toList());// 构建基础查询Query baseQuery = Query.of(q -> q.bool(b -> b.must(m -> m.terms(t -> t.field("address").terms(tv -> tv.value(addresses.stream().map(FieldValue::of).collect(Collectors.toList())))))));// 执行查询SearchResponse<Doge> response = elasticsearchClient.search(s -> s.index("doge").size(HISTORY_QUERY_LIMIT).query(baseQuery).sort(so -> so.field(f -> f.field("c_date").order(SortOrder.Asc))) // 按日期排序, Doge.class);// 按地址分组历史记录Map<String, List<Doge>> groupedByAddress = response.hits().hits().stream().map(Hit::source).filter(Objects::nonNull).collect(Collectors.groupingBy(Doge::getAddress));// 按照原始rank顺序构建结果return topAddresses.stream().map(addressDoc -> {List<Doge> historyDocs = groupedByAddress.getOrDefault(addressDoc.getAddress(), Collections.emptyList());return convertToDogeDto(addressDoc, historyDocs);}).filter(Objects::nonNull).collect(Collectors.toList());}/*** 转换为DTO对象*/private DogeDto convertToDogeDto(Doge addressDoc, List<Doge> historyDocs) {if (historyDocs.isEmpty()) {return null;}DogeDto dto = new DogeDto();dto.setAddress(addressDoc.getAddress());dto.setWallet(addressDoc.getWallet());// 历史记录按日期排序dto.setHistory(historyDocs.stream().sorted(Comparator.comparing(Doge::getC_date)).map(this::convertToHistoryDto).collect(Collectors.toList()));return dto;}/*** 转换为History DTO*/private DogeDto.History convertToHistoryDto(Doge doc) {DogeDto.History history = new DogeDto.History();history.setDate(doc.getC_date());history.setBalance(doc.getBalance());history.setRank(doc.getRank());history.setPercent(doc.getPercent());return history;}
}

相关文章:

  • 2. 什么是最普通的自动化“裸奔状态”?
  • 【Rust基础】使用Rocket从Token中提取用户信息
  • (mac)Grafana监控系统之监控Linux的Redis
  • Java基础类库常用类库 java.lang、java.util
  • 奇异递归模板设计模式-CRTP
  • Retinex系列图像/视频增强算法介绍
  • Docker Registry(镜像仓库)
  • Java开发软件
  • C++项目 —— 基于多设计模式下的同步异步日志系统(5)(单例模式)
  • Gen - CDPT举例说明:动态上下文前缀(输入先和标签结合,输出结果会更贴近标签内容)
  • 【ROS】航点导航功能
  • 解决vscode找不到Python自定义模块,报错No module named ‘xxx‘
  • 【Redis】Redis中的常见数据类型(一)
  • 通过爬虫方式实现头条号发布视频(2025年4月)
  • 常见的页面报错
  • Spring MVC 如何体现 Model-View-Controller 各自的职责?它们之间是如何协作的?
  • VS Code 远程连接服务器:Anaconda 环境与 Python/Jupyter 运行全指南。研0大模型学习(第六、第七天)
  • LicheeRV Nano 与Ubuntu官方risc-v 镜像混合
  • xss学习3之服务端session
  • 大数据开发知识1:数据仓库
  • 扫描类软件成泄密“推手”,网盘账号密码遭暴力破解
  • 人民日报和音:书写周边命运共同体建设新篇章
  • 市民建议公交广播增加“请勿大声喧哗”提示,上海交通委回复
  • 明查|美军“杜鲁门”号航空母舰遭胡塞武装打击已退役?
  • TP-LINK4.36亿元竞得上海青浦徐泾办公地块,需引入全球领先的总部型企业
  • 国新办将举行发布会,介绍《加快推进服务业扩大开放综合试点工作方案》有关情况