Skip to Content
Documentation处理流程Worker 任务处理

Worker 任务处理

Worker 是连接 API 与 Speech 的桥梁:领取任务、拉取/上传音频、消费 SSE 流、增量回写字幕。

运行模型

  • 轮询间隔:WORKER_POLL_INTERVAL_SECONDS(默认 10 秒)
  • 鉴权:请求头 X-Worker-Key: {WORKER_API_KEY}

任务认领机制

API 使用乐观锁防止重复认领:

-- 伪代码 SELECT id FROM Media WHERE subtitleStatus = 'pending' ... ORDER BY createdAt LIMIT 1 UPDATE Media SET subtitleStatus = 'processing' WHERE id = ? AND subtitleStatus = 'pending' -- count = 0 表示被其他 Worker 抢走,返回 null

转写任务详解

1. 获取音频

  • 若上游 YouTube 任务刚完成,使用本地临时文件
  • 否则从 API 返回的 audioDownloadUrl(R2 预签名 GET)下载

2. 调用 Speech SSE

POST {SPEECH_SERVICE_URL}/transcribe Content-Type: multipart/form-data file=<音频> language={subtitleLanguage} # 默认 ja target_language={SUBTITLE_TARGET_LANGUAGE} # 默认 zh

Worker 解析 SSE 事件:

事件处理
segment映射为字幕片段,POST /subtitles/segments 写入 API
translation更新对应片段的 translation 字段
error抛出异常,任务标记 failed
done结束流

每写入 10 段或首段时,Worker 通过 PATCH /jobs/{id} 更新处理日志,供后台排查。

3. 完成标记

  • 有片段时:发送 { segments: [], done: true } 触发 API 侧 subtitleStatus = success
  • 最终 PATCH 写入 status: success 与汇总日志

4. 清理

临时音频文件在处理结束后删除(cleanupTempPath)。

字幕增量写入

POST /api/worker/jobs/{id}/subtitles/segments 支持:

  • 追加模式:传入新 segments 数组,按 segmentIndex upsert
  • 翻译更新:同一 index 再次写入时合并 translation
  • 完成标记done: true 时将媒体状态置为 success

这意味着 App 可以在 processing 阶段分页拉取已产出的片段,实现流式字幕体验

片段数据结构

每个 segment 包含:

字段说明
id片段序号(0-based)
text转写原文
start / end秒级时间戳
tokens日语分词(Sudachi)
bunsetu文节分析(Ginza)
translation译文
translation_cached是否命中翻译缓存

失败处理

任意步骤异常时:

  1. Worker 捕获错误,格式化 stack
  2. PATCH /jobs/{id}status: failed,写入 error 与完整 log
  3. App 播放器展示 subtitleError

常见失败原因:Speech 服务不可达、Whisper 模型缺失、R2 上传失败、YouTube 下载被限流。

配置参考

worker/.env.example

MEMSHARE_API_BASE_URL=http://localhost:8080/api WORKER_API_KEY=change-me SPEECH_SERVICE_URL=http://localhost:8000 SUBTITLE_TARGET_LANGUAGE=zh WORKER_POLL_INTERVAL_SECONDS=10