当前位置: 首页 > news >正文

RAG工程-基于LangChain 实现 Naive RAG

摘要

        本篇文章以实现简单的第一范式 RAG-Naive RAG为目标,并最终创建并实现一个基于RAG的论文分析器的项目。

LangChain 文档加载

        文档加载是 RAG 流程的起点,它负责从各种数据源读取原始文档,将其转化为程序可处理的格式。LangChain 支持多种文档类型的加载,具体支持的加载文档类型,可以参考我写的这篇文章:

langchain框架-文档加载器详解-CSDN博客

  • 接下来,我将以ArxivLoader 论文加载器为示例,基于langchain实现一个论文文档加载器:
  • 首先阅读项目源码说明得知,使用ArxivLoader 需要安装两个模块 arxiv 和 pyMuPDF:

pip install -U arxiv pymupdf
  • 注意:pymupdy 依赖需要Visual Studio 工具包的支持,如果没有安装的话会报python 依赖加载异常。

  • 继续阅读源码说明确定其参数:

  • 示例代码
def load_arxiv_paper(arxiv_id):try:# 初始化ArxivLoaderloader = ArxivLoader(query=arxiv_id,load_max_docs=1,  # 限制加载文档数量load_all_available_meta=True  # 加载所有可用的元数据)# 加载论文文档documents = loader.load()logger.info(f"成功加载论文,标题: {documents[0].metadata.get('Title', 'Unknown')}")return documentsexcept Exception as e:logger.error(f"加载论文时发生错误: {str(e)}")return None
  •  并将最终结果打印并保存到文件中缓存,整个文档加载的流程就结束了。
    # 论文的arxiv IDarxiv_id = "2402.19473"# 加载论文documents = load_arxiv_paper(arxiv_id)if not documents:return# 存入 getParper.txt 文件中。with open("getPaper.txt", "w", encoding="utf-8") as f:for doc in documents:f.write(doc.page_content + "\n")exit()
  • 最终获取展示:注意ArxivLoader最终获取的论文只是文本部分,不能包含图片部分的内容。

LangChain 文本分割

            在完成文档加载之后,对加载的文档进行合理的文本分割显得尤为重要。因为在检索增强生成(RAG)流程中,优质的文本分块方式对最终结果有着显著影响。

        考虑到本项目所涉及的论文为英文文档,并且为了确保论文内容的连贯性,我们决定采用语义分割的策略。具体而言,我们将借助 LangChain 框架中的 NLTKTextSplitter 工具。该工具依托自然语言工具包(NLTK),能够精准地按照句子对文本进行分割,从而为后续的 RAG 任务奠定坚实基础。

        Langchain 框架中包含了很多文本分割方法,如果读者想系统学习langchain 的文本分割,可以参考我写的以下文章:

langchain框架-文档分割器详解(官方库)-CSDN博客

langchain框架-文档分割器详解(非官方库)-CSDN博客

langchain框架-文档分割器总结即补充-CSDN博客

NLTK 语料包下载

        使用NLTKTextSplitter 需要先下载NLTK 语料工具包。但由于网络等原因,我们可能无法正常下载该工具包。我们可以采用网络代理的方式,如果不能用代理的小伙伴,可以下载我的上传资源进行练习。具体的代码如下:

def getNLTKData():import nltkos.environ['http_proxy'] = 'http://127.0.0.1:33210'os.environ['https_proxy'] = 'http://127.0.0.1:33210'# 指定自定义的数据目录custom_data_dir = 'E:\\nltk_data'nltk.data.path.append(custom_data_dir)# 下载资源if not os.path.exists(os.path.join(custom_data_dir, 'tokenizers', 'punkt')):nltk.download('punkt', download_dir=custom_data_dir)if not os.path.exists(os.path.join(custom_data_dir, 'tokenizers', 'punkt_tab')):nltk.download('punkt_tab', download_dir=custom_data_dir)

CSDN语料包下载地址

https://download.csdn.net/download/weixin_41645817/90627575

分割文本方法

