基于论文的大模型应用:基于SmartETL的arXiv论文数据接入与预处理(三)
上一篇 介绍了数据接入处理的整体方案设计。本篇介绍基于SmartETL框架的流程实现。
5. 流程开发
5.1.简单采集流程
从指定时间(yy年 mm月)开始,持续采集arXiv论文。基于月份和顺序号,构造论文ID,进而下载论文PDF文件,保存到本地。
5.1.1.Loader设计
定制开发一个新的Loader“web.ArXivTaskEmit”,基于月份和顺序号构造并输出论文ID。每个月的顺序号从00001到29999。输出格式样例:2503.00001(字符串)。代码如下:
import time
import os
import jsonfrom wikidata_filter.util.dates import current_date
from wikidata_filter.loader.base import DataProviderclass ArXivTaskEmit(DataProvider):ts_file = ".arxiv.ts"def __init__(self, start_month: int = None, end_month: int = None):self.month = start_month or 2501self.seq = 1if os.path.exists(self.ts_file):with open(self.ts_file, encoding="utf8") as fin:nums = json.load(fin)self.month = nums[0]self.seq = nums[1]self.end_month = end_month or int(current_date('%y%m'))print(f"from {self.month}.{self.seq} to {self.end_month}")def write_ts(self):row = [self.month, self.seq]with open(self.ts_file, "w", encoding="utf8") as out:json.dump(row, out)def iter(self):while self.month <= self.end_month:while self.seq < 30000:new_id = f'{self.month}.{self.seq:05d}'print("processing:", new_id)yield new_idself.seq += 1self.write_ts()time.sleep(3)self.month += 1if self.month % 100 > 12:self.month += int(self.month/100)*100 + 101self.seq = 1self.write_ts()
5.1.2.Processor设计
处理过程包括:
- 将输入数据包装为dict结构。基于内置
ToDict
转换算子。转换后,数据变成:{‘d’: ‘2503.00001’} - 构造PDF文件URL。基于内置
ConcatFields
转换算子。转换后,数据变成:{‘d’: 2503.00001’, ‘url_pdf’: ‘https://arxiv.org/pdf/2503.00001’} - 构造保存本地的文件名。基于内置
ConcatFields
转换算子。转换后,数据变成:{‘d’: 2503.00001’, ‘url_pdf’: ‘https://arxiv.org/pdf/2503.00001’, ‘filename’: ‘data/arxiv/2503.00001.pdf’} - 判断文件不存在。基于内置
Not
算子,组合util.files.exists
函数。 - 文件下载。基于内置
Map
算子,组合util.http.content
函数,将获取的文件内容(bytes
类型)作为字段content
的值。 - 保存文件。基于内置
SaveFiles
算子。
上述流程SmartETL的YAML流程定义语法,基于已有的组件进行实例化,并通过Chain
进行组合,形成顺序处理流程,完成文件持续下载。
5.1.3.流程定义
name: arXiv简单采集流程
description: 对指定时间范围arXiv论文进行采集loader: web.ArXivTaskEmitnodes:as_dict: ToDictmake_url: ConcatFields('url_pdf', 'd', prefix=’https://arxiv.org/pdf/’)make_filename: ConcatFields('filename', 'd', prefix='data/arxiv/', suffix='.pdf')file_not_exists: Not('util.files.exists', key='filename')download: Map('util.http.content', key='url_pdf', target_key='content', most_times=3, ignore_error=True)save_file: WriteFiles('data/arxiv', name_key=’filename’)processor: Chain(as_dict, make_url, make_filename, file_not_exists, download, save_file)
5.2.论文搜索&HTML采集流程
针对指定关键词,利用arXiv API搜索论文,下载论文HTML页面。
5.2.1.Loader设计
通过命令行的交互输入(input
函数)获取关键词作为任务输入。
5.2.2.Processor设计
处理过程包括:
- 通过命令行获取用户输入关键词。输出数据为字符串,例如“RAG”。
- 基于输入关键词进行检索,基于arXiv官网搜索API进行检索。由于检索结果为XML格式,基于
xmltodict
库转换为JSON格式,对论文结构附加url_pdf
、url_html
字段,表示PDF和HTML的下载链接。这一步采用定制开发函数gestata.arxiv.search
,配合Map
算子进行处理。 - 【可选的】数组打散。如果前一步
search
函数返回为数组结构,则需要通过Flat
组件进行打散;如果为生成器yield
方式返回,则框架自行处理。 - 下载HTML。内置
util.http.content
函数提供http请求(基于requests
库),结合Map
算子,实现arxiv HTML文件下载,将获取的文件内容(bytes)作为字段content
的值。 - 过滤。判断
content
是否为空,如果网络连接失败或者HTML文件本身不存在,就可以终止。 - 保存HTML。将前一步获取的HTML内容(即
content
字段值)保存为文件。
搜索核心代码如下:
ARXIV_API_BASE = "http://export.arxiv.org/api/query"
ARXIV_BASE = "http://arxiv.org"def search(topic: str, max_results: int = 50):"""基于arXiv API的论文搜索"""if ':' not in topic:topic = 'all:' + topicparams = {"search_query": topic,"max_results": max_results,"sortBy": "lastUpdatedDate","sortOrder": "descending"}res = requests.get(ARXIV_API_BASE, params=params)doc = xmltodict.parse(res.text)feed = doc.get("feed")if "entry" not in feed:return []papers = feed.get("entry")for paper in papers:_id = paper["id"]_id = _id[_id.rfind('/')+1:]paper["_id"] = _idpaper["url_pdf"] = f"{ARXIV_BASE}/pdf/{_id}"paper["url_html"] = f"{ARXIV_BASE}/html/{_id}"return papers
在SmartETL框架最新设计中,通过将函数动态绑定为处理节点,只需要将业务处理逻辑实现为一个函数,就能够在流程中进行调用,极大方便了数据处理算子开发流程。
5.2.3.流程定义
name: arXiv搜索采集流程
description: 基于用户提供的关键词进行arXiv论文并下载loader: Input('请输入arXiv论文搜索关键词:')nodes:search: Map('gestata.arxiv.search', max_results=10)download: Map('util.http.content', key='url_html', target_key='content', most_times=3, ignore_error=True)filter: FieldsNonEmpty('content')save: WriteFiles('data/arxiv', name_key='_id', suffix='.html')processor: Chain(search, Flat(), download, filter, save)
5.3.HTML解析建索引流程
对下载的HTML页面进行解析,并对论文摘要和正文建立向量化索引。
5.3.1.HTML解析
由于arXiv论文的HTML页面具有特定的网页结构,通过lxml或beaultifulsoup库,可实现对论文标题、作者、章节(包括其中的图片、表格、公示等)列表、附录、参考文献等信息的精准识别。
5.3.2.Loader设计
可直接复用前文所述的自动生成论文ID的“web.ArXivTaskEmit”,也可以采用基于关键词搜索的Input
+gestata.arxiv.search
。
5.3.3.Processor设计
处理过程包括:
- HTML解析。设计函数组件
gestata.arxiv.extract
,从输入的HTML(str
类型)中进行论文结构抽取。注意,为了对HTML中的相对链接进行绝对定位,解析HTML时需要提供该HTML的基路径(base path)。此外,设计的函数考虑到可能在其他地方调用,建议提供相关参数配置。 - 论文摘要索引。论文摘要是一个单独的字段,并且通常长度较短,符合向量化索引chunk的长度要求,可以通过以下两步建立索引:
2.1 向量化。通过内置的model.embed.Local
组件,调用embedding模型,生成摘要的向量
2.2 写向量库。利用提前初始化的Qdrant
组件,基于内置的DatabaseWriter
组件进行写入。 - 论文正文索引。论文包括多个章节(section),并且篇幅较长,通常需要进行chunk化拆分。一种简单的方式先按section拆分(避免不同章节的内容成为一个chunk),然后根据段落和句子边界进行长度拆分。过程如下:
3.1 对sections进行打散,即转成每个section作为一条记录输出。
3.2 对每个section的content
字段进行chunk化拆分并打散
3.3 向量化。同上。
3.4 写向量库。同上。
5.3.4.流程定义
name: arXiv论文HTML解析与索引流程
description: 对论文HTML进行解析并建立摘要和正文的向量索引loader: Directory('data/arxiv', '.html')nodes:qd: util.database.qdrant.Qdrant(**qdrant)select: SelectVal('data')extract: Map('gestata.arxiv.extract_from_html')vector1: model.embed.Local(api_base=bge_large_en, key='abstract', target_key='vector')write1: DatabaseWriter(qd, buffer_size=1, collection='chunk_abstract')chain1: Chain(Select('abstract'), vector1, write1)flat: FlatProperty('sections', inherit_props=True)select2: Select('content')chunk: Map('util.split.simple', key='content',target_key='chunks')flat_chunk: Flat(key='chunks')vector2: model.embed.Local(api_base=bge_large_en, key='chunks', target_key='vector')write2: DatabaseWriter(qd, buffer_size=1, collection='chunk_content')chain2: Chain(Select('sections', flat, select2, chunk, flat_chunk, vector2, write2)processor: Chain(select, extract, Fork(chain1, chain2, copy_data=True))