Langchain+chain+数据库操作
chain或agents实现。Agents可以多次循环查询。
思路:用户问题—转换—SQL查询——执行SQL——回答问题。
初始化
1
import os from operator import itemgetterimport bs4 from langchain.chains.combine_documents import create_stuff_documents_chain from langchain.chains.history_aware_retriever import create_history_aware_retriever from langchain.chains.retrieval import create_retrieval_chain from langchain.chains.sql_database.query import create_sql_query_chain from langchain_chroma import Chroma from langchain_community.document_loaders import WebBaseLoader from langchain_community.tools import QuerySQLDataBaseTool from langchain_community.utilities import SQLDatabase from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate from langchain_core.runnables import RunnableWithMessageHistory, RunnablePassthrough from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.chat_message_histories import ChatMessageHistory from langchain_openai import ChatOpenAI, OpenAIEmbeddingsos.environ['http_proxy'] = '127.0.0.1:7890' os.environ['https_proxy'] = '127.0.0.1:7890'os.environ["LANGCHAIN_TRACING_V2"] = "true" os.environ["LANGCHAIN_PROJECT"] = "LangchainDemo" os.environ["LANGCHAIN_API_KEY"] = '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' # os.environ["TAVILY_API_KEY"] = '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'# 聊天机器人案例 # 创建模型 model = ChatOpenAI(model='gpt-3.5-turbo')
SQLAlchemy 连接数据库
语法:
第一个方法:默认使用dbapi,不推荐。剩下俩个需要pip install mysqlclient/pymysql
# sqlalchemy 初始化MySQL数据库的连接 HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'test_db8' USERNAME = 'root' PASSWORD = '123123' # mysqlclient驱动URL MYSQL_URI = 'mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8mb4'.format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)db = SQLDatabase.from_uri(MYSQL_URI)# 测试连接是否成功 # print(db.get_usable_table_names()) # print(db.run('select * from t_emp limit 10;'))
SQL_chain
- SQLDatabaseChain:基本的自然语言转SQL查询并执行。
from langchain_community.utilities.sql_database import SQLDatabase from langchain.chains import SQLDatabaseChaindb = SQLDatabase.from_uri("sqlite:///database.db") db_chain = SQLDatabaseChain.from_llm(llm=ChatOpenAI(), db=db) db_chain.run("员工数量最多的部门是哪个?")
- SQLDatabaseSequentialChain:处理需要多步骤或上下文相关的查询。
SQLDatabaseSequentialChain 作用:处理依赖上下文的多步骤查询,例如需要先查询表结构再生成最终 SQL。 场景:用户问题隐含多个子任务(如先查有哪些表,再查具体数据)。 组成: 一个 Chain 负责分析逻辑,确定步骤。 另一个 Chain 生成最终 SQL。
- create_sql_query_chain:专门生成SQL查询语句的Chain。
- create_sql_validation_chain:验证SQL语句的正确性。
- create_sql_custom_answer_chain:处理查询结果并生成用户友好的回答。
- SQLQueryCheckerChain或类似的:检查潜在问题,如数据隐私或查询效率。
- Agents+SQL
from langchain_community.agent_toolkits import SQLDatabaseToolkit from langchain.agents import create_sql_agentagent = create_sql_agent(llm=ChatOpenAI(),toolkit=SQLDatabaseToolkit(db=db, llm=llm),verbose=True ) agent.run("计算北京地区员工的平均工资,保留两位小数。")
SQLDatabaseChain
从自然语言生成并执行 SQL 单轮简单查询 SQLDatabaseSequentialChain
处理多步骤依赖的复杂查询 上下文依赖的多轮查询 create_sql_query_chain
仅生成 SQL 不执行(需人工验证) 开发调试 自定义验证链 检查 SQL 语法及安全性 生产环境风险控制 结果后处理链 将数据库结果转换为自然语言答案 用户友好输出 RAG+SQL 混合链 结构化与非结构化数据联合查询 复杂业务场景 create_sql_query_chain:创建数据库SQL查询链。invoke返回的是SQL语句,并不会执行
eg:print( resp = test_chain.invoke({'question': '请问:员工表中有多少条数据?'}) )
# 直接使用大模型和数据库整合, 只能根据你的问题生成SQL # 初始化生成SQL的chain test_chain = create_sql_query_chain(model, db) # resp = test_chain.invoke({'question': '请问:员工表中有多少条数据?'}) # print(resp)answer_prompt = PromptTemplate.from_template("""给定以下用户问题、SQL语句和SQL执行后的结果,回答用户问题。Question: {question}SQL Query: {query}SQL Result: {result}回答: """ )
执行SQL:
# from langchain_community.tools import QuerySQLDataBaseTool
assign
的核心作用:
添加或修改字段
在链式处理过程中,将某个步骤的输出结果作为新字段添加到当前数据对象中,供后续步骤使用。保留原始数据
RunnablePassthrough
会保留所有输入字段(如question
),而assign
仅追加新字段(如query
、result
),避免覆盖原有数据。支持多步骤串联
通过连续调用.assign()
,可以将多个处理步骤串联成一个完整的链,每个步骤专注于一个子任务。chain思路:
生成SQL:通过RunnablePassthrough接受生成的SQL语句,itemgetter('query')从数据流中提取
query
的值。执行SQL: | execute_sql_tool
链接其他组件:| Prompt |Model |parser
# 创建一个执行sql语句的工具 execute_sql_tool = QuerySQLDataBaseTool(db=db)# 1、生成SQL,2、执行SQL # 2、模板 chain = (RunnablePassthrough.assign(query=test_chain).assign(result=itemgetter('query') | execute_sql_tool)| answer_prompt| model| StrOutputParser() #这里有()是创建对象,也可以外部创建好,在传入对象名)rep = chain.invoke(input={'question': '请问:员工表中有多少条数据?'}) print(rep)
PS:不同模型对于,SQL查询输出不一样,如gpt-4比3.5多个SQLQuery前缀。因此不能用上述的模版,进行直接调用。