def process_documents(documents):if not documents:return Nonetry:# 初始化文本分割器nltk_splitter = NLTKTextSplitter(chunk_size=1000,chunk_overlap=200,)# 分割文档texts = nltk_splitter.split_documents(documents)logger.info(f"文档已分割为 {len(texts)} 个片段")return textsexcept Exception as e:print(e)logger.error(f"处理文档时发生错误: {str(e)}")return None

执行并保存

    # 处理文档split_documents = process_documents(documents)if not split_documents:return# 将分割后的文本存入 getPaperChunk2.json 文件中chunks_data = {"total_chunks": len(split_documents),"chunks": []}for index, text in enumerate(split_documents, 1):chunk_info = {"chunk_id": f"chunk_{index}","page_content": text.page_content}chunks_data["chunks"].append(chunk_info)# 将JSON数据写入文件with open("getPaperChunk2.json", "w", encoding="utf-8") as f:json.dump(chunks_data, f, ensure_ascii=False, indent=4)print(f"成功保存 {len(split_documents)} 个文本块到 getPaperChunk2.json")

LangChain 嵌入模型

        实现文本切分后,我们需要将切分的文档进行向量化处理,在这里我将使用ZhipuAIEmbeddings 进行文档的向量化处理,主要是因为我买了智谱AI embedding-3 的流量包了哈。小伙伴们可以自行选择向量化模型,LangChain 框架也集成了众多embeddings类的实现,具体可以参考我的这篇文章:

LangChain框架-嵌入模型详解-CSDN博客

LangChain 向量存储

        文本向量化后,我们需要将向量化后的数据存入到向量数据库中。LangChain 框架集成了众多向量数据库的实现,我们可以很方便的通过LangChain 将数据存入到向量数据库。在这里,我将使用一款基于内存的轻量级数据库Faiss 作为示例进行向量存储。如果您想系统性的学习LangChain 向量存储部分,可以参考我的这篇文章:

LangChain框架-向量存储详解-CSDN博客

向量化与向量存储代码示例

  • 安装zhipuai,faiss-cpu 依赖

pip install zhipuai,faiss-cpu

  • 向量化并存入Faiss库

def create_vector_store(documents):if not documents:logger.warning("没有文档需要处理")return Nonetry:# 使用绝对路径存储缓存cache_dir = os.path.join(os.path.dirname(__file__), "cache")os.makedirs(cache_dir, exist_ok=True)store = LocalFileStore(cache_dir)# 创建向量存储underlying_embeddings = ZhipuAIEmbeddings(model="embedding-3",api_key=os.getenv("ZHIPUAI_API_KEY"),)cached_embedder = CacheBackedEmbeddings.from_bytes_store(underlying_embeddings, store, namespace=underlying_embeddings.model)# 批量处理文档batch_size = 30vector_stores = []for i in range(0, len(documents), batch_size):batch_texts = documents[i:i + batch_size]try:batch_vector_store = FAISS.from_documents(documents=batch_texts,embedding=cached_embedder)vector_stores.append(batch_vector_store)logger.info(f"成功处理第 {i//batch_size + 1} 批文档")except Exception as batch_error:logger.error(f"处理第 {i//batch_size + 1} 批文档时出错: {str(batch_error)}")continueif not vector_stores:logger.error("没有成功创建任何向量存储")return None# 合并所有批次的向量存储merged_store = vector_stores[0]for store in vector_stores[1:]:merged_store.merge_from(store)logger.info(f"向量存储创建成功,共处理 {len(documents)} 个文档")return merged_storeexcept Exception as e:print(e)logger.error(f"创建向量存储时发生错误: {str(e)}")return None

        这段代码实现了一个向量存储创建功能,主要包含以下几个关键部分:

  • 使用LocalFileStore实现本地缓存存储,避免重复计算向量;

  • 使用智谱AI的embedding模型将文本转换为向量,并通过CacheBackedEmbeddings进行缓存管理;

  • 采用批处理方式,每批处理30个文档,(智谱AI 的最大批处理数量为64)

  • 使用FAISS创建向量存储

  • 将所有批次的向量存储合并成一个完整的向量库。

        这种实现方式既保证了处理大量文档的效率,又通过缓存机制优化了性能。

        cache中最终缓存的向量数据展示:

流程调用

    # 创建向量存储vectorstore = create_vector_store(split_documents)if not vectorstore:returnprint("向量存储创建成功")

