
A Look at Spring AI Alibaba's ElasticsearchDocumentReader

This article takes a look at ElasticsearchDocumentReader in Spring AI Alibaba.

ElasticsearchDocumentReader

community/document-readers/spring-ai-alibaba-starter-document-reader-elasticsearch/src/main/java/com/alibaba/cloud/ai/document/reader/es/ElasticsearchDocumentReader.java

public class ElasticsearchDocumentReader implements DocumentReader {

    private final ElasticsearchConfig config;

    private final ElasticsearchClient client;

    /**
     * Constructor that initializes the Elasticsearch client with the provided
     * configuration.
     * @param config The Elasticsearch configuration
     */
    public ElasticsearchDocumentReader(ElasticsearchConfig config) {
        this.config = config;
        try {
            this.client = createClient();
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create Elasticsearch client", e);
        }
    }

    @Override
    public List<Document> get() {
        try {
            // Get all documents
            SearchResponse<Map> response = client.search(
                    s -> s.index(config.getIndex()).query(q -> q.matchAll(m -> m)).size(config.getMaxResults()),
                    Map.class);
            return getDocuments(response);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to get documents from Elasticsearch", e);
        }
    }

    @NotNull
    private List<Document> getDocuments(SearchResponse<Map> response) {
        List<Document> documents = new ArrayList<>();
        response.hits().hits().forEach(hit -> {
            Map<String, Object> source = hit.source();
            if (source != null) {
                Document document = new Document(source.getOrDefault(config.getQueryField(), "").toString(), source);
                documents.add(document);
            }
        });
        return documents;
    }

    /**
     * Get a document by its ID.
     * @param id The document ID
     * @return The document if found, null otherwise
     */
    public Document getById(String id) {
        try {
            var response = client.get(g -> g.index(config.getIndex()).id(id), Map.class);
            if (!response.found() || response.source() == null) {
                return null;
            }
            return new Document(response.source().getOrDefault(config.getQueryField(), "").toString(),
                    response.source());
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to get document from Elasticsearch with id: " + id, e);
        }
    }

    /**
     * Read documents matching the specified query.
     * @param query The search query
     * @return List of matching documents
     */
    public List<Document> readWithQuery(String query) {
        try {
            // Build the search request with query
            SearchResponse<Map> response = client.search(s -> s.index(config.getIndex())
                .query(q -> q.match(new MatchQuery.Builder().field(config.getQueryField()).query(query).build()))
                .size(config.getMaxResults()), Map.class);
            return getDocuments(response);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to read documents from Elasticsearch with query: " + query, e);
        }
    }

    private ElasticsearchClient createClient()
            throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        // Create HttpHosts for all nodes
        HttpHost[] httpHosts;
        if (!CollectionUtils.isEmpty(config.getNodes())) {
            httpHosts = config.getNodes().stream().map(node -> {
                String[] parts = node.split(":");
                return new HttpHost(parts[0], Integer.parseInt(parts[1]), config.getScheme());
            }).toArray(HttpHost[]::new);
        }
        else {
            // Fallback to single node configuration
            httpHosts = new HttpHost[] { new HttpHost(config.getHost(), config.getPort(), config.getScheme()) };
        }

        var restClientBuilder = RestClient.builder(httpHosts);

        // Add authentication if credentials are provided
        if (StringUtils.hasText(config.getUsername()) && StringUtils.hasText(config.getPassword())) {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));

            // Create SSL context if using HTTPS
            if ("https".equalsIgnoreCase(config.getScheme())) {
                SSLContext sslContext = SSLContextBuilder.create()
                    .loadTrustMaterial(null, (chains, authType) -> true)
                    .build();
                restClientBuilder.setHttpClientConfigCallback(
                        httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                            .setSSLContext(sslContext)
                            .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE));
            }
            else {
                restClientBuilder.setHttpClientConfigCallback(
                        httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
            }
        }

        // Create the transport and client
        ElasticsearchTransport transport = new RestClientTransport(restClientBuilder.build(), new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }

}

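ElasticsearchDocumentReader uses the official Elasticsearch ElasticsearchClient to read data. By default it reads 10 records from the specified index (the limit comes from config.getMaxResults()) and uses the value of the configured field (config.getQueryField()) as the document content, while the whole source map is passed along as the document's metadata.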
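
The hit-to-document mapping in getDocuments can be tried out without an Elasticsearch cluster. The following is only a minimal sketch using the JDK alone: SimpleDocument is a hypothetical stand-in for Spring AI's Document, and the sample maps are made up for illustration. It mirrors two details of the original method: hits with a null source are skipped, and a source that lacks the query field yields an empty text.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SourceMappingSketch {

    // Hypothetical stand-in for Spring AI's Document: text plus metadata.
    public static final class SimpleDocument {
        public final String text;
        public final Map<String, Object> metadata;

        public SimpleDocument(String text, Map<String, Object> metadata) {
            this.text = text;
            this.metadata = metadata;
        }
    }

    // Same shape as getDocuments: skip null sources, take the query field
    // as text (empty string if absent), keep the full source as metadata.
    public static List<SimpleDocument> toDocuments(List<Map<String, Object>> sources, String queryField) {
        List<SimpleDocument> documents = new ArrayList<>();
        for (Map<String, Object> source : sources) {
            if (source != null) {
                String text = source.getOrDefault(queryField, "").toString();
                documents.add(new SimpleDocument(text, source));
            }
        }
        return documents;
    }

    public static void main(String[] args) {
        // Made-up sample data, not taken from a real index.
        Map<String, Object> full = new LinkedHashMap<>();
        full.put("content", "Java is a popular programming language and platform.");
        full.put("title", "Java Programming");

        Map<String, Object> titleOnly = new LinkedHashMap<>();
        titleOnly.put("title", "No content field");

        List<Map<String, Object>> sources = new ArrayList<>();
        sources.add(full);
        sources.add(null); // a hit without _source
        sources.add(titleOnly);

        for (SimpleDocument doc : toDocuments(sources, "content")) {
            System.out.println("text=[" + doc.text + "] metadataKeys=" + doc.metadata.keySet());
        }
    }
}
```

One consequence worth noting: because the whole source map becomes the metadata, the query field appears both as the document text and as a metadata entry.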

Example

@EnabledIfEnvironmentVariable(named = "ES_HOST", matches = ".+")
public class ElasticsearchDocumentReaderTest {

    private static final String TEST_INDEX = "spring-ai-test";

    private static final String TEST_DOC_ID = "1";

    // Get ES configuration from environment variables, use defaults if not set
    private static final String ES_HOST = System.getenv("ES_HOST") != null ? System.getenv("ES_HOST") : "localhost";

    private static final int ES_PORT = System.getenv("ES_PORT") != null ? Integer.parseInt(System.getenv("ES_PORT"))
            : 9200;

    private static final String ES_USERNAME = System.getenv("ES_USERNAME") != null ? System.getenv("ES_USERNAME")
            : "elastic";

    private static final String ES_PASSWORD = System.getenv("ES_PASSWORD") != null ? System.getenv("ES_PASSWORD")
            : "r-tooRd7RgrX_uZV0klZ";

    private static final String ES_SCHEME = System.getenv("ES_SCHEME") != null ? System.getenv("ES_SCHEME") : "https";

    private static ElasticsearchClient client;

    private static ElasticsearchDocumentReader reader;

    private static ElasticsearchDocumentReader clusterReader;

    // Flag to indicate if ES is available
    private static boolean esAvailable = false;

    static {
        if (System.getenv("ES_HOST") == null) {
            System.out.println("ES_HOST environment variable is not set. Tests will be skipped.");
        }
    }

    /**
     * Check if Elasticsearch is available
     * @return true if ES is available, false otherwise
     */
    public static boolean isElasticsearchAvailable() {
        return esAvailable;
    }

    /**
     * Try to connect to Elasticsearch
     * @return true if connection successful, false otherwise
     */
    private static boolean canConnectToElasticsearch() {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(ES_HOST, ES_PORT), 1000);
            return true;
        }
        catch (Exception e) {
            System.out.println("Cannot connect to Elasticsearch: " + e.getMessage());
            return false;
        }
    }

    @BeforeAll
    static void setUp() throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        // Check if ES_HOST environment variable is set
        String esHost = System.getenv("ES_HOST");
        assumeTrue(esHost != null && !esHost.isEmpty(),
                "Skipping test because ES_HOST environment variable is not set");

        // Check if we can connect to ES
        esAvailable = canConnectToElasticsearch();

        // Skip setup if ES is not available
        if (!esAvailable) {
            System.out
                .println("Skipping Elasticsearch tests because ES server is not available: " + ES_HOST + ":" + ES_PORT);
            return;
        }

        try {
            // Create SSL context that trusts all certificates
            SSLContext sslContext = SSLContextBuilder.create()
                .loadTrustMaterial(null, (chains, authType) -> true)
                .build();

            // Create client with authentication and SSL
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(ES_USERNAME, ES_PASSWORD));

            RestClient restClient = RestClient.builder(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME))
                .setHttpClientConfigCallback(
                        httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                            .setSSLContext(sslContext)
                            .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE))
                .build();

            client = new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));

            // Delete index if exists
            boolean indexExists = client.indices().exists(e -> e.index(TEST_INDEX)).value();
            if (indexExists) {
                DeleteIndexResponse deleteResponse = client.indices().delete(c -> c.index(TEST_INDEX));
                assertThat(deleteResponse.acknowledged()).isTrue();
            }

            // Create test index with mapping
            CreateIndexResponse createResponse = client.indices()
                .create(c -> c.index(TEST_INDEX)
                    .mappings(m -> m.properties("content", p -> p.text(t -> t.analyzer("standard")))
                        .properties("title", p -> p.keyword(k -> k))));
            assertThat(createResponse.acknowledged()).isTrue();

            // Configure and create single node reader
            ElasticsearchConfig config = new ElasticsearchConfig();
            config.setHost(ES_HOST);
            config.setPort(ES_PORT);
            config.setIndex(TEST_INDEX);
            config.setQueryField("content");
            config.setUsername(ES_USERNAME);
            config.setPassword(ES_PASSWORD);
            config.setScheme(ES_SCHEME);
            reader = new ElasticsearchDocumentReader(config);

            // Configure and create cluster reader
            ElasticsearchConfig clusterConfig = new ElasticsearchConfig();
            clusterConfig.setNodes(Arrays.asList(ES_HOST + ":" + ES_PORT, ES_HOST + ":9201", ES_HOST + ":9202"));
            clusterConfig.setIndex(TEST_INDEX);
            clusterConfig.setQueryField("content");
            clusterConfig.setUsername(ES_USERNAME);
            clusterConfig.setPassword(ES_PASSWORD);
            clusterConfig.setScheme(ES_SCHEME);
            clusterReader = new ElasticsearchDocumentReader(clusterConfig);

            // Index test documents
            indexTestDocuments();
        }
        catch (Exception e) {
            System.out.println("Failed to set up Elasticsearch test environment: " + e.getMessage());
            esAvailable = false;
        }
    }

    @AfterAll
    static void tearDown() throws IOException {
        // Skip cleanup if ES is not available or client is null
        if (!esAvailable || client == null) {
            return;
        }

        try {
            DeleteIndexResponse deleteResponse = client.indices().delete(c -> c.index(TEST_INDEX));
            assertThat(deleteResponse.acknowledged()).isTrue();
        }
        catch (Exception e) {
            System.out.println("Failed to clean up Elasticsearch test environment: " + e.getMessage());
        }
    }

    @Test
    @EnabledIf("isElasticsearchAvailable")
    void testGet() {
        List<Document> documents = reader.get();
        assertThat(documents).hasSize(3);
        assertThat(documents.get(0).getText()).contains("Spring Framework");
        assertThat(documents.get(0).getMetadata()).containsKey("title");
    }

    //......

    private static void indexTestDocuments() throws IOException {
        // First document
        Map<String, Object> doc1 = new HashMap<>();
        doc1.put("content", "Spring Framework is the most popular application development framework for Java.");
        doc1.put("title", "Spring Introduction");
        client.index(i -> i.index(TEST_INDEX).id(TEST_DOC_ID).document(doc1));

        // Second document
        Map<String, Object> doc2 = new HashMap<>();
        doc2.put("content",
                "Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications.");
        doc2.put("title", "Spring Boot Guide");
        client.index(i -> i.index(TEST_INDEX).document(doc2));

        // Third document
        Map<String, Object> doc3 = new HashMap<>();
        doc3.put("content", "Java is a popular programming language and platform.");
        doc3.put("title", "Java Programming");
        client.index(i -> i.index(TEST_INDEX).document(doc3));

        // Refresh index to make documents searchable
        client.indices().refresh();
    }

}

Here the ElasticsearchConfig sets queryField to content, so the reader takes each document's text from the content field.
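
The test also builds a cluster reader from "host:port" strings via clusterConfig.setNodes, which createClient splits on ":" and parses with Integer.parseInt. As written in the source, an entry without a port would fail with an ArrayIndexOutOfBoundsException. The following is a minimal sketch of a more defensive parse, not the library's behavior: the NodeAddressSketch class, the HostPort holder, and the defaultPort fallback are all assumptions introduced for illustration, and IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NodeAddressSketch {

    // Hypothetical holder for a parsed node address.
    public static final class HostPort {
        public final String host;
        public final int port;

        public HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    // Splits "host:port" like createClient does, but falls back to
    // defaultPort when the port is missing (an assumption, not source behavior).
    public static HostPort parse(String node, int defaultPort) {
        String[] parts = node.trim().split(":");
        if (parts.length == 0 || parts[0].isEmpty()) {
            throw new IllegalArgumentException("Missing host in node address: " + node);
        }
        if (parts.length == 1) {
            return new HostPort(parts[0], defaultPort);
        }
        if (parts.length == 2) {
            return new HostPort(parts[0], Integer.parseInt(parts[1]));
        }
        throw new IllegalArgumentException("Expected host:port but got: " + node);
    }

    public static List<HostPort> parseAll(List<String> nodes, int defaultPort) {
        List<HostPort> result = new ArrayList<>();
        for (String node : nodes) {
            result.add(parse(node, defaultPort));
        }
        return result;
    }

    public static void main(String[] args) {
        // Same shape as the node list used by the cluster reader in the test.
        List<String> nodes = Arrays.asList("localhost:9200", "localhost:9201", "localhost");
        for (HostPort hp : parseAll(nodes, 9200)) {
            System.out.println(hp);
        }
    }
}
```

Each parsed pair would then be passed to new HttpHost(host, port, scheme), as the original createClient does.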

Summary

spring-ai-alibaba-starter-document-reader-elasticsearch provides ElasticsearchDocumentReader, which reads a specified field of Elasticsearch documents and uses it as the document content.

doc

  • java2ai
