当前位置: 首页 > news >正文

【最后203篇系列】022 用Deepseek14b提取新闻事件

这算是之前一直想做的一件事,趁周末赶快做了。

业务意义:现实中有大量的舆情,这对我们的决策会有比较重要的作用

技术依赖:

  • 1 模型基础能力
  • 2 消息队列
  • 3 异步获取消息
  • 4 时间序列库

1 模型基础能力

大模型发展到现在,已经有了很大的变化。过去,在本地执行的大模型能力是完全不够的,deepseek14b出来之后我测了一下,就基础能力还是已经ok,但是推理过程我不知道是不是模型执行本身的代价,但其实我是不需要的。

本地模型能力够了之后,那么就很有意思。一方面,它可以比较节约成本。对于一些研究性质的任务应该是够的,比如我这次就打算用4060TI(16G)和Mac MiniM4(16G)来运行模型。另一方面,由于数据不出去,所以对数据隐私保护就比较好,这就可以尝试更多重要的任务。

另外就是模型的尺寸问题了,我粗略的感觉了一下

尺寸能力输入上限
1.5b也许可以做一些特别特别简单的任务2k
8b可以做简单任务上8k
14b可以做中等复杂的任务8k
32b可以做较复杂任务
100b +可以直接商业化

目前我觉得14b可能是性价比最高的模型尺寸。

小模型有一个明显的问题,就是可能无法严格按照指令执行任务。以生成json为例,可能会出现更高的失败率。当然尺寸模型也会碰到同样的问题。所以,对于大模型应用来说,应该用一些封装方式转为可靠节点。

小模型,很像以前集成学习中的弱分类器,它的使用并不会那么直接。需要有一套体系来进行校验和评估,有意义的是,这套校验和评估方法同样适合更大尺寸的模型,尤其是面对现在层出不同的大模型时。

标准化封装,然后使用体系进行归纳校验

2 消息队列

消息队列非常重要,缓冲和回放是最核心的功能,kafka这方面做的真挺不错。

像类似新闻的摘要数据到来后进入队列,一方面是缓冲,另一方面也是进行按时间的自然截断。如果觉得有必要,是可以让这些数据进入持久队列的,我给自己准备的存储空间非常多,可能未来还会更多(火力不足恐惧症)

由于处理不是唯一的,所以需要有多个出队列,这时候就要用回放了。例如,我先在有 input_topic1,然后我需要进行下一步etl,可能我先想到了方法1 :只提取新闻的标题和正文,满足了现在的需求,于是写入 tier2_topic_etl1。可能过一阵我又想到方法2:再提取评论连接。这时候就会写入tier2_topic_etl2。最后也许会放弃方法1,又或者放弃方法2,但这个就比较简单了。

类似的,或者不可避免的,我会采用多个不同的模型去处理相同的数据,这时候就又可以继续往后分叉了。可以非常方便的服务于多种探索,只要换一个消费组id就行了。这些不同的分支,可以在不同层级的部分进行合并,这方面mongo又非常合适。所以生成唯一主键是必要的。

当然,最后如果结论是采纳的,一定会在时间序列库中表现为事件。到了这个层级,才是(基本)可决策的。

3 异步获取消息

对于大量的数据获取和流转,一定要采用异步/协程的方式。

从去年开始,我才深深体会到这个问题。以前很多时候是批量传入,批量处理,并没有感觉。在大量实时性的任务中,一定会有数量庞大,但是数据又很小的任务,且会受到网络波动。没有异步和有异步效率可能差百倍。

在大模型时代,很多请求是独立到来的,比如用户的一个问题。我粗略估算过,大模型时代单条数据处理的成本可能是以前一百万倍。所以,必须要能够服务好单条的请求。

目前我的大部分服务已经转向了异步和微批次,在worker端重新搭建了fastapi+aps+celery,主要是发现其他的一些成熟异步框架也是这样的,所以还不如自己搭一下。搭好有一阵子了,之前还没想好怎么去进行统一化调度,所以也没有立即启用。我想接着这个周末,需要将这个推入试产状态。

4 时间序列库

