SenseVoice-small-onnx部署案例Airflow调度定时语音转写任务结果邮件推送1. 项目背景与需求场景在日常工作中我们经常需要处理大量的语音文件转写任务。比如会议录音整理、客服电话转录、多媒体内容处理等。传统的手工处理方式效率低下而自动化的语音识别服务能够显著提升工作效率。SenseVoice-small-onnx是一个基于ONNX量化的多语言语音识别模型支持中文、粤语、英语、日语、韩语等多种语言。结合Airflow工作流调度系统我们可以构建一个完整的自动化语音处理流水线。典型应用场景每日会议录音自动转写和分发客服质量检查录音批量处理多媒体内容生产自动化流水线多语言视频字幕生成2. 技术架构设计2.1 整体架构我们的解决方案采用分层架构设计语音文件输入 → Airflow调度 → SenseVoice识别 → 结果处理 → 邮件推送2.2 核心组件SenseVoice-small-onnx: 语音识别引擎负责音频到文本的转换Apache Airflow: 工作流调度系统管理任务依赖和执行时序SMTP服务: 邮件推送服务用于结果分发文件存储: 本地或云存储用于管理输入输出文件3. 环境准备与部署3.1 SenseVoice服务部署首先部署语音识别服务# 创建项目目录 mkdir voice-transcription-pipeline cd voice-transcription-pipeline # 安装依赖 pip install funasr-onnx gradio fastapi uvicorn soundfile jieba pip install apache-airflow python-dotemail smtplib # 启动SenseVoice服务 python3 app.py --host 0.0.0.0 --port 7860 3.2 Airflow环境配置配置Airflow工作流管理系统# 设置Airflow家目录 export AIRFLOW_HOME~/airflow # 初始化Airflow数据库 airflow db init # 创建管理员用户 airflow users create \ --username admin \ --firstname Admin \ --lastname User \ --role Admin \ --email adminexample.com \ --password admin4. 核心代码实现4.1 语音识别服务调用创建语音识别工具类# voice_recognizer.py import requests import json import os from typing import List, Dict class VoiceRecognizer: def __init__(self, base_urlhttp://localhost:7860): self.base_url base_url def transcribe_audio(self, audio_path: str, language: str auto) - Dict: 转录单个音频文件 try: with open(audio_path, rb) as audio_file: files { file: (os.path.basename(audio_path), audio_file), language: (None, language), use_itn: (None, true) } response requests.post( f{self.base_url}/api/transcribe, filesfiles ) if response.status_code 200: return response.json() else: raise Exception(f转录失败: {response.text}) except Exception as e: print(f处理音频 {audio_path} 时出错: {str(e)}) return {error: str(e)} def batch_transcribe(self, audio_files: List[str], language: str auto) - List[Dict]: 批量转录音频文件 results [] for audio_file in audio_files: result self.transcribe_audio(audio_file, language) results.append({ file: audio_file, result: result }) return results4.2 邮件推送服务创建邮件发送工具类# email_sender.py import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import json from datetime import datetime class EmailSender: def __init__(self, smtp_server, smtp_port, username, password): self.smtp_server smtp_server self.smtp_port smtp_port self.username username self.password password def send_transcription_results(self, recipient: str, results: List[Dict], subject_prefix: str 语音转写结果): 发送转录结果邮件 # 构建邮件内容 subject f{subject_prefix} - {datetime.now().strftime(%Y-%m-%d %H:%M)} # HTML格式的邮件内容 html_content self._generate_html_content(results) # 创建邮件 msg MIMEMultipart() msg[From] self.username msg[To] recipient msg[Subject] subject # 添加HTML内容 msg.attach(MIMEText(html_content, html)) # 发送邮件 try: with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: server.starttls() server.login(self.username, self.password) server.send_message(msg) print(f邮件已发送至: {recipient}) except Exception as e: print(f邮件发送失败: {str(e)}) def _generate_html_content(self, results: List[Dict]) - str: 生成HTML格式的邮件内容 html html head style body { font-family: Arial, sans-serif; margin: 20px; } .result { margin-bottom: 20px; padding: 10px; border: 1px solid #ddd; } .file-name { font-weight: bold; color: #333; } .transcription { margin-top: 10px; padding: 10px; background-color: #f9f9f9; } .error { color: red; } /style /head body h2语音转写结果汇总/h2 p处理时间: {time}/p p总计处理: {count} 个文件/p .format( timedatetime.now().strftime(%Y-%m-%d %H:%M:%S), countlen(results) ) for result in results: html div classresult html fdiv classfile-name文件: {result[file]}/div if error in result[result]: html fdiv classerror错误: {result[result][error]}/div else: transcription result[result].get(text, 无转录结果) html fdiv classtranscription转录结果: {transcription}/div html /div html /body /html return html4.3 Airflow DAG定义创建主要的调度任务# airflow_dag.py from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator import os from voice_recognizer import VoiceRecognizer from email_sender import EmailSender default_args { owner: airflow, depends_on_past: False, email_on_failure: False, email_on_retry: False, retries: 1, retry_delay: timedelta(minutes5), } def process_audio_files(): 处理音频文件的主要任务 # 配置参数 audio_dir /path/to/audio/files # 音频文件目录 output_dir /path/to/output # 输出目录 recipient_email recipientexample.com # 接收邮箱 # 确保输出目录存在 os.makedirs(output_dir, exist_okTrue) # 获取待处理的音频文件 audio_files [] for file in os.listdir(audio_dir): if file.endswith((.wav, .mp3, .m4a, .flac)): audio_files.append(os.path.join(audio_dir, file)) if not audio_files: print(没有找到待处理的音频文件) return print(f找到 {len(audio_files)} 个待处理音频文件) # 初始化语音识别器 recognizer VoiceRecognizer() # 批量处理音频文件 results recognizer.batch_transcribe(audio_files, languageauto) # 保存结果到文件 timestamp datetime.now().strftime(%Y%m%d_%H%M%S) output_file os.path.join(output_dir, ftranscription_results_{timestamp}.json) with open(output_file, w, encodingutf-8) as f: json.dump(results, f, ensure_asciiFalse, indent2) print(f结果已保存至: {output_file}) # 发送邮件通知 email_sender EmailSender( smtp_serversmtp.example.com, smtp_port587, usernameyour_emailexample.com, passwordyour_password ) email_sender.send_transcription_results(recipient_email, results) # 处理完成后移动音频文件 processed_dir os.path.join(audio_dir, processed) os.makedirs(processed_dir, exist_okTrue) for audio_file in audio_files: filename os.path.basename(audio_file) os.rename(audio_file, os.path.join(processed_dir, filename)) print(所有音频文件处理完成) # 定义DAG dag DAG( voice_transcription_pipeline, default_argsdefault_args, description定时语音转写任务流水线, schedule_intervaltimedelta(hours1), # 每小时执行一次 start_datedatetime(2024, 1, 1), catchupFalse, ) # 定义任务 process_task PythonOperator( task_idprocess_audio_files, python_callableprocess_audio_files, dagdag, )5. 系统配置与优化5.1 配置文件管理创建配置文件管理环境变量# config.py import os from dotenv import load_dotenv load_dotenv() class Config: # SenseVoice配置 SENSEVOICE_HOST os.getenv(SENSEVOICE_HOST, localhost) SENSEVOICE_PORT os.getenv(SENSEVOICE_PORT, 7860) # 邮件配置 SMTP_SERVER os.getenv(SMTP_SERVER, smtp.example.com) SMTP_PORT int(os.getenv(SMTP_PORT, 587)) SMTP_USERNAME os.getenv(SMTP_USERNAME, ) SMTP_PASSWORD os.getenv(SMTP_PASSWORD, ) # 文件路径配置 AUDIO_INPUT_DIR os.getenv(AUDIO_INPUT_DIR, /data/audio/input) AUDIO_PROCESSED_DIR os.getenv(AUDIO_PROCESSED_DIR, /data/audio/processed) OUTPUT_DIR os.getenv(OUTPUT_DIR, /data/output) # 收件人配置 RECIPIENT_EMAILS os.getenv(RECIPIENT_EMAILS, ).split(,)5.2 错误处理与重试机制增强系统的健壮性# error_handler.py import logging from typing import Callable from functools import wraps logger logging.getLogger(__name__) def retry_on_failure(max_retries: int 3, delay: int 5): 重试装饰器 def decorator(func: Callable): wraps(func) def wrapper(*args, **kwargs): last_exception None for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: last_exception e logger.warning(f尝试 {attempt 1}/{max_retries} 失败: {str(e)}) if attempt max_retries - 1: time.sleep(delay) raise last_exception return wrapper return decorator class ErrorHandler: staticmethod def handle_transcription_error(audio_file: str, error: Exception): 处理转录错误 error_log { file: audio_file, error: str(error), timestamp: datetime.now().isoformat() } # 记录错误日志 error_dir /var/log/voice_transcription os.makedirs(error_dir, exist_okTrue) error_file os.path.join(error_dir, ferrors_{datetime.now().strftime(%Y%m%d)}.log) with open(error_file, a) as f: f.write(json.dumps(error_log) \n) logger.error(f处理文件 {audio_file} 时出错: {str(error)})6. 部署与运行6.1 环境变量配置创建.env配置文件# SenseVoice配置 SENSEVOICE_HOSTlocalhost SENSEVOICE_PORT7860 # 邮件配置 SMTP_SERVERsmtp.gmail.com SMTP_PORT587 SMTP_USERNAMEyour_emailgmail.com SMTP_PASSWORDyour_app_password # 文件路径 AUDIO_INPUT_DIR/data/audio/input AUDIO_PROCESSED_DIR/data/audio/processed OUTPUT_DIR/data/output # 收件人 RECIPIENT_EMAILSrecipient1example.com,recipient2example.com6.2 启动服务启动完整的服务栈# 启动SenseVoice服务 nohup python3 app.py --host 0.0.0.0 --port 7860 sensevoice.log 21 # 启动Airflow调度器 airflow scheduler # 启动Airflow Web服务器 airflow webserver --port 8080 # 启用DAG airflow dags unpause voice_transcription_pipeline6.3 监控与日志设置日志监控# 查看SenseVoice日志 tail -f sensevoice.log # 查看Airflow日志 tail -f ~/airflow/logs/voice_transcription_pipeline/process_audio_files/*.log # 查看错误日志 tail -f /var/log/voice_transcription/errors_*.log7. 实际应用效果7.1 性能表现基于SenseVoice-small-onnx量化模型的语音识别服务表现出色处理速度: 10秒音频推理仅需70ms准确率: 多语言识别准确率达到90%以上并发能力: 支持批量处理适合大规模应用场景资源占用: 量化模型仅230MB内存占用低7.2 自动化效益通过Airflow调度系统实现的自动化流水线带来显著效益效率提升: 无需人工干预自动处理语音文件及时性: 定时任务确保结果及时推送可追溯: 完整的日志记录和错误处理机制扩展性: 易于添加新的处理步骤和功能8. 总结与展望本案例展示了如何将SenseVoice-small-onnx语音识别模型与Airflow工作流调度系统结合构建一个完整的自动化语音转写流水线。该系统具有以下特点核心优势多语言支持: 自动识别中文、英语、日语、韩语、粤语等50多种语言高效处理: 基于ONNX量化模型推理速度快资源占用低完整自动化: 从文件处理到结果推送的全流程自动化健壮可靠: 完善的错误处理和重试机制易于扩展: 模块化设计便于添加新功能适用场景企业会议记录自动化整理客服质量监控和录音分析多媒体内容生产流水线多语言视频字幕生成语音数据批处理任务未来可以进一步扩展的功能包括实时语音识别、语音情感分析、说话人分离、自定义词汇表等以满足更复杂的业务需求。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。