LangChain 检索器

        通过langchain 框架,我们可以很容易的将向量存储转换为检索器对象。如果您想系统性的学习langchain 的检索器,可以参考我的这篇文章:

LangChain框架-检索器详解-CSDN博客

        示例项目将向量存储转化为检索器,最后使用了invoke()方法,接收检索的对象文档。代码以及最终的检索结果如下:

处理代码

retriever = vectorstore.as_retriever()
doc =retriever.invoke("这篇论文讲述了什么内容?")
print(doc)

LLM内容整合

        最后,我们可以通过调用DeepSeek,再对检索器检索的内容进行总结。在这里我使用了ChatPromptTemplate 对话提示词模板和 create_stuff_documents_chain 处理文档问答的工具链形成一个最终答案,代码如下:

    system_prompt = ChatPromptTemplate.from_messages([("system", """根据提供的上下文: {context} \n\n 使用回答问题: {input}""")])# 初始化大模型llm = ChatOpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"),base_url="https://api.deepseek.com/v1",model="deepseek-chat")# 构建链  这个链将文档作为输入,并使用之前定义的提示模板和初始化的大模型来生成答案chain = create_stuff_documents_chain(llm,system_prompt)res = chain.invoke({"input": "这篇论文讲述了什么内容?", "context": all_context})print("最终答案:" +res)

最终结果展示

最终答案:这篇论文是一篇关于**检索增强生成(Retrieval-Augmented Generation, RAG)**的系统性综述,旨在全面梳理RAG的基础理论、优化方法、多模态应用、基准测试、当前局限及未来方向。以下是核心内容总结:

---

### **1. 研究背景与动机**
- **AIGC的挑战**:尽管生成式AI(如大模型)取得显著进展,但仍面临知识更新滞后、长尾数据处理、数据泄露风险、高训练/推理成本等问题。
- **RAG的作用**:通过结合信息检索(IR)与生成模型,RAG能够动态检索外部知识库中的相关内容,提升生成结果的准确性和鲁棒性,同时降低模型幻觉(hallucination)。

---

### **2. 主要贡献**
- **全面性综述**:覆盖RAG的**基础框架**(检索器Retriever与生成器Generator的协同)、**优化方法**(输入增强、检索器改进、生成器调优等)、**跨模态应用**(文本、图像、视频等)、**评测基准**及未来挑战。
- **填补空白**:相比已有研究(如Zhao et al.仅关注多模态应用或忽略基础理论),本文首次系统整合RAG的全链条技术,强调其在多领域的适用性。

---

### **3. 核心内容**
- **RAG基础**(Section II-III)  
  介绍检索器(如稠密检索Dense Retrieval)与生成器(如LLMs)的协同机制,以及如何通过检索动态增强生成过程。
  
- **优化方法**(Section III-B)  
  提出5类增强策略:  
  - **输入增强**:如查询转换(Query2Doc、HyDE生成伪文档扩展查询语义)。  
  - **检索器改进**:优化索引结构或相似度计算。  
  - **生成器调优**:利用检索结果指导生成。  
  - **结果后处理**:对输出进行过滤或重排序。  
  - **端到端优化**:联合训练检索与生成模块。

- **多模态应用**(Section IV)  
  超越文本生成,探讨RAG在图像、视频、音频等模态中的独特应用(如跨模态检索增强生成)。

- **评测与挑战**(Section V-VI)  
  分析现有基准(如检索质量、生成效果评估),指出RAG的局限性(如检索效率、多模态对齐)及未来方向(如轻量化部署、动态知识更新)。

---

### **4. 论文结构**
1. **引言**:RAG的动机与综述范围。  
2. **基础理论**:检索器与生成器的协同机制。  
3. **优化方法**:输入、检索、生成等环节的增强策略。  
4. **多模态应用**:跨领域案例(如医疗、金融、创作)。  
5. **评测基准**:现有评估框架与指标。  
6. **局限与展望**:技术瓶颈与潜在解决方案。  
7. **结论**:总结RAG的价值与发展路径。

---

