跳转到内容

Webhook

OneLiveRec 提供 Webhook,允许外部服务在直播监控与录制事件发生时进行响应。

Webhook 可用于自动化,例如:

  • 自动上传录制文件
  • 当直播开始时发送通知
  • 触发后处理流程
  • 监控系统错误

每个事件都会以 HTTP POST 请求(JSON Payload) 的形式发送。


所有 Webhook 事件都共享统一的结构。

事件示例 Payload
{
"id": "398ea849-7c4b-45c4-bd2f-093df217266c",
"ts": "2026-03-11T05:22:38.393477700Z",
"type": "event_type",
"data": {}
}
字段类型说明
idstring事件唯一标识
tsstring事件时间戳(RFC3339)
typestring事件类型
dataobject事件相关数据

表示直播的元数据。

示例
{
"uid": "<USER ID>",
"uname": "<USER NAME>",
"avatar": "https://live.example.com/avatar.jpg",
"title": "<TITLE>",
"cover": "https://live.example.com/cover.jpg",
"categories": [
"<PRIMARY CATEGORY>",
"<SECONDARY CATEGORY>"
],
"status": "live",
"live_id": "<LIVE ID>",
"start_time": "2026-03-11T05:22:38Z"
}
字段类型说明
uidstring平台用户 ID
unamestring主播用户名
avatarstring头像 URL
titlestring直播标题
coverstring直播封面图
categoriesstring[]直播分类
statusstringliveoffline
live_idstringnull
start_timestringnull

当 OneLiveRec 启动时触发。

示例
{
"type": "app_launch"
}

常见用途:

  • 启动监控
  • 健康检查
  • 日志记录

当 OneLiveRec 退出时触发。

示例
{
"type": "app_exit"
}

常见用途:

  • 告警通知
  • 自动重启流程

当被监控的频道 开始直播 时触发。

示例
{
"type": "live_start",
"data": {
"platform": "<PLATFORM>",
"channel": "<CHANNEL>",
"url": "https://live.example.com",
"live_info": {}
}
}
字段说明
platform平台名称
channel频道标识
url直播地址
live_info当前直播元数据

当直播 结束 时触发。

示例
{
"type": "live_end",
"data": {
"platform": "<PLATFORM>",
"channel": "<CHANNEL>",
"url": "https://live.example.com",
"live_info": {}
}
}

常见用途:

  • 归档录制文件
  • 触发后处理流程

当录制文件 开始写入 时触发。

示例
{
"type": "video_file_create",
"data": {
"platform": "<PLATFORM>",
"channel": "<CHANNEL>",
"path": "path/to/file.ts"
}
}

当录制文件 写入完成 时触发。

示例
{
"type": "video_file_finish",
"data": {
"platform": "<PLATFORM>",
"channel": "<CHANNEL>",
"path": "path/to/file.mp4",
"filesize": 1073741824,
"duration": 3600
}
}
字段类型说明
pathstring输出文件路径
filesizenumber文件大小(字节)
durationnumber视频时长(秒)

视频转封装(transmux)完成 时触发。

示例:

  • .ts → .mp4
  • .flv → .mp4
  • .ts → .mkv
示例
{
"type": "video_transmux_finish",
"data": {
"platform": "<PLATFORM>",
"channel": "<CHANNEL>",
"input": "path/to/input.file",
"output": "path/to/output.file"
}
}

当直播过程中 标题发生变化 时触发。

示例
{
"type": "title_change",
"data": {
"platform": "<PLATFORM>",
"channel": "<CHANNEL>",
"old_live_info": {},
"new_live_info": {}
}
}
字段说明
old_live_info修改前的直播信息
new_live_info修改后的直播信息

当直播 分类发生变化 时触发。

示例
{
"type": "category_change",
"data": {
"platform": "<PLATFORM>",
"channel": "<CHANNEL>",
"old_live_info": {},
"new_live_info": {}
}
}

当 OneLiveRec 发生错误时触发。

示例
{
"type": "error",
"data": {
"platform": "<PLATFORM>",
"channel": "<CHANNEL>",
"error": "Something went wrong"
}
}

常见用途:

  • 监控系统
  • 告警系统
  • 自动重试

OneLiveRec 可以通过 Webhook 轻松集成各种自动化脚本。

例如,通过脚本 uploader.py,你可以在录制完成后自动将视频上传到 Pixeldrain 云端。

