当前位置: 首页 > news >正文

Langchain对管道操作符|的重构实现链式流程

目录

  • 一、管道操作符
  • 二、LangChain 的链式流程
  • 三、源码剖析

一、管道操作符

  1. Python 中的管道操作符
    在 Python 3.10 及以上版本中,管道操作符 | 主要用于以下场景:

(1)集合操作
管道操作符可以用于集合的并集操作。

set1 = {1, 2, 3}
set2 = {3, 4, 5}
result = set1 | set2  # 并集: {1, 2, 3, 4, 5}

(2)类型联合
在类型注解中,| 可以表示“或”的关系。

def func(x: int | float) -> int | float:
    return x * 2

(3)模式匹配
在 match 语句中,| 可以用于匹配多个模式。

value = 3
match value:
    case 1 | 2 | 3:
        print("Small number")
    case _:
        print("Other number")

二、LangChain 的链式流程

在 LangChain 中,管道操作符 | 用于将多个步骤串联起来,形成一个链式流程。

chain = (
    {"question": RunnablePassthrough()}
    | prompt_template
    | llm
    | output_parser
)

每一步的输出会作为下一步的输入。

这种设计使得代码更加模块化和可读。

三、源码剖析

在 LangChain 的实际源码中,管道操作符的实现位于 langchain/schema/runnable.py 文件中。以下是一些关键代码片段:

class Runnable:
    def __or__(self, other: Union["Runnable", Callable]) -> "RunnableSequence":
        from langchain.schema.runnable import RunnableSequence
        return RunnableSequence([self, other])

or 方法,它是 Python 中的特殊方法(也称为魔术方法或双下方法)。or 方法用于重载 | 操作符的行为。具体来说,这段代码的作用是允许一个对象与另一个对象(或可调用对象)通过 | 操作符连接,并返回一个新的 RunnableSequence 对象。
or 方法定义了 | 操作符的行为。当你在代码中使用 | 操作符连接两个对象时,Python 会调用左侧对象的 or 方法,并将右侧对象作为参数传递。

step1 = Runnable()
step2 = Runnable()

# 使用 | 操作符连接两个对象
chain = step1 | step2

Union[“Runnable”, Callable] 的含义
Union 是 Python 类型注解中的一个工具,表示“或”的关系。Union[“Runnable”, Callable] 表示 other 可以是以下两种类型之一:

Runnable:一个实现了 Runnable 接口的对象。

Callable:一个可调用对象(如函数、lambda 表达式等)。

链式流程有递归的思想

from typing import Union, Callable

class Runnable:
    def __or__(self, other: Union["Runnable", Callable]) -> "RunnableSequence":
        if callable(other):
            # 如果 other 是可调用对象,将其包装为 Runnable
            other = RunnableWrapper(other)
        return RunnableSequence(self, other)

class RunnableSequence:
    def __init__(self, first: Runnable, second: Runnable):
        self.first = first
        self.second = second

    def run(self, input):
        # 先运行第一个步骤
        intermediate = self.first.run(input)
        # 再运行第二个步骤
        return self.second.run(intermediate)

class RunnableWrapper(Runnable):
    def __init__(self, func: Callable):
        self.func = func

    def run(self, input):
        return self.func(input)

相关文章:

  • nodejs 038: Js Worker线程消息传递 Worker.postMessage() Worker.postMessage()
  • Django快速入门
  • 【论文投稿】Python 网络爬虫:探秘网页数据抓取的奇妙世界
  • 归并排序的应用—计算逆序对的个数
  • 使用 pgvector 实现 PostgreSQL 语义搜索和 RAG:完整指南
  • 长视频生成、尝试性检索、任务推理 | Big Model Weekly 第56期
  • 为AI聊天工具添加一个知识系统 之107 详细设计之48 理解和角色
  • 从零到上线:Node.js 项目的完整部署流程(包含 Docker 和 CICD)
  • Spring Boot 项目启动报错 “找不到或无法加载主类” 解决笔记
  • 【ISO 14229-1:2023 UDS诊断全量测试用例清单系列:第五节】
  • MySQL数据库三:操作数据库(二)
  • 【ISO 14229-1:2023 UDS诊断全量测试用例清单系列:第十节】
  • mac docker镜像加速正确配置方式
  • 【MySQL常见疑难杂症】常见文件及其所存储的信息
  • 尚硅谷爬虫note005
  • 基于Knative的无服务器引擎重构:实现毫秒级冷启动的云原生应用浪潮
  • 数据结构笔记之时间复杂度O(n)中的O是什么的缩写,为什么要用O这个字母?
  • 快速设置 Docker 网络代理配置
  • 手机ROM是什么
  • 网络安全|网络安全学习方法
  • 赛力斯拟赴港上市:去年扭亏为盈净利59亿元,三年内实现百万销量目标
  • 国家发改委:建立实施育儿补贴制度
  • 三杀皇马剑指四冠,硬扛到底的巴萨,赢球又赢人
  • “两高”司法解释:升档为境外非法提供商业秘密罪的量刑标准
  • 美媒称特朗普考虑大幅下调对华关税、降幅或超一半,外交部回应
  • 2025一季度,上海有两把刷子