当前位置: 首页 > news >正文

基于论文的大模型应用:基于SmartETL的arXiv论文数据接入与预处理(三)

上一篇 介绍了数据接入处理的整体方案设计。本篇介绍基于SmartETL框架的流程实现。

5. 流程开发

5.1.简单采集流程

从指定时间(yy年 mm月)开始,持续采集arXiv论文。基于月份和顺序号,构造论文ID,进而下载论文PDF文件,保存到本地。

5.1.1.Loader设计

定制开发一个新的Loader“web.ArXivTaskEmit”,基于月份和顺序号构造并输出论文ID。每个月的顺序号从00001到29999。输出格式样例:2503.00001(字符串)。代码如下:

import time
import os
import jsonfrom wikidata_filter.util.dates import current_date
from wikidata_filter.loader.base import DataProviderclass ArXivTaskEmit(DataProvider):ts_file = ".arxiv.ts"def __init__(self, start_month: int = None, end_month: int = None):self.month = start_month or 2501self.seq = 1if os.path.exists(self.ts_file):with open(self.ts_file, encoding="utf8") as fin:nums = json.load(fin)self.month = nums[0]self.seq = nums[1]self.end_month = end_month or int(current_date('%y%m'))print(f"from {self.month}.{self.seq} to {self.end_month}")def write_ts(self):row = [self.month, self.seq]with open(self.ts_file, "w", encoding="utf8") as out:json.dump(row, out)def iter(self):while self.month <= self.end_month:while self.seq < 30000:new_id = f'{self.month}.{self.seq:05d}'print("processing:", new_id)yield new_idself.seq += 1self.write_ts()time.sleep(3)self.month += 1if self.month % 100 > 12:self.month += int(self.month/100)*100 + 101self.seq = 1self.write_ts()

5.1.2.Processor设计