技术和工具只是一个表象和约束,关键是背后的逻辑。我觉得line protocal挺好的,不仅约定了一条简洁的时间数据从技术上怎么定义的,同时也反过来促使业务人员思考:

  • 1 什么时候要建立一个bucket (db level)
  • 2 可能会有哪些measurement (collection level)
  • 3 对于某条数据来说,哪些是tags,哪些是field

这种结构会对数据的最终处理带来影响,不是唯一的,但是是非常有用的。最近在看clickhouse的书,里面也是做了一些取舍(不支持update,delete, 不太支持单条精确查询等),然后利用一些新的有利特性(CPU的SIMD),从而达到令人惊叹的性能(极高效取数、极高效统计和极少磁盘占用)。

终于可以言归正传:现在的 point是我能每分钟拿到一些公开新闻摘要,我需要从中间提取事件,然后找出一类特定事件(比如打击证券行业犯罪等)。所以这里有两部分:①提取数据 ②判定事件

因为获取到的数据结构性非常好,所以我会先用简单的正则将数据提取出来,校验后就可以送到下一步。

然后在下一步对事件进行判断,然后给到类别的判定和置信度,这里用到deepseek-r1-14b。

数据的规律是以时间开头的固定元组。我简单写了一个清洗逻辑:

import re
def is_valid_time(text= None):
    pattern = r'^([01]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$'
    return bool(re.match(pattern, text))

import re

def is_has_brackets(text = None):
    pattern = r'【[^】]*】'
    return bool(re.search(pattern, text))
some_msg.split('\n')
s = pd.Series(some_msg.split('\n'))
s_list = list(s)
# 1 标记
time_tag = s.apply(is_valid_time)
import numpy as np 
time_tag_arr = time_tag.values

time_tag_pos = np.argwhere(time_tag_arr ==True).ravel()
time_tag_pos1 = np.append(time_tag_pos, time_tag_pos[-1] + 6)
# time_tag_pos_s = pd.Series(time_tag_pos)
# time_tag_pos_s1 = time_tag_pos_s.diff(1)

time_tag_pos_start = time_tag_pos1[:-1]
time_tag_pos_end = time_tag_pos1[1:]

# 将起始点和结束点组合成一个二维数组
time_tag_pos2 = np.column_stack((time_tag_pos_start, time_tag_pos_end))

candidate_tuple_list = []
for tem_tuple in time_tag_pos2:
    tem_list = s_list[tem_tuple[0]:tem_tuple[1]]
    if is_has_brackets(tem_list[1]):
        tem_dict = {}
        tem_dict['event_time'] = tem_list[0]
        tem_dict['title'] =  tem_list[1].replace('*','')
        for j in range(2,len(tem_list)):
            tem_v = tem_list[j]
            if tem_v.startswith('阅'):
                tem_dict['read'] = tem_v 
            elif tem_v.startswith('[评论'):
                tem_dict['comments'] = tem_v 
            elif tem_v.startswith('分享'):
                tem_dict['share'] = tem_v
            else:
                pass 
        candidate_tuple_list.append(tem_dict)

理解原文结构和写代码大约花了我1.5个小时,主要是在怎么通过下标快速从列表中取出子列表这个问题上纠结了很久。如果只是为了实现逻辑的话,不会超过0.5个小时。

然后随之而来是一个问题:我相信在这个case中,数据的格式一般不会有大变化 。但是未来肯定是会变的,所以这种固定解析逻辑比较不那么靠谱,得要加一个校验。怎么样才能有更好的效率和效果呢?

如果我能抽象出其中一定有的元素,或者我需要的元素,这个可以比较抽象;然后让大模型去完成这个解析显然会更具有通用性。

这种方案后续再试,很多工具都在变,而且我这种非结构化爬取的内容应该不会很多。原则上,应该还是原始数据- 初筛 - 精筛。初筛是一定需要的,先把相关的提出来,或者把不相关的过滤掉。

好了,这一步就算etl,我先做掉。

做之前,需要先订立一些元数据,我会同步写往rocks和mongo。平时主要用rocks,除非数据挂了才用mongo。

