1. gradle依赖新增
// Spring Boot starter for Spring Data Elasticsearch (no version is declared on this line).
implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
// Elasticsearch Java API client, pinned explicitly to 8.15.0.
implementation 'co.elastic.clients:elasticsearch-java:8.15.0'
2. application.yml配置
spring:
  elasticsearch:
    uris: http://localhost:9200
    username: elastic
    password: elasticsearch
3. 配置相关bean
/**
 * Wires the Elasticsearch client stack: the low-level {@code RestClient}, the
 * transport built on top of it, and the typed {@code ElasticsearchClient}.
 *
 * <p>NOTE(review): the SSL context built here trusts every certificate chain and
 * hostname verification is disabled, so TLS connections are not authenticated.
 * Confirm this is intended outside local development.
 *
 * <p>NOTE(review): this class only reads {@code spring.elasticsearch.uris}; no
 * username/password is applied to the client built here - verify whether
 * authentication is expected.
 */
@Configuration
public class ElasticsearchConfig {

    /** Value of the {@code spring.elasticsearch.uris} property, passed to {@code HttpHost.create}. */
    @Value("${spring.elasticsearch.uris}")
    private String elasticsearchUrl;

    /**
     * Builds the low-level REST client for the configured URL.
     *
     * @return a REST client using a trust-all SSL context and no hostname verification
     * @throws RuntimeException if the SSL context cannot be created
     */
    @Bean
    public RestClient restClient() {
        final SSLContext trustAllContext = buildTrustAllSslContext();
        final HttpHost host = HttpHost.create(elasticsearchUrl);
        return RestClient.builder(host)
                .setHttpClientConfigCallback(http -> http
                        .setSSLContext(trustAllContext)
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE))
                .build();
    }

    /**
     * Wraps the REST client in a transport that uses Jackson for JSON mapping.
     *
     * @param restClient the low-level REST client
     * @return the transport used by the typed client
     */
    @Bean
    public ElasticsearchTransport elasticsearchTransport(RestClient restClient) {
        final JacksonJsonpMapper jsonMapper = new JacksonJsonpMapper();
        return new RestClientTransport(restClient, jsonMapper);
    }

    /**
     * Exposes the typed Elasticsearch client.
     *
     * @param transport the transport to send requests through
     * @return the typed client
     */
    @Bean
    public ElasticsearchClient elasticsearchClient(ElasticsearchTransport transport) {
        return new ElasticsearchClient(transport);
    }

    /** Creates an SSL context whose trust strategy accepts every certificate chain. */
    private SSLContext buildTrustAllSslContext() {
        try {
            return SSLContexts.custom()
                    .loadTrustMaterial(null, (chain, authType) -> true)
                    .build();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create SSL context", e);
        }
    }
}
4. 查询示例
@Service
@RequiredArgsConstructor
@Slf4j
public class DogeServiceImpl implements DogeService {private static final Map<String, Long> balanceHistory = new HashMap<>();private static final long THRESHOLD = 1_000_000; private final ElasticsearchClient elasticsearchClient;private static final int TOP_ADDRESS_LIMIT = 100;private static final int HISTORY_QUERY_LIMIT = 10000;@Overridepublic List<DogeDto> getTop100DogeBalanceHistory(DogeQueryDto dogeQueryDto) {try {List<Doge> topAddresses = getCurrentTop100Addresses();return getHistoryForAddresses(topAddresses, dogeQueryDto);} catch (IOException e) {throw new RuntimeException("查询Doge历史余额失败", e);}}@Override@Scheduled(cron = "0 33 18 * * ?")public void checkDogeBalance() {List<DogeDto> doges = getTop100DogeBalanceHistory(new DogeQueryDto());if (doges == null || doges.isEmpty()) {System.out.println("未获取到 Dogecoin 数据");return;}for (DogeDto dto : doges) {String address = dto.getAddress();long currentBalance = dto.getHistory().get(0).getBalance();String wallet = dto.getWallet();Long previousBalance = balanceHistory.getOrDefault(address, null);if (previousBalance == null) {
} else {long difference = currentBalance - previousBalance;if (Math.abs(difference) >= THRESHOLD) {String action = difference > 0 ? "增加" : "减少";String message = "[" + java.time.LocalDate.now() + "] 警告: 地址 " + address +" 的余额" + action + "了 " + Math.abs(difference) +" Dogecoin (之前: " + previousBalance + ", 现在: " + currentBalance + ")";if (StringUtils.isNotBlank(wallet)) {message += " [重点监控钱包: " + wallet + "]";}log.info(message);} else {
}}balanceHistory.put(address, currentBalance);if (StringUtils.isNotBlank(wallet)) {List<DogeDto.History> histories = dto.getHistory();if (histories != null && !histories.isEmpty()) {long lastHistoryBalance = histories.get(0).getBalance(); for (int i = 1; i < histories.size(); i++) {long olderBalance = histories.get(i).getBalance();long historyDifference = lastHistoryBalance - olderBalance;if (Math.abs(historyDifference) >= THRESHOLD) {String action = historyDifference > 0 ? "减少" : "增加";log.info("[" + histories.get(i).getDate() + "] 历史警告: 地址 " + address +" 的余额" + action + "了 " + Math.abs(historyDifference) +" Dogecoin [重点监控钱包: " + wallet + "]");}lastHistoryBalance = olderBalance;}}}}}private List<Doge> getCurrentTop100Addresses() {SearchResponse<Doge> response = null;try {response = elasticsearchClient.search(s -> s.index("doge").size(TOP_ADDRESS_LIMIT).sort(so -> so.field(f -> f.field("rank").order(SortOrder.Asc))).collapse(c -> c.field("address")) .source(sc -> sc.filter(f -> f.includes("address", "wallet", "rank"))) , Doge.class);} catch (IOException e) {throw new RuntimeException(e);}return response.hits().hits().stream().map(Hit::source).filter(Objects::nonNull).sorted(Comparator.comparing(Doge::getRank)) .collect(Collectors.toList());}private List<DogeDto> getHistoryForAddresses(List<Doge> topAddresses, DogeQueryDto queryDto) throws IOException {List<String> addresses = topAddresses.stream().map(Doge::getAddress).collect(Collectors.toList());Query baseQuery = Query.of(q -> q.bool(b -> b.must(m -> m.terms(t -> t.field("address").terms(tv -> tv.value(addresses.stream().map(FieldValue::of).collect(Collectors.toList())))))));SearchResponse<Doge> response = elasticsearchClient.search(s -> s.index("doge").size(HISTORY_QUERY_LIMIT).query(baseQuery).sort(so -> so.field(f -> f.field("c_date").order(SortOrder.Asc))) , Doge.class);Map<String, List<Doge>> groupedByAddress = 
response.hits().hits().stream().map(Hit::source).filter(Objects::nonNull).collect(Collectors.groupingBy(Doge::getAddress));return topAddresses.stream().map(addressDoc -> {List<Doge> historyDocs = groupedByAddress.getOrDefault(addressDoc.getAddress(), Collections.emptyList());return convertToDogeDto(addressDoc, historyDocs);}).filter(Objects::nonNull).collect(Collectors.toList());}private DogeDto convertToDogeDto(Doge addressDoc, List<Doge> historyDocs) {if (historyDocs.isEmpty()) {return null;}DogeDto dto = new DogeDto();dto.setAddress(addressDoc.getAddress());dto.setWallet(addressDoc.getWallet());dto.setHistory(historyDocs.stream().sorted(Comparator.comparing(Doge::getC_date)).map(this::convertToHistoryDto).collect(Collectors.toList()));return dto;}private DogeDto.History convertToHistoryDto(Doge doc) {DogeDto.History history = new DogeDto.History();history.setDate(doc.getC_date());history.setBalance(doc.getBalance());history.setRank(doc.getRank());history.setPercent(doc.getPercent());return history;}
}