Webhook
OneLiveRec 提供 Webhook,允许外部服务在直播监控与录制事件发生时进行响应。
Webhook 可用于自动化,例如:
- 自动上传录制文件
- 当直播开始时发送通知
- 触发后处理流程
- 监控系统错误
每个事件都会以 HTTP POST 请求(JSON Payload) 的形式发送。
所有 Webhook 事件都共享统一的结构。
事件示例 Payload
{ "id": "398ea849-7c4b-45c4-bd2f-093df217266c", "ts": "2026-03-11T05:22:38.393477700Z", "type": "event_type", "data": {}}| 字段 | 类型 | 说明 |
|---|---|---|
id | string | 事件唯一标识 |
ts | string | 事件时间戳(RFC3339) |
type | string | 事件类型 |
data | object | 事件相关数据 |
通用数据结构
Section titled “通用数据结构”live_info
Section titled “live_info”表示直播的元数据。
示例
{ "uid": "<USER ID>", "uname": "<USER NAME>", "avatar": "https://live.example.com/avatar.jpg", "title": "<TITLE>", "cover": "https://live.example.com/cover.jpg", "categories": [ "<PRIMARY CATEGORY>", "<SECONDARY CATEGORY>" ], "status": "live", "live_id": "<LIVE ID>", "start_time": "2026-03-11T05:22:38Z"}| 字段 | 类型 | 说明 |
|---|---|---|
uid | string | 平台用户 ID |
uname | string | 主播用户名 |
avatar | string | 头像 URL |
title | string | 直播标题 |
cover | string | 直播封面图 |
categories | string[] | 直播分类 |
status | string | live 或 offline |
live_id | string | null |
start_time | string | null |
app_launch
Section titled “app_launch”当 OneLiveRec 启动时触发。
示例
{ "type": "app_launch"}常见用途:
- 启动监控
- 健康检查
- 日志记录
app_exit
Section titled “app_exit”当 OneLiveRec 退出时触发。
示例
{ "type": "app_exit"}常见用途:
- 告警通知
- 自动重启流程
live_start
Section titled “live_start”当被监控的频道 开始直播 时触发。
示例
{ "type": "live_start", "data": { "platform": "<PLATFORM>", "channel": "<CHANNEL>", "url": "https://live.example.com", "live_info": {} }}| 字段 | 说明 |
|---|---|
platform | 平台名称 |
channel | 频道标识 |
url | 直播地址 |
live_info | 当前直播元数据 |
live_end
Section titled “live_end”当直播 结束 时触发。
示例
{ "type": "live_end", "data": { "platform": "<PLATFORM>", "channel": "<CHANNEL>", "url": "https://live.example.com", "live_info": {} }}常见用途:
- 归档录制文件
- 触发后处理流程
video_file_create
Section titled “video_file_create”当录制文件 开始写入 时触发。
示例
{ "type": "video_file_create", "data": { "platform": "<PLATFORM>", "channel": "<CHANNEL>", "path": "path/to/file.ts" }}video_file_finish
Section titled “video_file_finish”当录制文件 写入完成 时触发。
示例
{ "type": "video_file_finish", "data": { "platform": "<PLATFORM>", "channel": "<CHANNEL>", "path": "path/to/file.mp4", "filesize": 1073741824, "duration": 3600 }}| 字段 | 类型 | 说明 |
|---|---|---|
path | string | 输出文件路径 |
filesize | number | 文件大小(字节) |
duration | number | 视频时长(秒) |
video_transmux_finish
Section titled “video_transmux_finish”当 视频转封装(transmux)完成 时触发。
示例:
.ts → .mp4.flv → .mp4.ts → .mkv
示例
{ "type": "video_transmux_finish", "data": { "platform": "<PLATFORM>", "channel": "<CHANNEL>", "input": "path/to/input.file", "output": "path/to/output.file" }}title_change
Section titled “title_change”当直播过程中 标题发生变化 时触发。
示例
{ "type": "title_change", "data": { "platform": "<PLATFORM>", "channel": "<CHANNEL>", "old_live_info": {}, "new_live_info": {} }}| 字段 | 说明 |
|---|---|
old_live_info | 修改前的直播信息 |
new_live_info | 修改后的直播信息 |
category_change
Section titled “category_change”当直播 分类发生变化 时触发。
示例
{ "type": "category_change", "data": { "platform": "<PLATFORM>", "channel": "<CHANNEL>", "old_live_info": {}, "new_live_info": {} }}当 OneLiveRec 发生错误时触发。
示例
{ "type": "error", "data": { "platform": "<PLATFORM>", "channel": "<CHANNEL>", "error": "Something went wrong" }}常见用途:
- 监控系统
- 告警系统
- 自动重试
OneLiveRec 可以通过 Webhook 轻松集成各种自动化脚本。
例如,通过脚本 uploader.py,你可以在录制完成后自动将视频上传到 Pixeldrain 云端。
# /// script# dependencies = [# "fastapi",# "uvicorn",# "httpx",# "pydantic",# "loguru",# "tenacity",# "tqdm",# "anyio",# ]# ///
import osimport httpximport uvicornimport secretsimport itertoolsimport anyiofrom typing import Dict, Anyfrom fastapi import FastAPI, BackgroundTasks, Query, HTTPExceptionfrom pydantic import BaseModelfrom loguru import loggerfrom tenacity import retry, stop_after_attempt, wait_fixed, before_sleep_logfrom tqdm import tqdm
# --- CONFIGURATION ---PIXELDRAIN_API_KEY = "5f45f184-64bb-4eaa-be19-4a5f0459db49"WEBHOOK_SECRET_KEY = "my_super_secret_webhook_key_123"PORT = 5000
# --- PATH MARKERS ---DOCKER_REC_MARKER = "/app/rec"DOCKER_BASE = "/app"
# --- LOGGING CONFIG ---LOG_DIR = "./logs"os.makedirs(LOG_DIR, exist_ok=True)
logger.add( f"{LOG_DIR}/uploader_{{time:YYYY-MM-DD}}.log", rotation="10 MB", retention="30 days", compression="zip", level="DEBUG", enqueue=True,)
# --- RETRY SETTINGS ---MAX_RETRIES = 3RETRY_DELAY = 10
bar_position = itertools.count()
# --- UTILS ---
class UploadProgressGenerator: """An async iterator that yields chunks and updates tqdm for httpx 'content'."""
def __init__(self, file_path, pbar): self.file_path = file_path self.pbar = pbar
async def __aiter__(self): # anyio provides asynchronous file access compatible with httpx streaming async with await anyio.open_file(self.file_path, "rb") as f: while True: chunk = await f.read(1024 * 1024) # 1MB chunks if not chunk: break self.pbar.update(len(chunk)) yield chunk
# --- CORE LOGIC ---
@retry( stop=stop_after_attempt(MAX_RETRIES), wait=wait_fixed(RETRY_DELAY), before_sleep=before_sleep_log(logger, "DEBUG"), reraise=True,)async def execute_upload(file_path: str, filename: str, position: int): file_size = os.path.getsize(file_path)
url = f"https://pixeldrain.com/api/file/{filename}"
pbar = tqdm( total=file_size, unit="B", unit_scale=True, desc=f"🚀 {filename}", position=position, leave=False, )
try: async with httpx.AsyncClient(timeout=60) as client: # Create the async generator for the request body upload_gen = UploadProgressGenerator(file_path, pbar)
# PUT with 'content' as an async iterable is the most stable stream method response = await client.put( url, auth=("", PIXELDRAIN_API_KEY), content=upload_gen )
if response.status_code not in [200, 201]: logger.error(f"Server rejected PUT: {response.text}") raise Exception(f"Pixeldrain Error {response.status_code}")
return response.json().get("id") finally: pbar.close()
async def process_and_upload(incoming_path: str): if incoming_path.startswith(DOCKER_REC_MARKER): rel_path = os.path.relpath(incoming_path, DOCKER_BASE) real_path = os.path.abspath(os.path.join(os.getcwd(), rel_path)) else: real_path = incoming_path
if not os.path.exists(real_path): logger.error(f"File not found: {real_path}") return
filename = os.path.basename(real_path) pos = next(bar_position)
try: logger.info(f"Stream Upload Started: {filename}") file_id = await execute_upload(real_path, filename, pos) logger.success(f"SUCCESS: {filename} -> https://pixeldrain.com/u/{file_id}") except Exception as e: logger.error(f"Failed to upload {filename} after {MAX_RETRIES} attempts.") logger.exception(e)
# --- WEB SERVER ---
app = FastAPI()
class OneLiveRecWebhook(BaseModel): id: str ts: str type: str data: Dict[str, Any]
@app.post("/webhook")async def handle_webhook( payload: OneLiveRecWebhook, background_tasks: BackgroundTasks, key: str = Query(...)): if not secrets.compare_digest(key, WEBHOOK_SECRET_KEY): raise HTTPException(status_code=403, detail="Forbidden")
logger.debug( f"Received {payload.type} from {payload.data.get('platform')}-{payload.data.get('channel')}" )
if payload.type == "video_transmux_finish": file_path = payload.data.get("output") if file_path: logger.info(f"Webhook Received: {os.path.basename(file_path)}") background_tasks.add_task(process_and_upload, file_path) return {"status": "success"}
return {"status": "ignored"}
if __name__ == "__main__": logger.info(f"Uploader running at {os.getcwd()}") uvicorn.run(app, host="0.0.0.0", port=PORT)运行示例脚本
Section titled “运行示例脚本”1. 安装 uv
Section titled “1. 安装 uv”uv 是一款全能型 Python 管理工具。它负责处理所有依赖,让你无需担心 Python 版本或 Pip 环境问题。
- Windows (PowerShell):
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"- macOS / Linux:
curl -LsSf https://astral.sh/uv/install.sh | sh- 完成后重启你的终端。
2. 配置脚本
Section titled “2. 配置脚本”- 将上述代码保存为
uploader.py,并放入你的 OneLiveRec 根目录(即包含compose.yaml和rec/文件夹的目录)。 - 打开脚本并编辑以下两行:
PIXELDRAIN_API_KEY: 填入你的 Pixeldrain API 密钥。WEBHOOK_SECRET_KEY: 设置一个你自定义的随机密码。
3. 启动服务
Section titled “3. 启动服务”在终端进入该文件夹并输入:
uv run uploader.py- 执行逻辑:
uv会自动下载所需的库,并在 5000 端口启动 Webhook 服务器。
4. 关联 OneLiveRec
Section titled “4. 关联 OneLiveRec”- 打开 OneLiveRec
- 前往 设置 -> 自动化 -> Webhooks。
- 新建一个 Webhook,将 URL 设置为:
http://你的服务器IP:5000/webhook?key=你的密钥
flowchart TD
A[OneLiveRec] -->|Webhook POST| B[Webhook 服务器]
B --> C{事件类型}
C -->|live_start| D[通知系统]
C -->|video_file_finish| E[后处理流程]
C -->|video_transmux_finish| F[自动上传]
F --> G[Pixeldrain API]
C -->|error| H[告警系统]
Webhook 端点信息
Section titled “Webhook 端点信息”示例服务器提供:
POST /webhook示例 URL:
http://localhost:5000/webhook?key=SECRET上传触发逻辑
Section titled “上传触发逻辑”自动化脚本监听事件:
video_transmux_finish触发后:
- 提取输出文件路径
- 解析 Docker 路径
- 上传文件
- 若失败则自动重试
示例代码:
if payload.type == "video_transmux_finish": file_path = payload.data.get("output") background_tasks.add_task(process_and_upload, file_path)上传进度通过 tqdm 显示。
示例输出:
🚀 stream_record_01.mp4 1.2GB / 1.2GBWebhook 端点应始终进行保护。
推荐方法:
- secret 查询参数
- 反向代理认证
- HMAC 签名
- IP 白名单