【后端】构建简洁的音频转写系统:基于火山引擎ASR实现
在当今数字化时代,语音识别技术已经成为许多应用不可或缺的一部分。无论是会议记录、语音助手还是内容字幕,将语音转化为文本的能力对提升用户体验和工作效率至关重要。本文将介绍如何构建一个简洁的音频转写系统,专注于文件上传、云存储以及ASR(自动语音识别)的集成,特别是基于火山引擎ASR服务的实现。
系统架构概览
一个简洁的音频转写系统需要包含以下几个关键组件:
- 前端界面:提供用户上传音频文件的入口
- API服务层:处理请求和业务逻辑
- 云存储服务:安全存储音频文件
- ASR服务:将音频转写为文本(本文使用火山引擎ASR服务)
系统流程如下:
用户 → 上传音频 → 存储到云服务 → 触发ASR转写 → 获取转写结果 → 返回给用户
技术选型
我们的最小实现基于以下技术栈:
- 后端框架:FastAPI(Python)
- 云存储:兼容S3协议的对象存储
- ASR服务:火山引擎ASR服务
- 异步处理:基于asyncio的异步请求处理
详细实现
1. 音频文件上传流程
实现音频上传有两种主要方式:
1.1 预签名URL上传
这种方式适合大文件上传,减轻服务器负担:
async def create_upload_url(file_name, file_size, mime_type):"""创建上传链接"""# 生成唯一文件名timestamp = datetime.now().strftime("%Y%m%d%H%M%S")random_suffix = os.urandom(4).hex()file_ext = os.path.splitext(file_name)[1]filename = f"{timestamp}_{random_suffix}{file_ext}"# 生成存储路径storage_path = f"audio/{filename}"# 获取预签名URLupload_url = storage_client.generate_presigned_url(storage_path,expiry=300, #5分钟有效期http_method="PUT",content_length=file_size)return {"upload_url": upload_url,"storage_path": storage_path}
前端调用示例:
// 1. 获取上传URL
const response = await fetch('/api/audio/upload-url', {method: 'POST',headers: { 'Content-Type': 'application/json' },body: JSON.stringify({file_name: file.name,file_size: file.size,mime_type: file.type})
});
const { upload_url, storage_path } = await response.json();// 2. 使用预签名URL上传文件
await fetch(upload_url, {method: 'PUT',body: file,headers: { 'Content-Type': file.type }
});// 3. 触发转写
const transcriptResponse = await fetch('/api/audio/transcribe', {method: 'POST',headers: { 'Content-Type': 'application/json' },body: JSON.stringify({ storage_path })
});
const transcriptResult = await transcriptResponse.json();
1.2 直接上传方式
适合较小文件,通过API直接上传:
async def upload_audio(file):"""直接上传音频文件"""# 验证文件类型if file.content_type not in ALLOWED_AUDIO_TYPES:raise ValueError("不支持的文件类型")# 读取文件内容contents = await file.read()if len(contents) == 0:raise ValueError("文件内容为空")# 生成唯一文件名timestamp = datetime.now().strftime("%Y%m%d%H%M%S")random_suffix = os.urandom(4).hex()file_ext = os.path.splitext(file.filename)[1]filename = f"{timestamp}_{random_suffix}{file_ext}"# 存储路径storage_path = f"audio/{filename}"# 上传到云存储storage_client.upload(storage_path, contents)# 生成访问URLaccess_url = storage_client.generate_presigned_url(storage_path,expiry=3600, # 1小时有效期http_method="GET")return {"file_name": file.filename,"storage_path": storage_path,"file_size": len(contents),"mime_type": file.content_type,"access_url": access_url,"url_expires_at": datetime.now() + timedelta(hours=1)}
2. ASR语音转写实现
可以通过两种方式调用ASR服务:基于存储路径或直接通过URL。
2.1 基于存储路径的转写
async def transcribe_audio_by_storage_path(storage_path):"""通过存储路径转写音频文件"""# 生成可访问的URLaccess_url = storage_client.generate_presigned_url(storage_path,expiry=3600,http_method="GET")# 调用ASR服务transcript_result = await _call_asr_service(access_url)return {"storage_path": storage_path,"transcript": transcript_result.get("text", ""),"segments": transcript_result.get("segments", []),"duration": transcript_result.get("duration")}
2.2 基于URL的转写
async def transcribe_audio_by_url(audio_url):"""通过URL转写音频"""# 调用ASR服务transcript_result = await _call_asr_service(audio_url)return {"audio_url": audio_url,"transcript": transcript_result.get("text", ""),"segments": transcript_result.get("segments", []),"duration": transcript_result.get("duration")}
2.3 上传并立即转写
async def upload_and_transcribe(file):"""上传并立即转写音频文件"""# 上传文件upload_result = await upload_audio(file)# 转写音频transcript_result = await _call_asr_service(upload_result["access_url"])# 组合结果return {"file_name": upload_result["file_name"],"storage_path": upload_result["storage_path"],"file_size": upload_result["file_size"],"mime_type": upload_result["mime_type"],"access_url": upload_result["access_url"],"transcript": transcript_result.get("text", ""),"segments": transcript_result.get("segments", []),"duration": transcript_result.get("duration")}
3. 火山引擎ASR服务调用实现
以下是基于火山引擎ASR服务的详细实现:
async def _call_asr_service(audio_url):"""调用火山引擎ASR服务进行转写"""# 生成唯一任务IDtask_id = str(uuid.uuid4())# 火山引擎ASR服务API端点submit_url = "https://openspeech.bytedance.com/api/v3/auc/bigmodel/submit"query_url = "https://openspeech.bytedance.com/api/v3/auc/bigmodel/query"# 构建请求头headers = {"Content-Type": "application/json","X-Api-App-Key": APP_KEY,"X-Api-Access-Key": ACCESS_KEY,"X-Api-Resource-Id": "volc.bigasr.auc","X-Api-Request-Id": task_id,"X-Api-Sequence": "-1"}# 请求体payload = {"audio": {"url": audio_url}}# 提交转写任务async with aiohttp.ClientSession() as session:async with session.post(submit_url, headers=headers, data=json.dumps(payload)) as response:if response.status != 200:error_detail = await response.text()raise ValueError(f"提交ASR任务失败: {error_detail}")response_headers = response.headersstatus_code = response_headers.get("X-Api-Status-Code")log_id = response_headers.get("X-Tt-Logid", "")if status_code not in ["20000000", "20000001", "20000002"]:raise ValueError(f"ASR任务提交错误: {response_headers.get('X-Api-Message', '未知错误')}")# 轮询查询结果max_retries = 10for i in range(max_retries):# 等待一段时间再查询await asyncio.sleep(0.5)# 查询转写结果async with aiohttp.ClientSession() as session:query_headers = {"Content-Type": "application/json","X-Api-App-Key": APP_KEY,"X-Api-Access-Key": ACCESS_KEY,"X-Api-Resource-Id": "volc.bigasr.auc","X-Api-Request-Id": task_id,"X-Tt-Logid": log_id}async with session.post(query_url, headers=query_headers,data=json.dumps({})) as response:if response.status != 200:continuequery_status_code = response.headers.get("X-Api-Status-Code")# 如果完成,返回结果if query_status_code == "20000000":try:response_data = await response.json()result = response_data.get("result", {})text = result.get("text", "")utterances = result.get("utterances", [])return {"text": text, "utterances": utterances}except Exception as e:raise ValueError(f"解析ASR响应失败: {str(e)}")# 如果仍在处理,继续等待elif query_status_code in ["20000001", "20000002"]:await asyncio.sleep(0.5)continueelse:error_message = response.headers.get("X-Api-Message", "未知错误")raise ValueError(f"ASR任务查询失败: {error_message}")# 超过最大重试次数raise ValueError("ASR转写超时,请稍后重试")
4. API接口设计
完整的API接口设计,专注于最小功能实现:
# 1. 获取上传URL
@router.post("/audio/upload-url")
async def create_upload_url(request: dict):return await audio_service.create_upload_url(request["file_name"], request["file_size"], request["mime_type"])# 2. 直接上传音频
@router.post("/audio/upload")
async def upload_audio(file: UploadFile):return await audio_service.upload_audio(file)# 3. 转写音频 (通过存储路径)
@router.post("/audio/transcribe")
async def transcribe_audio(request: dict):return await audio_service.transcribe_audio_by_storage_path(request["storage_path"])# 4. 通过URL转写音频
@router.post("/audio/transcribe-by-url")
async def transcribe_by_url(request: dict):return await audio_service.transcribe_audio_by_url(request["audio_url"])# 5. 上传并转写音频
@router.post("/audio/upload-and-transcribe")
async def upload_and_transcribe(file: UploadFile):return await audio_service.upload_and_transcribe(file)
性能与可靠性优化
在实际生产环境中,我们还应关注以下几点:
1. 大文件处理
对于大型音频文件,应当:
- 使用分块上传方式
- 实现断点续传
- 限制文件大小
- 采用预签名URL方式,避免通过API服务器中转
2. 错误处理和重试
增强系统稳定性:
- 实现指数退避重试策略
- 添加详细日志记录
- 设置超时处理
3. 安全性考虑
保护用户数据:
- 实现访问控制
- 对音频URL设置短期有效期
- 考虑临时文件清理机制
完整示例:构建最小可行实现
下面是一个使用FastAPI构建的基于火山引擎ASR的最小可行实现示例:
import os
import uuid
import json
import asyncio
import aiohttp
from datetime import datetime, timedelta
from fastapi import FastAPI, UploadFile, File
from typing import Dict, Any, Optionalapp = FastAPI()# 配置项
ALLOWED_AUDIO_TYPES = ["audio/mpeg", "audio/wav", "audio/mp4", "audio/x-m4a"]
APP_KEY = os.getenv("VOLCANO_ASR_APP_ID")
ACCESS_KEY = os.getenv("VOLCANO_ASR_ACCESS_TOKEN")# 简单的存储客户端模拟
class SimpleStorageClient:def upload(self, path, content):# 实际项目中应连接到S3、OSS等云存储print(f"Uploading {len(content)} bytes to {path}")return Truedef generate_presigned_url(self, path, expiry=3600, http_method="GET", **kwargs):# 简化示例,实际应返回带签名的URLreturn f"https://storage-example.com/{path}?expires={expiry}&method={http_method}"storage_client = SimpleStorageClient()# API端点
@app.post("/audio/upload-url")
async def create_upload_url(file_name: str, file_size: int, mime_type: str):"""获取上传URL"""timestamp = datetime.now().strftime("%Y%m%d%H%M%S")random_suffix = os.urandom(4).hex()file_ext = os.path.splitext(file_name)[1]filename = f"{timestamp}_{random_suffix}{file_ext}"storage_path = f"audio/{filename}"upload_url = storage_client.generate_presigned_url(storage_path,expiry=300,http_method="PUT",content_length=file_size)return {"upload_url": upload_url,"storage_path": storage_path}@app.post("/audio/upload")
async def upload_audio(file: UploadFile = File(...)):"""直接上传音频文件"""if file.content_type not in ALLOWED_AUDIO_TYPES:return {"error": "不支持的文件类型"}contents = await file.read()if len(contents) == 0:return {"error": "文件内容为空"}timestamp = datetime.now().strftime("%Y%m%d%H%M%S")random_suffix = os.urandom(4).hex()file_ext = os.path.splitext(file.filename)[1]filename = f"{timestamp}_{random_suffix}{file_ext}"storage_path = f"audio/{filename}"storage_client.upload(storage_path, contents)access_url = storage_client.generate_presigned_url(storage_path,expiry=3600,http_method="GET")return {"file_name": file.filename,"storage_path": storage_path,"file_size": len(contents),"mime_type": file.content_type,"access_url": access_url,"url_expires_at": (datetime.now() + timedelta(hours=1)).isoformat()}@app.post("/audio/transcribe")
async def transcribe_audio(storage_path: str):"""通过存储路径转写音频"""access_url = storage_client.generate_presigned_url(storage_path,expiry=3600,http_method="GET")transcript_result = await _call_volcano_asr(access_url)return {"storage_path": storage_path,"transcript": transcript_result}@app.post("/audio/transcribe-by-url")
async def transcribe_by_url(audio_url: str):"""通过URL转写音频"""transcript_result = await _call_volcano_asr(audio_url)return {"audio_url": audio_url,"transcript": transcript_result}@app.post("/audio/upload-and-transcribe")
async def upload_and_transcribe(file: UploadFile = File(...)):"""上传并转写音频文件"""upload_result = await upload_audio(file)if "error" in upload_result:return upload_resulttranscript_result = await _call_volcano_asr(upload_result["access_url"])return {**upload_result,"transcript": transcript_result}async def _call_volcano_asr(audio_url):"""调用火山引擎ASR服务"""if not APP_KEY or not ACCESS_KEY:return {"text": "火山引擎ASR配置缺失,请设置环境变量"}# 生成任务IDtask_id = str(uuid.uuid4())# 火山引擎ASR服务API端点submit_url = "https://openspeech.bytedance.com/api/v3/auc/bigmodel/submit"query_url = "https://openspeech.bytedance.com/api/v3/auc/bigmodel/query"# 提交请求头headers = {"Content-Type": "application/json","X-Api-App-Key": APP_KEY,"X-Api-Access-Key": ACCESS_KEY,"X-Api-Resource-Id": "volc.bigasr.auc","X-Api-Request-Id": task_id,"X-Api-Sequence": "-1"}# 请求体payload = {"audio": {"url": audio_url}}try:# 提交任务async with aiohttp.ClientSession() as session:async with session.post(submit_url, headers=headers, data=json.dumps(payload)) as response:status_code = response.headers.get("X-Api-Status-Code")log_id = response.headers.get("X-Tt-Logid", "")if status_code not in ["20000000", "20000001", "20000002"]:return {"error": f"提交转写任务失败: {response.headers.get('X-Api-Message', '未知错误')}"}# 查询结果max_retries = 10for i in range(max_retries):await asyncio.sleep(1) # 等待1秒# 查询请求头query_headers = {"Content-Type": "application/json","X-Api-App-Key": APP_KEY,"X-Api-Access-Key": ACCESS_KEY,"X-Api-Resource-Id": "volc.bigasr.auc","X-Api-Request-Id": task_id,"X-Tt-Logid": log_id}async with aiohttp.ClientSession() as session:async with session.post(query_url, headers=query_headers, data="{}") as response:query_status = response.headers.get("X-Api-Status-Code")if query_status == "20000000": # 转写完成result = await response.json()text = result.get("result", {}).get("text", "")return {"text": text}elif query_status in ["20000001", "20000002"]: # 处理中continueelse:return {"error": f"查询转写结果失败: {response.headers.get('X-Api-Message', '未知错误')}"}return {"error": "转写超时,请稍后查询结果"}except Exception as e:return {"error": f"转写过程发生错误: {str(e)}"}if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
结论
构建一个简洁的音频转写系统可以不依赖数据库,只需要专注于文件上传、获取URL和ASR转写三个核心功能。通过集成火山引擎ASR服务,我们可以快速实现高质量的语音转文本功能,无需自行构建复杂的语音识别模型。
本文的最小可行实现充分利用了火山引擎ASR的API功能,提供了一个完整的工作流程,包括文件上传、URL生成和转写调用。这种方式不仅开发效率高,而且可以在不断迭代中逐步增强功能。
进一步的拓展方向
在有了最小可行实现后,可以考虑以下拓展:
- 添加数据库存储转写历史
- 实现用户认证和授权
- 支持实时语音转写
- 多语言转写支持
- 说话人分离功能
- 情感分析集成
- 关键词提取和主题识别