### **5. 创新点**
- **多模态视角**:强调RAG在非文本领域的潜力(如视觉问答、视频摘要)。  
- **技术整合**:首次系统化分类RAG优化方法(如输入端的查询转换策略)。  
- **实践指导**:为研究者提供跨领域应用的设计思路(如如何选择检索源或生成模型)。

---

### **总结**
该论文是RAG领域的里程碑式综述,不仅梳理了技术脉络,还指出了未来研究方向(如高效检索算法、多模态泛化能力),对学术界和工业界均有重要参考价值。

完整的代码脚本

import json
import logging
import osfrom dotenv import load_dotenv
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain_community.chat_models import ChatOpenAI
from langchain_community.document_loaders import ArxivLoader
from langchain_community.embeddings import ZhipuAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import NLTKTextSplitterlogger = logging.getLogger('my_logger')
# 加载 .env 文件中的环境变量
load_dotenv()def getNLTKData():import nltk# 指定自定义的数据目录custom_data_dir = 'D:\\nltk_data'nltk.data.path.append(custom_data_dir)# 下载资源if not os.path.exists(os.path.join(custom_data_dir, 'tokenizers', 'punkt')):os.environ['http_proxy'] = 'http://127.0.0.1:33210'os.environ['https_proxy'] = 'http://127.0.0.1:33210'nltk.download('punkt', download_dir=custom_data_dir)nltk.download('punkt_tab', download_dir=custom_data_dir)def load_arxiv_paper(arxiv_id):try:# 初始化ArxivLoaderloader = ArxivLoader(query=arxiv_id,load_max_docs=1,  # 限制加载文档数量load_all_available_meta=True  # 加载所有可用的元数据)# 加载论文文档documents = loader.load()logger.info(f"成功加载论文,标题: {documents[0].metadata.get('Title', 'Unknown')}")return documentsexcept Exception as e:logger.error(f"加载论文时发生错误: {str(e)}")return Nonedef process_documents(documents):if not documents:return Nonetry:# 初始化文本分割器nltk_splitter = NLTKTextSplitter(chunk_size=1000,chunk_overlap=200,)# 分割文档texts = nltk_splitter.split_documents(documents)logger.info(f"文档已分割为 {len(texts)} 个片段")return textsexcept Exception as e:print(e)logger.error(f"处理文档时发生错误: {str(e)}")return Nonedef create_vector_store(documents):if not documents:logger.warning("没有文档需要处理")return Nonetry:# 使用绝对路径存储缓存cache_dir = os.path.join(os.path.dirname(__file__), "cache")os.makedirs(cache_dir, exist_ok=True)store = LocalFileStore(cache_dir)# 创建向量存储underlying_embeddings = ZhipuAIEmbeddings(model="embedding-3",api_key=os.getenv("ZHIPUAI_API_KEY"),)cached_embedder = CacheBackedEmbeddings.from_bytes_store(underlying_embeddings,store,namespace=underlying_embeddings.model)# 批量处理文档batch_size = 30vector_stores = []for i in range(0, len(documents), batch_size):batch_texts = documents[i:i + batch_size]try:batch_vector_store = FAISS.from_documents(documents=batch_texts,embedding=cached_embedder)vector_stores.append(batch_vector_store)logger.info(f"成功处理第 {i // batch_size + 1} 批文档")except Exception as batch_error:logger.error(f"处理第 {i // batch_size + 1} 批文档时出错: {str(batch_error)}")continueif not vector_stores:logger.error("没有成功创建任何向量存储")return None# 合并所有批次的向量存储merged_store = vector_stores[0]for store in vector_stores[1:]:merged_store.merge_from(store)logger.info(f"向量存储创建成功,共处理 {len(documents)} 个文档")return merged_storeexcept Exception as e:print(e)logger.error(f"创建向量存储时发生错误: {str(e)}")return Nonedef NaiveRAG_main(arxiv_id, input):getNLTKData()# 加载论文documents = load_arxiv_paper(arxiv_id)if not documents:return# 存入 getParper.txt 文件中。with open("getPaper.txt", "w", encoding="utf-8") as f:for doc in documents:f.write(doc.page_content + "\n")# 处理文档split_documents = process_documents(documents)if not split_documents:return# 将分割后的文本存入 getPaperChunk2.json 文件中chunks_data = {"total_chunks": len(split_documents),"chunks": []}for index, text in enumerate(split_documents, 1):chunk_info = {"chunk_id": f"chunk_{index}","page_content": text.page_content}chunks_data["chunks"].append(chunk_info)# 将JSON数据写入文件with open("getPaperChunk2.json", "w", encoding="utf-8") as f:json.dump(chunks_data, f, ensure_ascii=False, indent=4)print(f"成功保存 {len(split_documents)} 个文本块到 getPaperChunk2.json")# 创建向量存储vectorstore = create_vector_store(split_documents)if not vectorstore:returnprint("向量存储创建成功")retriever = vectorstore.as_retriever()doc = retriever.invoke(input)print(doc)system_prompt = ChatPromptTemplate.from_messages([("system", """根据提供的上下文: {context} \n\n 使用回答问题: {input}""")])# 初始化大模型llm = ChatOpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"),base_url="https://api.deepseek.com/v1",model="deepseek-chat")# 构建链  这个链将文档作为输入,并使用之前定义的提示模板和初始化的大模型来生成答案chain = create_stuff_documents_chain(llm, system_prompt)res = chain.invoke({"input": input, "context": doc})print("最终答案:" + res)if __name__ == '__main__':NaiveRAG_main(arxiv_id="2402.19473", input="这篇论文讲述了什么内容?")

