当前位置: 首页 > news >正文

老旧测试用例生成平台异步任务与用户通知优化

在现代 Web 开发中,异步任务处理和用户通知是两个重要的功能。由于老旧测试平台【测试用例生成平台,源码分享】进行智能化升级后,未采用异步任务处理,大模型推理时间较长,导致任务阻塞,无法处理其他任务,体验较差。本文将以 Django 框架为基础,结合 Celery 和 Redis,完成一个完整的异步任务系统,并实现用户通知功能。我们还会对接阿里云百炼 DeepSeek-R1 模型,通过大模型生成测试用例,并提供数据的增删改查功能。以下是详细的实现步骤和完整代码示例。


一、需求分析与功能拆解

1.1 功能需求

本项目的主要需求如下:

  1. 用户通过 API 输入任务名称和字段信息,系统为任务生成唯一的 UUID,并提示用户任务已创建。
  2. 异步任务系统调用阿里云百炼 DeepSeek-R1 模型,传递字段信息生成 JSON 格式的测试用例。
  3. 将模型生成的测试用例存储到数据库的测试用例预览表中。
  4. 通知用户任务完成,支持通过 WebSocket 或邮件通知。
  5. 提供一个页面对生成的测试用例数据进行增删改查操作。

1.2 技术选型

  • Django:作为后端框架,负责处理 API 请求、任务状态管理和数据库操作。
  • Celery:用于实现任务的异步处理。
  • Redis:作为 Celery 的消息队列。
  • 阿里云百炼 DeepSeek-R1:调用大模型接口生成测试用例。
  • Django Channels:实现 WebSocket 通知功能。
  • 前端页面:用于展示和管理测试用例数据。

二、项目创建与环境配置

2.1 安装依赖

在开始开发之前,需要安装以下依赖工具和库:

pip install django celery redis requests channels

2.2 项目目录结构

项目的目录结构如下:

your_project_name/
├── your_project_name/
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   ├── asgi.py
│   ├── celery.py
├── your_app_name/
│   ├── __init__.py
│   ├── models.py
│   ├── views.py
│   ├── tasks.py
│   ├── signals.py
│   ├── urls.py
├── manage.py

三、系统实现

3.1 配置 Celery 和 Redis

3.1.1 在 settings.py 中配置 Celery
# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

# Django Channels 配置
ASGI_APPLICATION = 'your_project_name.asgi.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [('127.0.0.1', 6379)],
        },
    },
}
3.1.2 创建 celery.py

在项目根目录下创建 celery.py 文件,用于初始化 Celery:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 设置 Django 默认的 settings 模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')

app = Celery('your_project_name')

# 从 Django 的 settings.py 加载配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现任务模块
app.autodiscover_tasks()
3.1.3 配置 __init__.py

在项目目录下的 __init__.py 文件中添加以下内容:

from __future__ import absolute_import, unicode_literals

# Celery 应用
from .celery import app as celery_app

__all__ = ('celery_app',)
3.1.4 配置 asgi.py

在项目根目录下的 asgi.py 文件中配置 WebSocket 支持:

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')

application = ProtocolTypeRouter({
    'http': get_asgi_application(),
    'websocket': AuthMiddlewareStack(
        URLRouter([
            # WebSocket 路由
        ])
    ),
})

3.2 数据模型设计

models.py 中定义两张表:任务表测试用例预览表

from django.db import models
import uuid

# 测试用例预览表
class TestCasePreview(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    field_name = models.CharField(max_length=255)
    field_type = models.CharField(max_length=255)
    field_value = models.TextField()
    generated_test_case = models.JSONField()  # 存储生成的 JSON 测试用例
    created_at = models.DateTimeField(auto_now_add=True)

# 异步任务表
class AsyncTask(models.Model):
    TASK_STATUS = (
        ('PENDING', 'Pending'),
        ('PROCESSING', 'Processing'),
        ('COMPLETED', 'Completed'),
        ('FAILED', 'Failed'),
    )
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=255, unique=True)
    status = models.CharField(max_length=20, choices=TASK_STATUS, default='PENDING')
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    result_message = models.TextField(null=True, blank=True)

3.3 异步任务实现

tasks.py 中实现异步任务逻辑:

import requests
from celery import shared_task
from .models import AsyncTask, TestCasePreview
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