# /// script
# dependencies = [
# "fastapi",
# "uvicorn",
# "httpx",
# "pydantic",
# "loguru",
# "tenacity",
# "tqdm",
# "anyio",
# ]
# ///
import os
import httpx
import uvicorn
import secrets
import itertools
import anyio
from typing import Dict, Any
from fastapi import FastAPI, BackgroundTasks, Query, HTTPException
from pydantic import BaseModel
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed, before_sleep_log
from tqdm import tqdm
# --- CONFIGURATION ---
PIXELDRAIN_API_KEY = "5f45f184-64bb-4eaa-be19-4a5f0459db49"
WEBHOOK_SECRET_KEY = "my_super_secret_webhook_key_123"
PORT = 5000
# --- PATH MARKERS ---
DOCKER_REC_MARKER = "/app/rec"
DOCKER_BASE = "/app"
# --- LOGGING CONFIG ---
LOG_DIR = "./logs"
os.makedirs(LOG_DIR, exist_ok=True)
logger.add(
f"{LOG_DIR}/uploader_{{time:YYYY-MM-DD}}.log",
rotation="10 MB",
retention="30 days",
compression="zip",
level="DEBUG",
enqueue=True,
)
# --- RETRY SETTINGS ---
MAX_RETRIES = 3
RETRY_DELAY = 10
bar_position = itertools.count()
# --- UTILS ---
class UploadProgressGenerator:
"""An async iterator that yields chunks and updates tqdm for httpx 'content'."""
def __init__(self, file_path, pbar):
self.file_path = file_path
self.pbar = pbar
async def __aiter__(self):
# anyio provides asynchronous file access compatible with httpx streaming
async with await anyio.open_file(self.file_path, "rb") as f:
while True:
chunk = await f.read(1024 * 1024) # 1MB chunks
if not chunk:
break
self.pbar.update(len(chunk))
yield chunk
# --- CORE LOGIC ---
@retry(
stop=stop_after_attempt(MAX_RETRIES),
wait=wait_fixed(RETRY_DELAY),
before_sleep=before_sleep_log(logger, "DEBUG"),
reraise=True,
)
async def execute_upload(file_path: str, filename: str, position: int):
file_size = os.path.getsize(file_path)
url = f"https://pixeldrain.com/api/file/{filename}"
pbar = tqdm(
total=file_size,
unit="B",
unit_scale=True,
desc=f"🚀 {filename}",
position=position,
leave=False,
)
try:
async with httpx.AsyncClient(timeout=60) as client:
# Create the async generator for the request body
upload_gen = UploadProgressGenerator(file_path, pbar)
# PUT with 'content' as an async iterable is the most stable stream method
response = await client.put(
url, auth=("", PIXELDRAIN_API_KEY), content=upload_gen
)
if response.status_code not in [200, 201]:
logger.error(f"Server rejected PUT: {response.text}")
raise Exception(f"Pixeldrain Error {response.status_code}")
return response.json().get("id")
finally:
pbar.close()
async def process_and_upload(incoming_path: str):
if incoming_path.startswith(DOCKER_REC_MARKER):
rel_path = os.path.relpath(incoming_path, DOCKER_BASE)
real_path = os.path.abspath(os.path.join(os.getcwd(), rel_path))
else:
real_path = incoming_path
if not os.path.exists(real_path):
logger.error(f"File not found: {real_path}")
return
filename = os.path.basename(real_path)
pos = next(bar_position)
try:
logger.info(f"Stream Upload Started: {filename}")
file_id = await execute_upload(real_path, filename, pos)
logger.success(f"SUCCESS: {filename} -> https://pixeldrain.com/u/{file_id}")
except Exception as e:
logger.error(f"Failed to upload {filename} after {MAX_RETRIES} attempts.")
logger.exception(e)
# --- WEB SERVER ---
app = FastAPI()
class OneLiveRecWebhook(BaseModel):
id: str
ts: str
type: str
data: Dict[str, Any]
@app.post("/webhook")
async def handle_webhook(
payload: OneLiveRecWebhook, background_tasks: BackgroundTasks, key: str = Query(...)
):
if not secrets.compare_digest(key, WEBHOOK_SECRET_KEY):
raise HTTPException(status_code=403, detail="Forbidden")
logger.debug(
f"Received {payload.type} from {payload.data.get('platform')}-{payload.data.get('channel')}"
)
if payload.type == "video_transmux_finish":
file_path = payload.data.get("output")
if file_path:
logger.info(f"Webhook Received: {os.path.basename(file_path)}")
background_tasks.add_task(process_and_upload, file_path)
return {"status": "success"}
return {"status": "ignored"}
if __name__ == "__main__":
logger.info(f"Uploader running at {os.getcwd()}")
uvicorn.run(app, host="0.0.0.0", port=PORT)

uv 是一款全能型 Python 管理工具。它负责处理所有依赖,让你无需担心 Python 版本或 Pip 环境问题。

  • Windows (PowerShell):
Terminal window
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
  • macOS / Linux:
Terminal window
curl -LsSf https://astral.sh/uv/install.sh | sh
  • 完成后重启你的终端。
  1. 将上述代码保存为 uploader.py,并放入你的 OneLiveRec 根目录(即包含 compose.yamlrec/ 文件夹的目录)。
  2. 打开脚本并编辑以下两行:
  • PIXELDRAIN_API_KEY: 填入你的 Pixeldrain API 密钥。
  • WEBHOOK_SECRET_KEY: 设置一个你自定义的随机密码。

在终端进入该文件夹并输入:

Terminal window
uv run uploader.py
  • 执行逻辑: uv 会自动下载所需的库,并在 5000 端口启动 Webhook 服务器。
  1. 打开 OneLiveRec
  2. 前往 设置 -> 自动化 -> Webhooks
  3. 新建一个 Webhook,将 URL 设置为:http://你的服务器IP:5000/webhook?key=你的密钥

flowchart TD

A[OneLiveRec] -->|Webhook POST| B[Webhook 服务器]

B --> C{事件类型}

C -->|live_start| D[通知系统]

C -->|video_file_finish| E[后处理流程]

C -->|video_transmux_finish| F[自动上传]

F --> G[Pixeldrain API]

C -->|error| H[告警系统]

示例服务器提供:

POST /webhook

示例 URL:

http://localhost:5000/webhook?key=SECRET

自动化脚本监听事件:

video_transmux_finish

触发后:

  1. 提取输出文件路径
  2. 解析 Docker 路径
  3. 上传文件
  4. 若失败则自动重试

示例代码:

if payload.type == "video_transmux_finish":
file_path = payload.data.get("output")
background_tasks.add_task(process_and_upload, file_path)

上传进度通过 tqdm 显示。

示例输出:

🚀 stream_record_01.mp4 1.2GB / 1.2GB

Webhook 端点应始终进行保护。

推荐方法:

  • secret 查询参数
  • 反向代理认证
  • HMAC 签名
  • IP 白名单