requirements.txt依赖

logger~=1.4
langchain~=0.3.23
langchain-community~=0.3.21
langchain-text-splitters~=0.3.8
pandas~=2.2.3
arxiv~=2.2.0
pymupdf~=1.25.5
python-dotenv~=1.1.0
zhipuai~=2.1.5.20250415

总结

        本篇文章以实现第一范式 RAG-Naive RAG为目标,使用langchain框架构造了一个基于Naive RAG的论文分析器的项目。然而随着研究的深入,人们发现 Naive RAG 在检索质量、响应生成质量以及增强过程中存在挑战。于是 Advanced RAG 范式被提出,它在数据索引、检索前和检索后都进行了额外处理,如通过更精细的数据清洗等方法提升文本的一致性、准确性和检索效率,在检索前对问题进行重写、路由和扩充等,检索后对文档库进行重排序、上下文筛选与压缩等。接下来小编会以LangChain 框架实现RAG 的第二范式Advanced RAG为目标,对Naive RAG项目进行重构。感兴趣的小伙伴请关注小编的RAG 专栏,后续小编还会对Modular RAG 和 Agentic RAG 进行学习和总结,敬请期待!

相关文章:

  • 从GET到POST:HTTP请求的攻防实战与CTF挑战解析
  • 嵌入式linux系统中内存管理的方法与实现
  • 筑基挑战 | 第14期
  • UI文件上传
  • AI与IT的共生
  • 小测验——已经能利用数据集里面的相机外参调整后看到渲染图像
  • 网页聊天系统项目
  • 谷歌新域名结构:Hreflang的未来展望
  • C++ 基于多设计模式下的同步异步⽇志系统-1准备工作
  • 闩锁效应(latch up)
  • bat脚本转换为EXE应用程序文件
  • systemctl管理指令
  • opencv 给图片和视频添加水印
  • MySQL运维三部曲初级篇:从零开始打造稳定高效的数据库环境
  • Dify快速入门之chatflow
  • Linux网络编程——基于ET模式下的Reactor
  • 【正则表达式】正则表达式使用总结
  • 如何在3090显卡上使用老版本torch
  • python 库 下载 ,整合在一个小程序 UIUIUI
  • LeetCode 239 滑动窗口最大值
  • 谁在贩卖个人信息?教培机构信息失守,电商平台“订单解密”
  • 报告:去年物业服务百强企业营业收入均值同比增长3.52%
  • 瑞士工业巨头ABB拟分拆机器人业务独立上市,市场份额全球第二
  • 10亿美元拿下加纳金矿!“矿茅”紫金矿业黄金板块突围战再下一城
  • 中国央行:继续完善中马、中柬跨境人民币使用政策
  • 疑被激活操作系统后门,美国特工对亚冬会网攻窃密内情披露