数据清洗完了之后就可以入库了,第一次可以将全量的数据写入。这次我没有写原始数据,之后可能需要备份一批,否则kafka只有7天 ,嗯,似乎也够了,新闻太老意义不大。

接下来是deepseek时间:对数据进行事件的判定

简单封装一下:

调用函数

import time 
from openai import OpenAI
def call_local_deepseek(api_key ='ollama',base_url="http://172.17.0.1:11434/v1",model="deepseek-r1:14b",system_prompt = '', user_prompt = ''):
    tick1 = time.time()
    client = OpenAI(
        base_url=base_url,  # 替换为远端服务器的 IP 或域名
        api_key=api_key,  # 必填项,但会被忽略
    )

    response = client.chat.completions.create(
        model=model,  # 替换为你下载的模型名称
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0  # 设置 temperature 参数,这里设置为 0.7,你可以根据需要调整
    )
    print('call_local_deepseek takes %.2f' %(time.time() - tick1))
    # 输出结果
    return response.choices[0].message.content
# 测试
call_local_deepseek(user_prompt ='你是谁')

结果的解析函数

import re
import json
def extract_json_from_response(response):
    """
    从大模型的回复中提取 JSON 部分。
    :param response: 大模型的回复内容
    :return: 解析后的 JSON 数据(字典形式),若未找到则返回空字典
    """
    # 定义正则表达式模式,用于匹配 JSON 内容
    json_pattern = r'```json\s*([\s\S]*?)\s*```'
    # 查找匹配的 JSON 内容
    match = re.search(json_pattern, response)
    if match:
        # 提取 JSON 字符串
        json_str = match.group(1)
        try:
            # 解析 JSON 字符串为 Python 字典
            json_data = json.loads(json_str)
            return json_data
        except json.JSONDecodeError:
            print("JSON 解析失败,请检查模型回复的 JSON 格式是否正确。")
    return None
# 以下是一个 Python 函数,用于提取大模型回复中指定标签内的内容。如果未找到指定标签,函数将返回空字符串。

import re
def extract_tag_content(response, tag):
    """
    从大模型的回复中提取指定标签内的内容。

    :param response: 大模型的回复内容
    :param tag: 要提取的标签,例如 'think'
    :return: 标签内的内容,如果未找到则返回空字符串
    """
    # 构建正则表达式模式,用于匹配指定标签内的内容
    pattern = fr'<{tag}>([\s\S]*?)</{tag}>'
    # 查找匹配的内容
    match = re.search(pattern, response)
    if match:
        # 提取标签内的内容
        return match.group(1)
    return None

看效果

在这里插入图片描述
用函数提取出内容:效果是不错的。不过r1废话多的特点导致了每条数据的处理时间太长了。4060ti有点顶不住啊,哈哈

In [11]: extract_json_from_response(res_content)
Out[11]:
{'market_punishment': 65,
 'market_down': 0,
 'explanation': '新闻标题明确提到了对两家私募机构的违规操作进行了处罚,这与A股市场的监管和处罚相关。阅读量较高(52.3万)和分享量(47次)也表明了该事件的关注度。因此,这条新闻的相关性评分较高,但考虑到涉及的是小规模私募,整体影响可能有限,所以评分为65分。'}

我试试并发,据说ollama允许4个,至少3个看起来是好的

In [16]: keyword_args_list = [{'system_prompt':system_prompt,  'user_prompt' : str(sample_data1)},
    ...:                      {'system_prompt':system_prompt,  'user_prompt' : str(sample_data2)},
    ...:                      {'system_prompt':system_prompt,  'user_prompt' : str(sample_data3)}
    ...:
    ...:                     ]
    ...:
    ...:
    ...: tick1 = time.time()
    ...: res_list = thread_concurrent_run(call_local_deepseek, keyword_args_list = keyword_args_list, max_workers= 3)
    ...: tick2 = time.time()
    ...:
    ...:
2025-03-22 23:04:49 - httpx - INFO - HTTP Request: POST http://172.17.0.1:11434/v1/chat/completions "HTTP/1.1 200 OK"
call_local_deepseek takes 17.80
2025-03-22 23:04:49 - httpx - INFO - HTTP Request: POST http://172.17.0.1:11434/v1/chat/completions "HTTP/1.1 200 OK"
call_local_deepseek takes 17.92
2025-03-22 23:04:52 - httpx - INFO - HTTP Request: POST http://172.17.0.1:11434/v1/chat/completions "HTTP/1.1 200 OK"
call_local_deepseek takes 21.00

解析出来

In [18]: res_list1 = [extract_json_from_response(x) for x in res_list]

In [19]: res_list1
Out[19]:
[{'market_punishment': 0,
  'market_down': 10,
  'explanation': '新闻标题提到市场一片大好,没有涉及A股市场的处罚信息,因此市场处罚评分为0。虽然整体情绪积极,但并未明确提及市场下跌或投资者损失,故市场下跌评分较低为10分。'},
 {'market_punishment': 0,
  'market_down': 90,
  'explanation': '新闻标题明确提到A股市场大跌1.5%,属于A股市场整体下跌的情况,相关性很高。'},
 {'market_punishment': 60,
  'market_down': 10,
  'explanation': '新闻标题提到对两家小规模私募机构的违规操作进行了处罚,这直接关联到A股市场的监管和整治,因此属于A股市场处罚类别。虽然没有提到具体的市场下跌情况,但处罚通常可能会影响市场情绪,所以相关性较高。阅读量和分享量也显示了一定的关注度。'}]

然后很奇怪的是,单发和多发的情况有点不一样。对于第一条,是稳定这样的。重复跑3条也是稳定的。暂时只能理解量大质量就下滑了。有点搞笑。之前我发现在批量实体识别的时候也有类似的问题。

{'market_punishment': 0,
 'market_down': 0,
 'explanation': '新闻标题提到市场一片大好,没有涉及A股市场的处罚或下跌内容,因此两个类别的相关性评分为0。'}

然后我试了下8b,就这个基础问题好像也还行,速度快了一倍。

在这里插入图片描述

最后我试了下1.5b,就完全bbq
在这里插入图片描述

因为mac可能是需要设置一下网络开放,速度感觉和4060ti也差不多。

实际跑了一个一分钟的数据,哈哈,有点尴尬,赶不上数据的速度,就先科研一下吧。
在这里插入图片描述

相关文章:

  • 官方通知 | 2025年CAIP人工智能职场应用师(AI职场应用师)职业能力认证正式发布
  • 【机器学习】机器学习四大分类
  • Camera2 与 CameraX 闲谈
  • 【惯性系与固连系速度位置加速度转换关系】
  • Redis 内存淘汰策略
  • Compose 原理解析
  • 【信息系统项目管理师】【高分范文】【历年真题】​论信息系统项目的风险管理
  • 基于大模型的甲状舌管囊肿全流程预测与临床方案研究报告
  • 【第22节】windows网络编程模型(WSAAsyncSelect模型)
  • 【江协科技STM32】软件SPI读写W25Q64芯片(学习笔记)
  • 小米AX6000解锁ssh避坑笔记
  • 【java面试】线程篇
  • AC交流采样电路
  • DL学习笔记:穿戴设备上的轻量级人体活动识别方法
  • AI Agent开发大全第四课-提示语工程:从简单命令到AI对话的“魔法”公式
  • 【赵渝强老师】在Docker中运行达梦数据库
  • Model Context Protocol:下一代AI系统集成范式革命
  • nebula graph传统使用Docker进行项目发版
  • Ceph集群2025(Squid版)快速对接K8S cephFS文件存储
  • Java 之「单调栈」:从入门到实战
  • 台媒称美派遣前军官出任“汉光演习”资深观察员,国防部回应
  • 养胃不是顿顿喝粥,这份“胃的使用说明书”请收好
  • 我国翻译从业人员达680.8万人,行业总需求仍在上升
  • 特写|“三峡千古情”出圈,一场演出给宜昌留下更多游客
  • 920余名在缅甸当阳等地实施跨境电信网络诈骗的中国籍犯罪嫌疑人被移交我方
  • 2025欧亚经济合作发展论坛在沪举办