处理过程包括:

  1. 将输入数据包装为dict结构。基于内置ToDict转换算子。转换后,数据变成:{‘d’: ‘2503.00001’}
  2. 构造PDF文件URL。基于内置ConcatFields转换算子。转换后,数据变成:{‘d’: 2503.00001’, ‘url_pdf’: ‘https://arxiv.org/pdf/2503.00001’}
  3. 构造保存本地的文件名。基于内置ConcatFields转换算子。转换后,数据变成:{‘d’: 2503.00001’, ‘url_pdf’: ‘https://arxiv.org/pdf/2503.00001’, ‘filename’: ‘data/arxiv/2503.00001.pdf’}
  4. 判断文件不存在。基于内置Not算子,组合util.files.exists函数。
  5. 文件下载。基于内置Map算子,组合util.http.content函数,将获取的文件内容(bytes类型)作为字段content的值。
  6. 保存文件。基于内置SaveFiles算子。
    上述流程SmartETL的YAML流程定义语法,基于已有的组件进行实例化,并通过Chain进行组合,形成顺序处理流程,完成文件持续下载。

5.1.3.流程定义

name: arXiv简单采集流程
description: 对指定时间范围arXiv论文进行采集loader: web.ArXivTaskEmitnodes:as_dict: ToDictmake_url: ConcatFields('url_pdf', 'd', prefix=’https://arxiv.org/pdf/’)make_filename: ConcatFields('filename', 'd', prefix='data/arxiv/', suffix='.pdf')file_not_exists: Not('util.files.exists', key='filename')download: Map('util.http.content', key='url_pdf', target_key='content', most_times=3, ignore_error=True)save_file: WriteFiles('data/arxiv', name_key=’filename’)processor: Chain(as_dict, make_url, make_filename, file_not_exists, download, save_file)

5.2.论文搜索&HTML采集流程

针对指定关键词,利用arXiv API搜索论文,下载论文HTML页面。

5.2.1.Loader设计

通过命令行的交互输入(input函数)获取关键词作为任务输入。

5.2.2.Processor设计

处理过程包括:

  1. 通过命令行获取用户输入关键词。输出数据为字符串,例如“RAG”。
  2. 基于输入关键词进行检索,基于arXiv官网搜索API进行检索。由于检索结果为XML格式,基于xmltodict库转换为JSON格式,对论文结构附加url_pdfurl_html字段,表示PDF和HTML的下载链接。这一步采用定制开发函数gestata.arxiv.search,配合Map算子进行处理。
  3. 【可选的】数组打散。如果前一步search函数返回为数组结构,则需要通过Flat组件进行打散;如果为生成器yield方式返回,则框架自行处理。
  4. 下载HTML。内置util.http.content函数提供http请求(基于requests库),结合Map算子,实现arxiv HTML文件下载,将获取的文件内容(bytes)作为字段content的值。
  5. 过滤。判断content是否为空,如果网络连接失败或者HTML文件本身不存在,就可以终止。
  6. 保存HTML。将前一步获取的HTML内容(即content字段值)保存为文件。
    搜索核心代码如下:
ARXIV_API_BASE = "http://export.arxiv.org/api/query"
ARXIV_BASE = "http://arxiv.org"def search(topic: str, max_results: int = 50):"""基于arXiv API的论文搜索"""if ':' not in topic:topic = 'all:' + topicparams = {"search_query": topic,"max_results": max_results,"sortBy": "lastUpdatedDate","sortOrder": "descending"}res = requests.get(ARXIV_API_BASE, params=params)doc = xmltodict.parse(res.text)feed = doc.get("feed")if "entry" not in feed:return []papers = feed.get("entry")for paper in papers:_id = paper["id"]_id = _id[_id.rfind('/')+1:]paper["_id"] = _idpaper["url_pdf"] = f"{ARXIV_BASE}/pdf/{_id}"paper["url_html"] = f"{ARXIV_BASE}/html/{_id}"return papers

在SmartETL框架最新设计中,通过将函数动态绑定为处理节点,只需要将业务处理逻辑实现为一个函数,就能够在流程中进行调用,极大方便了数据处理算子开发流程。

5.2.3.流程定义

name: arXiv搜索采集流程
description: 基于用户提供的关键词进行arXiv论文并下载loader: Input('请输入arXiv论文搜索关键词:')nodes:search: Map('gestata.arxiv.search', max_results=10)download: Map('util.http.content', key='url_html', target_key='content', most_times=3, ignore_error=True)filter: FieldsNonEmpty('content')save: WriteFiles('data/arxiv', name_key='_id', suffix='.html')processor: Chain(search, Flat(), download, filter, save)

5.3.HTML解析建索引流程

对下载的HTML页面进行解析,并对论文摘要和正文建立向量化索引。

5.3.1.HTML解析

由于arXiv论文的HTML页面具有特定的网页结构,通过lxml或beaultifulsoup库,可实现对论文标题、作者、章节(包括其中的图片、表格、公示等)列表、附录、参考文献等信息的精准识别。

5.3.2.Loader设计

可直接复用前文所述的自动生成论文ID的“web.ArXivTaskEmit”,也可以采用基于关键词搜索的Input+gestata.arxiv.search

5.3.3.Processor设计

处理过程包括:

  1. HTML解析。设计函数组件gestata.arxiv.extract,从输入的HTML(str类型)中进行论文结构抽取。注意,为了对HTML中的相对链接进行绝对定位,解析HTML时需要提供该HTML的基路径(base path)。此外,设计的函数考虑到可能在其他地方调用,建议提供相关参数配置。
  2. 论文摘要索引。论文摘要是一个单独的字段,并且通常长度较短,符合向量化索引chunk的长度要求,可以通过以下两步建立索引:
    2.1 向量化。通过内置的model.embed.Local组件,调用embedding模型,生成摘要的向量
    2.2 写向量库。利用提前初始化的Qdrant组件,基于内置的DatabaseWriter组件进行写入。
  3. 论文正文索引。论文包括多个章节(section),并且篇幅较长,通常需要进行chunk化拆分。一种简单的方式先按section拆分(避免不同章节的内容成为一个chunk),然后根据段落和句子边界进行长度拆分。过程如下:
    3.1 对sections进行打散,即转成每个section作为一条记录输出。
    3.2 对每个section的content字段进行chunk化拆分并打散
    3.3 向量化。同上。
    3.4 写向量库。同上。

5.3.4.流程定义

name: arXiv论文HTML解析与索引流程
description: 对论文HTML进行解析并建立摘要和正文的向量索引loader: Directory('data/arxiv', '.html')nodes:qd: util.database.qdrant.Qdrant(**qdrant)select: SelectVal('data')extract: Map('gestata.arxiv.extract_from_html')vector1: model.embed.Local(api_base=bge_large_en, key='abstract', target_key='vector')write1: DatabaseWriter(qd, buffer_size=1, collection='chunk_abstract')chain1: Chain(Select('abstract'), vector1, write1)flat: FlatProperty('sections', inherit_props=True)select2: Select('content')chunk: Map('util.split.simple', key='content',target_key='chunks')flat_chunk: Flat(key='chunks')vector2: model.embed.Local(api_base=bge_large_en, key='chunks', target_key='vector')write2: DatabaseWriter(qd, buffer_size=1, collection='chunk_content')chain2: Chain(Select('sections', flat, select2, chunk, flat_chunk, vector2, write2)processor: Chain(select, extract, Fork(chain1, chain2, copy_data=True))

相关文章:

  • markdown语法大全- 最新markdown中文文档
  • Linux内核哈希表学习笔记
  • 【TeamFlow】4 用户管理系统的实现
  • 【每日八股】复习 MySQL Day1:事务
  • 【TeamFlow】3 Rust 与 WebAssembly (Wasm) 深度应用指南
  • 爱在冰川-慢就是快
  • 基于 pnpm + Monorepo + Turbo + 无界微前端 + Vite 的企业级前端工程实践
  • 【HarmonyOS 5】makeObserved接口详解
  • C++初阶的应用-日期管理系统的设计与实现
  • hackmyvm-quick3
  • 运维侠职场日记9:用DeepSeek三天通关详解自动化操作pdf批量提取PDF文字将PDF转Word文档(附上脚本代码)
  • aws(学习笔记第三十九课) iot-core
  • 【人工智能】Agent未来市场与技术潜力分析
  • 【网络原理】TCP协议如何实现可靠传输(确认应答和超时重传机制)
  • C++项目 —— 基于多设计模式下的同步异步日志系统(5)(建造者模式)
  • [操作系统] 信号
  • 2025.04.20
  • 代码随想录训练营第36天 ||1049. 最后一块石头的重量 II 494. 目标和 474. 一和零
  • 无回显RCE
  • 凤凰架构-数据管理与存储
  • 谁在贩卖个人信息?教培机构信息失守,电商平台“订单解密”
  • 经济参考报:安全是汽车智能化的终极目标
  • 合肥打造全球首个无人艇超级工厂,请看《浪尖周报》第21期
  • 全球南方声势卓然壮大的历史逻辑——写在万隆会议召开70周年之际
  • “杭州六小龙”爆火出圈后,浙江高规格部署人工智能发展
  • 女外交官杨扬出任中国驻圭亚那大使