聊聊Spring AI Alibaba的ElasticsearchDocumentReader
序
本文主要研究一下Spring AI Alibaba的ElasticsearchDocumentReader
ElasticsearchDocumentReader
community/document-readers/spring-ai-alibaba-starter-document-reader-elasticsearch/src/main/java/com/alibaba/cloud/ai/document/reader/es/ElasticsearchDocumentReader.java
public class ElasticsearchDocumentReader implements DocumentReader {private final ElasticsearchConfig config;private final ElasticsearchClient client;/*** Constructor that initializes the Elasticsearch client with the provided* configuration.* @param config The Elasticsearch configuration*/public ElasticsearchDocumentReader(ElasticsearchConfig config) {this.config = config;try {this.client = createClient();}catch (Exception e) {throw new RuntimeException("Failed to create Elasticsearch client", e);}}@Overridepublic List<Document> get() {try {// Get all documentsSearchResponse<Map> response = client.search(s -> s.index(config.getIndex()).query(q -> q.matchAll(m -> m)).size(config.getMaxResults()),Map.class);return getDocuments(response);}catch (IOException e) {throw new RuntimeException("Failed to get documents from Elasticsearch", e);}}@NotNullprivate List<Document> getDocuments(SearchResponse<Map> response) {List<Document> documents = new ArrayList<>();response.hits().hits().forEach(hit -> {Map<String, Object> source = hit.source();if (source != null) {Document document = new Document(source.getOrDefault(config.getQueryField(), "").toString(), source);documents.add(document);}});return documents;}/*** Get a document by its ID.* @param id The document ID* @return The document if found, null otherwise*/public Document getById(String id) {try {var response = client.get(g -> g.index(config.getIndex()).id(id), Map.class);if (!response.found() || response.source() == null) {return null;}return new Document(response.source().getOrDefault(config.getQueryField(), "").toString(),response.source());}catch (IOException e) {throw new RuntimeException("Failed to get document from Elasticsearch with id: " + id, e);}}/*** Read documents matching the specified query.* @param query The search query* @return List of matching documents*/public List<Document> readWithQuery(String query) {try {// Build the search request with querySearchResponse<Map> response = client.search(s -> 
s.index(config.getIndex()).query(q -> q.match(new MatchQuery.Builder().field(config.getQueryField()).query(query).build())).size(config.getMaxResults()), Map.class);return getDocuments(response);}catch (IOException e) {throw new RuntimeException("Failed to read documents from Elasticsearch with query: " + query, e);}}private ElasticsearchClient createClient()throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {// Create HttpHosts for all nodesHttpHost[] httpHosts;if (!CollectionUtils.isEmpty(config.getNodes())) {httpHosts = config.getNodes().stream().map(node -> {String[] parts = node.split(":");return new HttpHost(parts[0], Integer.parseInt(parts[1]), config.getScheme());}).toArray(HttpHost[]::new);}else {// Fallback to single node configurationhttpHosts = new HttpHost[] { new HttpHost(config.getHost(), config.getPort(), config.getScheme()) };}var restClientBuilder = RestClient.builder(httpHosts);// Add authentication if credentials are providedif (StringUtils.hasText(config.getUsername()) && StringUtils.hasText(config.getPassword())) {CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));// Create SSL context if using HTTPSif ("https".equalsIgnoreCase(config.getScheme())) {SSLContext sslContext = SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build();restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setSSLContext(sslContext).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE));}else {restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));}}// Create the transport and clientElasticsearchTransport transport = new RestClientTransport(restClientBuilder.build(), new JacksonJsonpMapper());return new 
ElasticsearchClient(transport);}}
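ElasticsearchDocumentReader使用elasticsearch官方的ElasticsearchClient去读取数据:get()方法对指定索引执行match_all查询,返回条数由config.getMaxResults()决定(默认为10条),并将queryField指定字段的内容作为document的内容,同时把整个_source作为document的metadata;此外还提供了getById按id读取以及readWithQuery按match查询读取的方法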
示例
@EnabledIfEnvironmentVariable(named = "ES_HOST", matches = ".+")
public class ElasticsearchDocumentReaderTest {private static final String TEST_INDEX = "spring-ai-test";private static final String TEST_DOC_ID = "1";// Get ES configuration from environment variables, use defaults if not setprivate static final String ES_HOST = System.getenv("ES_HOST") != null ? System.getenv("ES_HOST") : "localhost";private static final int ES_PORT = System.getenv("ES_PORT") != null ? Integer.parseInt(System.getenv("ES_PORT")): 9200;private static final String ES_USERNAME = System.getenv("ES_USERNAME") != null ? System.getenv("ES_USERNAME"): "elastic";private static final String ES_PASSWORD = System.getenv("ES_PASSWORD") != null ? System.getenv("ES_PASSWORD"): "r-tooRd7RgrX_uZV0klZ";private static final String ES_SCHEME = System.getenv("ES_SCHEME") != null ? System.getenv("ES_SCHEME") : "https";private static ElasticsearchClient client;private static ElasticsearchDocumentReader reader;private static ElasticsearchDocumentReader clusterReader;// Flag to indicate if ES is availableprivate static boolean esAvailable = false;static {if (System.getenv("ES_HOST") == null) {System.out.println("ES_HOST environment variable is not set. 
Tests will be skipped.");}}/*** Check if Elasticsearch is available* @return true if ES is available, false otherwise*/public static boolean isElasticsearchAvailable() {return esAvailable;}/*** Try to connect to Elasticsearch* @return true if connection successful, false otherwise*/private static boolean canConnectToElasticsearch() {try (Socket socket = new Socket()) {socket.connect(new InetSocketAddress(ES_HOST, ES_PORT), 1000);return true;}catch (Exception e) {System.out.println("Cannot connect to Elasticsearch: " + e.getMessage());return false;}}@BeforeAllstatic void setUp() throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {// Check if ES_HOST environment variable is setString esHost = System.getenv("ES_HOST");assumeTrue(esHost != null && !esHost.isEmpty(),"Skipping test because ES_HOST environment variable is not set");// Check if we can connect to ESesAvailable = canConnectToElasticsearch();// Skip setup if ES is not availableif (!esAvailable) {System.out.println("Skipping Elasticsearch tests because ES server is not available: " + ES_HOST + ":" + ES_PORT);return;}try {// Create SSL context that trusts all certificatesSSLContext sslContext = SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build();// Create client with authentication and SSLCredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(ES_USERNAME, ES_PASSWORD));RestClient restClient = RestClient.builder(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME)).setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setSSLContext(sslContext).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)).build();client = new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));// Delete index if existsboolean indexExists = client.indices().exists(e -> 
e.index(TEST_INDEX)).value();if (indexExists) {DeleteIndexResponse deleteResponse = client.indices().delete(c -> c.index(TEST_INDEX));assertThat(deleteResponse.acknowledged()).isTrue();}// Create test index with mappingCreateIndexResponse createResponse = client.indices().create(c -> c.index(TEST_INDEX).mappings(m -> m.properties("content", p -> p.text(t -> t.analyzer("standard"))).properties("title", p -> p.keyword(k -> k))));assertThat(createResponse.acknowledged()).isTrue();// Configure and create single node readerElasticsearchConfig config = new ElasticsearchConfig();config.setHost(ES_HOST);config.setPort(ES_PORT);config.setIndex(TEST_INDEX);config.setQueryField("content");config.setUsername(ES_USERNAME);config.setPassword(ES_PASSWORD);config.setScheme(ES_SCHEME);reader = new ElasticsearchDocumentReader(config);// Configure and create cluster readerElasticsearchConfig clusterConfig = new ElasticsearchConfig();clusterConfig.setNodes(Arrays.asList(ES_HOST + ":" + ES_PORT, ES_HOST + ":9201", ES_HOST + ":9202"));clusterConfig.setIndex(TEST_INDEX);clusterConfig.setQueryField("content");clusterConfig.setUsername(ES_USERNAME);clusterConfig.setPassword(ES_PASSWORD);clusterConfig.setScheme(ES_SCHEME);clusterReader = new ElasticsearchDocumentReader(clusterConfig);// Index test documentsindexTestDocuments();}catch (Exception e) {System.out.println("Failed to set up Elasticsearch test environment: " + e.getMessage());esAvailable = false;}}@AfterAllstatic void tearDown() throws IOException {// Skip cleanup if ES is not available or client is nullif (!esAvailable || client == null) {return;}try {DeleteIndexResponse deleteResponse = client.indices().delete(c -> c.index(TEST_INDEX));assertThat(deleteResponse.acknowledged()).isTrue();}catch (Exception e) {System.out.println("Failed to clean up Elasticsearch test environment: " + e.getMessage());}}@Test@EnabledIf("isElasticsearchAvailable")void testGet() {List<Document> documents = 
reader.get();assertThat(documents).hasSize(3);assertThat(documents.get(0).getText()).contains("Spring Framework");assertThat(documents.get(0).getMetadata()).containsKey("title");}//......private static void indexTestDocuments() throws IOException {// First documentMap<String, Object> doc1 = new HashMap<>();doc1.put("content", "Spring Framework is the most popular application development framework for Java.");doc1.put("title", "Spring Introduction");client.index(i -> i.index(TEST_INDEX).id(TEST_DOC_ID).document(doc1));// Second documentMap<String, Object> doc2 = new HashMap<>();doc2.put("content","Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications.");doc2.put("title", "Spring Boot Guide");client.index(i -> i.index(TEST_INDEX).document(doc2));// Third documentMap<String, Object> doc3 = new HashMap<>();doc3.put("content", "Java is a popular programming language and platform.");doc3.put("title", "Java Programming");client.index(i -> i.index(TEST_INDEX).document(doc3));// Refresh index to make documents searchableclient.indices().refresh();}}
这里ElasticsearchConfig通过setQueryField("content")指定了读取content字段作为document的内容
小结
spring-ai-alibaba-starter-document-reader-elasticsearch提供了ElasticsearchDocumentReader用于读取es文档的指定字段作为document的内容。
doc
- java2ai