@shared_task
def generate_test_cases(task_id, field_data):
    try:
        # 获取任务
        task = AsyncTask.objects.get(id=task_id)
        task.status = 'PROCESSING'
        task.save()

        # 调用阿里云百炼 DeepSeek-R1 API
        client = OpenAI(
            # 如果没有配置环境变量,请用百炼API Key替换:api_key="sk-xxx"
            # api_key='sk-xxx',
            api_key='sk-712a634dbaa7444d838d20b25eb938xx',  # todo 此处需更换
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
        )
        reasoning_content = ""  # 定义完整思考过程
        answer_content = ""  # 定义完整回复
        is_answering = False  # 判断是否结束思考过程并开始回复
        # 创建聊天完成请求
        completion = client.chat.completions.create(
            model="deepseek-r1",  # 此处以 deepseek-r1 为例,可按需更换模型名称
            messages=[
                {'role': 'user',
                 'content': prompt_param}
            ],
            stream=True,
            # 解除以下注释会在最后一个chunk返回Token使用量
            # stream_options={
            #     "include_usage": True
            # }
        )

        print("\n" + "=" * 20 + "思考过程" + "=" * 20 + "\n")

        for chunk in completion:
            # 如果chunk.choices为空,则打印usage
            if not chunk.choices:
                print("\nUsage:")
                print(chunk.usage)
            else:
                delta = chunk.choices[0].delta
                # 打印思考过程
                if hasattr(delta, 'reasoning_content') and delta.reasoning_content != None:
                    print(delta.reasoning_content, end='', flush=True)
                    reasoning_content += delta.reasoning_content
                else:
                    # 开始回复
                    if delta.content != "" and not is_answering:
                        print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")
                        is_answering = True
                    # 打印回复过程
                    print(delta.content, end='', flush=True)
                    answer_content += delta.content

        # 存储测试用例到数据库
        for field in field_data:
            TestCasePreview.objects.create(
                field_name=field["field_name"],
                field_type=field["field_type"],
                field_value=field["field_value"],
                generated_test_case=extract_json_objects(answer_content)
            )

        # 更新任务状态
        task.status = 'COMPLETED'
        task.result_message = "测试用例生成成功!"
        task.save()

        # 通过 WebSocket 通知用户
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            f"task_{task_id}",
            {"type": "task_status", "message": "任务已完成"}
        )

    except Exception as e:
        task.status = 'FAILED'
        task.result_message = str(e)
        task.save()

3.4 用户接口

views.py 中提供创建任务的接口:

from django.http import JsonResponse
from .models import AsyncTask
from .tasks import generate_test_cases

def create_task(request):
    if request.method == 'POST':
        task_name = request.POST.get('task_name')
        field_data = request.POST.get('field_data')  # JSON 格式的字段列表

        # 检查任务名称是否唯一
        if AsyncTask.objects.filter(name=task_name).exists():
            return JsonResponse({"error": "任务名称已存在"}, status=400)

        # 创建任务
        task = AsyncTask.objects.create(name=task_name)

        # 调用异步任务
        generate_test_cases.delay(task.id, field_data)

        return JsonResponse({"message": "任务已创建", "task_id": task.id})

3.5 WebSocket 通知

WebSocket 消费者 consumers.py

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class TaskConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.task_id = self.scope['url_route']['kwargs']['task_id']
        self.task_group_name = f"task_{self.task_id}"

        await self.channel_layer.group_add(
            self.task_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            self.task_group_name,
            self.channel_name
        )

    async def task_status(self, event):
        message = event['message']
        await self.send(text_data=json.dumps({"message": message}))

3.6 前端页面展示

创建一个简单的页面用于展示测试用例:

<!DOCTYPE html>
<html lang="en">
<head>
    <title>测试用例预览</title>
</head>
<body>
    <h1>测试用例预览</h1>
    <table id="test-cases">
        <thead>
            <tr>
                <th>字段名称</th>
                <th>字段类型</th>
                <th>字段值</th>
                <th>测试用例</th>
            </tr>
        </thead>
        <tbody>
            <!-- 数据通过 Ajax 加载 -->
        </tbody>
    </table>
</body>
</html>

四、总结

本项目从需求分析出发,采用 Django 框架结合 Celery、Redis 和 Django Channels,完整实现了一个基于异步任务的测试用例生成系统。系统支持任务状态管理、用户通知,以及前端数据的增删改查功能,逻辑清晰,功能完善,非常适合在实际项目中推广应用。在老旧测试平台智能化中,异步是不可或缺的整改步骤,调试好异步任务后,后续优化将进入快速迭代阶段。加油吧~

相关文章:

  • OOM问题排查和解决
  • Java工厂模式解析:灵活对象创建的实践指南
  • Java LinkedList深度解析:双向链表的实现艺术与实战指南
  • 【Python爬虫】简单介绍
  • 16bit转8bit的常见方法(图像归一化)
  • 深入理解浏览器的 Cookie:全面解析与实践指南
  • UNITY 屏幕UI自适应
  • Ubuntu下解压ZIP压缩文件出现中文乱码问题的综合解决方案
  • python提升图片清晰度
  • 【学习】对抗训练-WGAN
  • RTT添加一个RTC时钟驱动,以DS1307为例
  • C语言超详细指针知识(三)
  • Java设计模式实战:装饰模式在星巴克咖啡系统中的应用
  • 【JavaScript】二十、电梯导航栏的实现
  • C++之二叉搜索树
  • arcgis几何与游标(1)
  • 【NLP】 自然语言处理笔记
  • Spring-AI-alibaba 结构化输出
  • 技术视界 | 人形机器人运动控制框架详解:解锁智能机器人的“灵动”密码
  • 如何使用maxscale实现mysql读写分离
  • 湖南永州公安全面推行“项目警官制”,为重点项目建设护航
  • 董明珠的接班人还是董明珠
  • 爱奇艺要转型做微剧?龚宇:是误解,微剧是增量业务,要提高投资回报效益
  • 航空货运三巨头去年净利合计超88亿元,密切关注关税政策变化和市场反应
  • 生态环境部:我国核电规模全球第一,总体安全可控
  • 大家聊中国式现代化|刘亮:因地制宜发展新质生产力,推动经济高质量发展