mirror of
https://github.com/datascale-ai/opentalking.git
synced 2026-07-03 15:22:34 +08:00
1079 lines
41 KiB
Python
1079 lines
41 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
import uuid
|
|
import wave
|
|
from pathlib import Path
|
|
from collections.abc import Mapping
|
|
from typing import Any
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
from opentalking.avatar.fasterliveportrait_config import normalize_fasterliveportrait_runtime_config
|
|
from opentalking.avatar.loader import load_avatar_bundle
|
|
from opentalking.core.model_config import get_model_config
|
|
from opentalking.core.types.frames import VideoFrameData
|
|
from opentalking.export_store import create_video_export
|
|
from opentalking.models.registry import get_adapter
|
|
from opentalking.providers.stt.dashscope.adapter import decode_audio_file_to_pcm_i16
|
|
from opentalking.providers.synthesis.audio2video_client import LocalAudio2VideoClient, OmniRTAudio2VideoClient
|
|
from opentalking.providers.synthesis.backends import resolve_model_backend
|
|
from opentalking.providers.synthesis.flashtalk.ws_client import FlashTalkWSClient
|
|
from opentalking.providers.synthesis.omnirt import auth_headers, resolve_synthesis_ws_url
|
|
from opentalking.providers.tts.factory import build_tts_adapter
|
|
from opentalking.scene_assets import SceneAssetStore
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Driver audio file located under this module's directory; used by
# _reference_driver_audio_path when the settings name no override.
REFERENCE_DRIVER_AUDIO_PATH = Path(__file__).resolve().parent / "assets" / "reference_drivers" / "flashtalk_default_driver.wav"

# Lower-case model names accepted by _normalize_model.
SUPPORTED_VIDEO_CREATION_MODELS = {
    "flashtalk",
    "flashhead",
    "fasterliveportrait",
    "musetalk",
    "quicktalk",
    "wav2lip",
}
|
|
|
|
|
|
def _settings_path(settings: object, name: str, default: str) -> Path:
    """Read a path setting and return it user-expanded and resolved.

    ``default`` is used when the attribute is missing or falsy.
    """
    configured = getattr(settings, name, default)
    if not configured:
        configured = default
    return Path(str(configured)).expanduser().resolve()
|
|
|
|
|
|
def _settings_int(settings: object, name: str, default: int) -> int:
    """Read an integer setting, returning ``default`` when it cannot be converted.

    Args:
        settings: Object carrying configuration attributes.
        name: Attribute to read.
        default: Value used when the attribute is missing or not convertible.
    """
    try:
        return int(getattr(settings, name, default))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() of an infinite float; treat it like any other bad value.
        return default
|
|
|
|
|
|
def _settings_float(settings: object, name: str, default: float) -> float:
    """Read a float setting, returning ``default`` when it cannot be converted."""
    try:
        parsed = float(getattr(settings, name, default))
    except (TypeError, ValueError):
        parsed = default
    return parsed
|
|
|
|
|
|
def _export_with_download_url(item: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``item`` with a ``download_url`` built from its ``id``."""
    enriched = dict(item)
    enriched["download_url"] = f"/exports/videos/{item['id']}/download"
    return enriched
|
|
|
|
|
|
def _safe_title(title: str | None, *, model: str, avatar_id: str) -> str:
    """Return the stripped title, or a generated one naming the model and avatar."""
    cleaned = title.strip() if title else ""
    if cleaned:
        return cleaned
    return f"视频创作 · {model} · {avatar_id}"
|
|
|
|
|
|
def _reference_duration_options(settings: object) -> set[int]:
    """Parse the comma-separated ``video_creation_reference_durations`` setting.

    Non-integer and non-positive entries are ignored; when nothing usable
    remains the result is ``{10, 30, 60}``.
    """
    configured = getattr(settings, "video_creation_reference_durations", "")
    text = str(configured or "10,30,60")
    durations: set[int] = set()
    for token in text.split(","):
        try:
            seconds = int(token.strip())
        except ValueError:
            continue
        if seconds > 0:
            durations.add(seconds)
    if not durations:
        return {10, 30, 60}
    return durations
|
|
|
|
|
|
def _validate_reference_duration(settings: object, duration_sec: int | None) -> int:
    """Return the requested duration if it is one of the configured options.

    ``None`` selects the smallest configured option.

    Raises:
        ValueError: the duration is not among the configured options.
    """
    allowed = _reference_duration_options(settings)
    chosen = int(duration_sec) if duration_sec is not None else min(allowed)
    if chosen in allowed:
        return chosen
    listing = ", ".join(map(str, sorted(allowed)))
    raise ValueError(f"duration_sec must be one of: {listing}")
|
|
|
|
|
|
def _coerce_composition_float(
|
|
payload: Mapping[str, object],
|
|
key: str,
|
|
default: float,
|
|
*,
|
|
min_value: float,
|
|
max_value: float,
|
|
) -> float:
|
|
raw = payload.get(key)
|
|
if raw in (None, ""):
|
|
return default
|
|
if not isinstance(raw, str | int | float):
|
|
raise ValueError(f"{key} must be a number")
|
|
try:
|
|
value = float(raw)
|
|
except (TypeError, ValueError) as exc:
|
|
raise ValueError(f"{key} must be a number") from exc
|
|
if value < min_value or value > max_value:
|
|
raise ValueError(f"{key} must be between {min_value:g} and {max_value:g}")
|
|
return value
|
|
|
|
|
|
def _coerce_composition_int(
|
|
payload: Mapping[str, object],
|
|
key: str,
|
|
default: int,
|
|
*,
|
|
min_value: int,
|
|
max_value: int,
|
|
) -> int:
|
|
raw = payload.get(key)
|
|
if raw in (None, ""):
|
|
value = default
|
|
elif isinstance(raw, str | int | float):
|
|
try:
|
|
value = int(raw)
|
|
except (TypeError, ValueError) as exc:
|
|
raise ValueError(f"{key} must be an integer") from exc
|
|
else:
|
|
raise ValueError(f"{key} must be an integer")
|
|
if value < min_value or value > max_value:
|
|
raise ValueError(f"{key} must be between {min_value:g} and {max_value:g}")
|
|
return value + (value % 2)
|
|
|
|
|
|
def _normalize_video_composition_config(
    settings: object,
    avatar_path: Path,
    config: Mapping[str, object] | None,
) -> dict[str, object] | None:
    """Validate a raw composition request and resolve it to concrete values.

    Returns ``None`` when ``config`` is empty or carries no ``background_id``.
    Otherwise returns a dict with the background file path, the avatar
    reference image path (stored as ``avatar_mask_path``), the fit/anchor
    modes and the range-checked scale, offsets and output size.

    Raises:
        ValueError: unknown ``background_id``, a background of kind "video",
            an unsupported ``avatar_fit``/``avatar_anchor``, or a numeric
            field that fails validation.
        FileNotFoundError: the background file or the avatar reference image
            cannot be found.
    """
    if not config:
        return None
    background_id = str(config.get("background_id") or "").strip()
    if not background_id:
        return None
    store = SceneAssetStore(_settings_path(settings, "scene_assets_dir", "./data/scene-assets"))
    # Look the background up by id among the store's listed backgrounds.
    background = next((item for item in store.list_backgrounds() if item.get("id") == background_id), None)
    if background is None:
        raise ValueError("background_id not found")
    if str(background.get("kind") or "") == "video":
        raise ValueError("video backgrounds are not supported for video creation")
    background_path = store.background_file_path(background_id)
    if background_path is None:
        raise FileNotFoundError("background file not found")
    avatar_fit = str(config.get("avatar_fit") or "contain").strip()
    avatar_anchor = str(config.get("avatar_anchor") or "center").strip()
    if avatar_fit not in {"contain", "cover"}:
        raise ValueError("invalid avatar_fit")
    if avatar_anchor not in {"center", "bottom", "left", "right"}:
        raise ValueError("invalid avatar_anchor")
    return {
        "background_path": background_path,
        "avatar_mask_path": _reference_image_path(avatar_path),
        "avatar_fit": avatar_fit,
        "avatar_anchor": avatar_anchor,
        "avatar_scale": _coerce_composition_float(config, "avatar_scale", 1.0, min_value=0.1, max_value=4.0),
        "avatar_offset_x": _coerce_composition_float(config, "avatar_offset_x", 0.0, min_value=-2000.0, max_value=2000.0),
        "avatar_offset_y": _coerce_composition_float(config, "avatar_offset_y", 0.0, min_value=-2000.0, max_value=2000.0),
        "output_width": _coerce_composition_int(config, "output_width", 1280, min_value=320, max_value=3840),
        "output_height": _coerce_composition_int(config, "output_height", 720, min_value=180, max_value=2160),
    }
|
|
|
|
|
|
def _resize_cover(image: np.ndarray, width: int, height: int) -> np.ndarray:
    """Scale ``image`` so it covers ``width`` x ``height`` and centre-crop the overflow."""
    src_h, src_w = image.shape[:2]
    ratio_w = float(width) / float(src_w)
    ratio_h = float(height) / float(src_h)
    # The larger ratio guarantees both target dimensions are covered.
    scale = max(ratio_w, ratio_h)
    scaled_w = max(1, int(round(src_w * scale)))
    scaled_h = max(1, int(round(src_h * scale)))
    method = cv2.INTER_AREA if scale < 1 else cv2.INTER_LINEAR
    scaled = cv2.resize(image, (scaled_w, scaled_h), interpolation=method)
    x0 = max(0, (scaled_w - width) // 2)
    y0 = max(0, (scaled_h - height) // 2)
    return np.ascontiguousarray(scaled[y0:y0 + height, x0:x0 + width])
|
|
|
|
|
|
def _avatar_anchor_origin(anchor: str, canvas_w: int, canvas_h: int, layer_w: int, layer_h: int) -> tuple[int, int]:
    """Return the top-left ``(x, y)`` at which a layer sits for the given anchor.

    "bottom", "left" and "right" pin the layer to that edge and centre it on
    the other axis; any other anchor value centres it on both axes.
    """
    centered_x = (canvas_w - layer_w) // 2
    centered_y = (canvas_h - layer_h) // 2
    if anchor == "bottom":
        return centered_x, canvas_h - layer_h
    if anchor == "left":
        return 0, centered_y
    if anchor == "right":
        return canvas_w - layer_w, centered_y
    return centered_x, centered_y
|
|
|
|
|
|
def _load_avatar_alpha_mask(path: object) -> np.ndarray | None:
    """Load the alpha channel of an image as a float32 mask.

    Args:
        path: Image location; ``None`` or a blank value means "no mask".

    Returns:
        The fourth channel scaled by the image dtype's maximum (so 8-bit and
        16-bit images both map onto 0..1), or ``None`` when no path is given,
        the file cannot be read, or the image has fewer than four channels.
    """
    # Without this guard a missing config entry would be read as the file "None".
    if path is None or not str(path).strip():
        return None
    image = cv2.imread(str(path), cv2.IMREAD_UNCHANGED)
    if image is None or image.ndim != 3 or image.shape[2] < 4:
        return None
    alpha = image[:, :, 3]
    # IMREAD_UNCHANGED keeps the stored bit depth, so the divisor follows the dtype.
    if np.issubdtype(alpha.dtype, np.integer):
        full_scale = float(np.iinfo(alpha.dtype).max)
    else:
        full_scale = 255.0
    return alpha.astype(np.float32) / full_scale
|
|
|
|
|
|
def _composite_avatar_layer(
    background: np.ndarray,
    frame: np.ndarray,
    *,
    avatar_fit: str,
    avatar_anchor: str,
    avatar_scale: float,
    avatar_offset_x: float,
    avatar_offset_y: float,
    fallback_alpha: np.ndarray | None = None,
) -> np.ndarray:
    """Alpha-blend one avatar frame onto a background canvas.

    Args:
        background: Canvas image; it is copied before blending, never mutated.
        frame: Avatar frame with three or four channels; a fourth channel is
            used as alpha.
        avatar_fit: "cover" scales the frame to fill the canvas; any other
            value scales it to fit inside.
        avatar_anchor: Anchor name passed to ``_avatar_anchor_origin``.
        avatar_scale: Extra multiplier applied on top of the fit scale.
        avatar_offset_x: Horizontal shift in pixels added to the anchor origin.
        avatar_offset_y: Vertical shift in pixels added to the anchor origin.
        fallback_alpha: Mask used when ``frame`` has no alpha channel; resized
            to the frame if the shapes differ.

    Returns:
        A new composited image, or ``background`` itself (same object) when
        the frame is not a 3+ channel image or falls entirely off-canvas.
    """
    canvas_h, canvas_w = background.shape[:2]
    layer = np.asarray(frame, dtype=np.uint8)
    if layer.ndim != 3 or layer.shape[2] < 3:
        return background
    bgr = layer[:, :, :3]
    # Alpha source priority: the frame's own channel, then the fallback mask, then opaque.
    if layer.shape[2] >= 4:
        alpha = layer[:, :, 3].astype(np.float32) / 255.0
    elif fallback_alpha is not None:
        alpha = fallback_alpha
        if alpha.shape[:2] != bgr.shape[:2]:
            alpha = cv2.resize(alpha, (bgr.shape[1], bgr.shape[0]), interpolation=cv2.INTER_AREA).astype(np.float32)
    else:
        alpha = np.ones(layer.shape[:2], dtype=np.float32)
    # "contain" uses the smaller axis ratio, "cover" the larger one.
    fit_scale = min(float(canvas_w) / float(bgr.shape[1]), float(canvas_h) / float(bgr.shape[0]))
    if avatar_fit == "cover":
        fit_scale = max(float(canvas_w) / float(bgr.shape[1]), float(canvas_h) / float(bgr.shape[0]))
    scale = max(0.01, fit_scale * float(avatar_scale))
    layer_w = max(1, int(round(bgr.shape[1] * scale)))
    layer_h = max(1, int(round(bgr.shape[0] * scale)))
    bgr_resized = cv2.resize(bgr, (layer_w, layer_h), interpolation=cv2.INTER_AREA if scale < 1 else cv2.INTER_LINEAR)
    alpha_resized = cv2.resize(alpha, (layer_w, layer_h), interpolation=cv2.INTER_AREA if scale < 1 else cv2.INTER_LINEAR)
    origin_x, origin_y = _avatar_anchor_origin(avatar_anchor, canvas_w, canvas_h, layer_w, layer_h)
    left = int(round(origin_x + avatar_offset_x))
    top = int(round(origin_y + avatar_offset_y))
    # Clip the layer rectangle to the canvas (destination coordinates).
    dst_left = max(0, left)
    dst_top = max(0, top)
    dst_right = min(canvas_w, left + layer_w)
    dst_bottom = min(canvas_h, top + layer_h)
    if dst_left >= dst_right or dst_top >= dst_bottom:
        return background
    # Matching window inside the resized layer (source coordinates).
    src_left = dst_left - left
    src_top = dst_top - top
    src_right = src_left + (dst_right - dst_left)
    src_bottom = src_top + (dst_bottom - dst_top)
    out = background.copy()
    fg = bgr_resized[src_top:src_bottom, src_left:src_right].astype(np.float32)
    # Trailing axis lets the single-channel mask broadcast over the colour channels.
    mask = alpha_resized[src_top:src_bottom, src_left:src_right].astype(np.float32)[:, :, None]
    bg = out[dst_top:dst_bottom, dst_left:dst_right].astype(np.float32)
    out[dst_top:dst_bottom, dst_left:dst_right] = np.clip((fg * mask) + (bg * (1.0 - mask)), 0, 255).astype(np.uint8)
    return out
|
|
|
|
|
|
def _apply_video_composition(
    frames: list[np.ndarray],
    *,
    config: Mapping[str, object] | None,
) -> list[np.ndarray]:
    """Composite every frame over the configured background image.

    Returns ``frames`` unchanged (same list) when there are no frames or no
    config. The output size defaults to the first frame's size.

    Raises:
        FileNotFoundError: the background image cannot be read.
        ValueError: a numeric config entry fails validation.
    """
    if not frames or not config:
        return frames
    frame_height, frame_width = np.asarray(frames[0]).shape[:2]
    width = _coerce_composition_int(config, "output_width", int(frame_width), min_value=320, max_value=3840)
    height = _coerce_composition_int(config, "output_height", int(frame_height), min_value=180, max_value=2160)
    background_raw = cv2.imread(str(config["background_path"]), cv2.IMREAD_COLOR)
    if background_raw is None:
        raise FileNotFoundError("background file not found")
    background = _resize_cover(background_raw, int(width), int(height))
    # Everything below is the same for all frames, so it is resolved once.
    layer_options = {
        "fallback_alpha": _load_avatar_alpha_mask(config.get("avatar_mask_path")),
        "avatar_scale": _coerce_composition_float(config, "avatar_scale", 1.0, min_value=0.1, max_value=4.0),
        "avatar_offset_x": _coerce_composition_float(config, "avatar_offset_x", 0.0, min_value=-2000.0, max_value=2000.0),
        "avatar_offset_y": _coerce_composition_float(config, "avatar_offset_y", 0.0, min_value=-2000.0, max_value=2000.0),
        "avatar_fit": str(config.get("avatar_fit") or "contain"),
        "avatar_anchor": str(config.get("avatar_anchor") or "center"),
    }
    composed: list[np.ndarray] = []
    for frame in frames:
        composed.append(_composite_avatar_layer(background, frame, **layer_options))
    return composed
|
|
|
|
|
|
def _build_reference_driver_pcm(total_samples: int, *, level: float = 480.0) -> np.ndarray:
    """Synthesize an int16 driver tone of ``total_samples`` samples.

    The signal is a 120 Hz sine plus a weaker 240 Hz sine, shaped by a slow
    1.8 Hz squared-sine envelope, with time computed at 16000 samples per
    second. ``level`` is clamped to ``[1, 32767]``.
    """
    count = max(0, int(total_samples))
    if count == 0:
        return np.zeros(0, dtype=np.int16)
    peak = max(1.0, min(float(level), 32767.0))
    two_pi = 2.0 * np.pi
    t = np.arange(count, dtype=np.float32) / 16000.0
    carrier = np.sin(two_pi * 120.0 * t) + 0.35 * np.sin(two_pi * 240.0 * t)
    envelope = 0.55 + 0.45 * np.sin(two_pi * 1.8 * t) ** 2
    # 1.35 is the largest value the two-sine carrier can reach (1 + 0.35).
    pcm = carrier * envelope * (peak / 1.35)
    return np.clip(np.rint(pcm), -32768, 32767).astype(np.int16)
|
|
|
|
|
|
def _reference_driver_audio_path(settings: object) -> Path:
    """Return the configured reference driver audio path, or the module default."""
    configured = str(getattr(settings, "video_creation_reference_driver_audio", "") or "").strip()
    if configured:
        return Path(configured).expanduser().resolve()
    return REFERENCE_DRIVER_AUDIO_PATH
|
|
|
|
|
|
def _fit_reference_driver_pcm(pcm: np.ndarray, total_samples: int) -> np.ndarray:
    """Loop or truncate ``pcm`` so it is exactly ``total_samples`` long.

    Raises:
        ValueError: ``pcm`` is empty while a non-zero length is requested.
    """
    wanted = max(0, int(total_samples))
    if wanted == 0:
        return np.zeros(0, dtype=np.int16)
    flat = np.asarray(pcm, dtype=np.int16).reshape(-1)
    if flat.size == 0:
        raise ValueError("reference driver audio decoded to empty PCM")
    # Repeat often enough to cover the target, then cut to the exact length.
    loops = int(np.ceil(float(wanted) / float(flat.size)))
    looped = np.tile(flat, loops)
    return looped[:wanted].astype(np.int16, copy=False)
|
|
|
|
|
|
def _read_pcm16_mono_wav(path: Path) -> np.ndarray | None:
    """Read a WAV file directly when it is already 16 kHz, mono, 16-bit PCM.

    Returns:
        The samples as an owned little-endian int16 array, or ``None`` when
        the file has a different format or cannot be read as a WAV file.
    """
    try:
        with wave.open(str(path), "rb") as wf:
            if wf.getnchannels() != 1 or wf.getsampwidth() != 2 or wf.getframerate() != 16000:
                return None
            raw = wf.readframes(wf.getnframes())
    except (wave.Error, OSError, EOFError):
        # EOFError is what the wave module raises for empty or truncated
        # files; it is neither a wave.Error nor an OSError.
        return None
    # frombuffer yields a read-only view over the bytes, hence the copy.
    return np.frombuffer(raw, dtype="<i2").copy()
|
|
|
|
|
|
async def _load_reference_driver_pcm(settings: object, total_samples: int) -> np.ndarray | None:
    """Load the reference driver audio, fitted to ``total_samples`` samples.

    The file is read directly when it is already 16 kHz mono 16-bit WAV and
    is decoded with ``decode_audio_file_to_pcm_i16`` otherwise.

    Returns:
        The fitted int16 PCM, or ``None`` when the file is missing or
        unusable, so that the caller can fall back to synthetic audio.
    """
    path = _reference_driver_audio_path(settings)
    if not path.is_file():
        return None
    try:
        # The direct read sits inside the try as well: an empty or broken WAV
        # must lead to the fallback, exactly like a failed decode.
        pcm = _read_pcm16_mono_wav(path)
        if pcm is None:
            pcm = await decode_audio_file_to_pcm_i16(path)
        return _fit_reference_driver_pcm(pcm, total_samples)
    except Exception as exc:  # noqa: BLE001
        log.warning("reference driver audio unavailable, falling back to synthetic PCM: path=%s error=%s", path, exc)
        return None
|
|
|
|
|
|
def _avatar_dir(settings: object, avatar_id: str) -> Path:
    """Resolve ``avatar_id`` to its directory under the configured avatars root.

    Raises:
        ValueError: the id is blank or resolves outside the avatars root.
        FileNotFoundError: the resolved directory does not exist.
    """
    cleaned = avatar_id.strip()
    if not cleaned:
        raise ValueError("avatar_id is required")
    root = _settings_path(settings, "avatars_dir", "./examples/avatars")
    candidate = (root / cleaned).resolve()
    # relative_to() fails for anything that escaped the root (e.g. via "..").
    try:
        candidate.relative_to(root)
    except ValueError as exc:
        raise ValueError("invalid avatar_id") from exc
    if candidate.is_dir():
        # Called for its effect only; the returned bundle is discarded.
        load_avatar_bundle(candidate, strict=False)
        return candidate
    raise FileNotFoundError("avatar not found")
|
|
|
|
|
|
def _normalize_model(model: str) -> str:
    """Return the trimmed, lower-cased model name.

    Raises:
        ValueError: the name is not in ``SUPPORTED_VIDEO_CREATION_MODELS``.
    """
    normalized = (model or "").strip().lower()
    if normalized in SUPPORTED_VIDEO_CREATION_MODELS:
        return normalized
    raise ValueError(
        "video creation only supports flashtalk, flashhead, fasterliveportrait, musetalk, quicktalk, and wav2lip"
    )
|
|
|
|
|
|
def _reference_image_path(avatar_path: Path) -> Path:
    """Return the first existing reference image inside ``avatar_path``.

    Candidates are tried in a fixed order of file names.

    Raises:
        FileNotFoundError: none of the candidate files exists.
    """
    names = ("reference.png", "reference.jpg", "reference.jpeg", "reference.webp", "preview.png")
    found = next((avatar_path / name for name in names if (avatar_path / name).is_file()), None)
    if found is None:
        raise FileNotFoundError("avatar reference image not found")
    return found.resolve()
|
|
|
|
|
|
# Keys copied from the base "fasterliveportrait" model config into the video
# config built by _fasterliveportrait_video_config.
_FASTLIVEPORTRAIT_VIDEO_CONFIG_KEYS = (
    "width",
    "height",
    "fps",
    "chunk_samples",
    "emit_frames_per_chunk",
    "render_keyframes_per_chunk",
    "disable_frame_interpolation",
    "head_motion_multiplier",
    "pose_motion_multiplier",
    "yaw_multiplier",
    "pitch_multiplier",
    "roll_multiplier",
    "animation_region",
    "expression_multiplier",
    "mouth_open_multiplier",
    "mouth_corner_multiplier",
    "cheek_jaw_multiplier",
    "driving_multiplier",
    "cfg_scale",
    "cfg_cond",
    "flag_stitching",
    "flag_pasteback",
    "flag_normalize_lip",
    "flag_relative_motion",
    "flag_lip_retargeting",
    "lip_retargeting_multiplier",
    "lip_retargeting_min",
    "lip_retargeting_max",
    "lip_retargeting_noise_floor",
    "head_only_pasteback",
    "lookahead_ms",
)
|
|
|
|
# Video-creation defaults that _fasterliveportrait_video_config layers over
# the base model config and under any caller-supplied overrides.
VIDEO_CREATION_FASTLIVEPORTRAIT_DEFAULT_CONFIG: dict[str, object] = {
    "head_motion_multiplier": 0.3,
    "pose_motion_multiplier": 0.35,
    "yaw_multiplier": 0.85,
    "pitch_multiplier": 1.0,
    "roll_multiplier": 0.85,
    "animation_region": "lip",
    "expression_multiplier": 1.0,
    "mouth_open_multiplier": 0.9,
    "mouth_corner_multiplier": 0.85,
    "cheek_jaw_multiplier": 0.9,
    "driving_multiplier": 1.0,
    "cfg_scale": 3.0,
    "flag_stitching": True,
    "flag_pasteback": True,
    "flag_relative_motion": True,
    "flag_normalize_lip": False,
    "flag_lip_retargeting": False,
}
|
|
|
|
|
|
def _fasterliveportrait_video_config(
    raw: Mapping[str, object] | None,
) -> dict[str, object] | None:
    """Build the FasterLivePortrait video config from three layers.

    Later layers win: selected non-``None`` keys of the base model config,
    then the video-creation defaults, then the normalized caller overrides.
    Returns ``None`` if the merged result is empty.
    """
    base = get_model_config("fasterliveportrait")
    merged: dict[str, object] = {
        key: inherited
        for key in _FASTLIVEPORTRAIT_VIDEO_CONFIG_KEYS
        if (inherited := base.get(key)) is not None
    }
    merged.update(VIDEO_CREATION_FASTLIVEPORTRAIT_DEFAULT_CONFIG)
    overrides = normalize_fasterliveportrait_runtime_config(dict(raw or {}))
    merged.update(overrides)
    if not merged:
        return None
    return merged
|
|
|
|
|
|
def _fasterliveportrait_preroll_samples(settings: object, model: str, sample_rate: int) -> int:
    """Return the number of pre-roll samples for FasterLivePortrait.

    The duration comes from ``video_creation_fasterliveportrait_preroll_ms``
    (400 when unset). Every other model, and any non-positive duration or
    sample rate, yields 0.
    """
    if model != "fasterliveportrait":
        return 0
    preroll_ms = _settings_int(settings, "video_creation_fasterliveportrait_preroll_ms", 400)
    if preroll_ms > 0 and sample_rate > 0:
        samples = int(round(float(sample_rate) * float(preroll_ms) / 1000.0))
        return max(0, samples)
    return 0
|
|
|
|
|
|
def _audio2video_client(settings: object, model: str, sample_rate: int, backend: object | None = None):
    """Create the audio-to-video client matching the model's backend.

    Args:
        settings: Configuration object read for URLs, devices and model options.
        model: Model name.
        sample_rate: Audio sample rate handed to the local client.
        backend: Already-resolved backend; resolved from ``model`` and
            ``settings`` when omitted.

    Returns:
        An ``OmniRTAudio2VideoClient`` for the "omnirt" and "direct_ws"
        backends, or a ``LocalAudio2VideoClient`` for the "local" backend.

    Raises:
        ValueError: the backend is none of the above.
    """
    backend = backend or resolve_model_backend(model, settings)
    backend_name = str(getattr(backend, "backend", "") or "").strip().lower()
    if backend_name in {"omnirt", "direct_ws"}:
        if model == "flashhead":
            # Imported here rather than at module level.
            from opentalking.providers.synthesis.flashhead import FlashHeadWSClient

            return OmniRTAudio2VideoClient(
                FlashHeadWSClient(
                    # The backend's URL wins over the flashhead_ws_url setting.
                    ws_url=str(getattr(backend, "ws_url", "") or getattr(settings, "flashhead_ws_url", "") or ""),
                    model=str(getattr(settings, "flashhead_model", "") or "soulx-flashhead-1.3b"),
                    config={
                        "fps": int(getattr(settings, "flashhead_fps", 25) or 25),
                        "sample_rate": int(getattr(settings, "flashhead_sample_rate", 16000) or 16000),
                        "width": int(getattr(settings, "flashhead_width", 416) or 416),
                        "height": int(getattr(settings, "flashhead_height", 704) or 704),
                        "frame_num": int(getattr(settings, "flashhead_frame_num", 25) or 25),
                        "chunk_samples": int(getattr(settings, "flashhead_chunk_samples", 16000) or 16000),
                    },
                )
            )
        # direct_ws uses the backend's own URL; omnirt resolves one from the settings.
        ws_url = str(getattr(backend, "ws_url", "") or "") if backend_name == "direct_ws" else resolve_synthesis_ws_url(model, settings)
        headers = auth_headers(settings)
        return OmniRTAudio2VideoClient(
            FlashTalkWSClient(ws_url, extra_headers=headers or None)
        )
    if backend_name != "local":
        raise ValueError(f"video creation does not support {model} backend: {backend_name or 'unknown'}")
    return LocalAudio2VideoClient(
        get_adapter(model),
        device=_device_for_model(settings, model),
        sample_rate=sample_rate,
    )
|
|
|
|
|
|
def _remote_audio2video_backend(backend: object) -> bool:
    """Tell whether ``backend.backend`` names the "omnirt" or "direct_ws" backend."""
    kind = getattr(backend, "backend", "") or ""
    return str(kind).strip().lower() in {"omnirt", "direct_ws"}
|
|
|
|
|
|
def _avatar_manifest(avatar_path: Path):
    """Load the avatar bundle non-strictly and return its manifest."""
    bundle = load_avatar_bundle(avatar_path, strict=False)
    return bundle.manifest
|
|
|
|
|
|
def _resolve_avatar_relative_path(avatar_path: Path, raw: object) -> Path | None:
    """Resolve ``raw`` against the avatar directory.

    Returns ``None`` for a blank value or when the resolved path lies outside
    the avatar directory.
    """
    relative = str(raw or "").strip()
    if not relative:
        return None
    root = avatar_path.resolve()
    resolved = (root / relative).resolve()
    try:
        resolved.relative_to(root)
    except ValueError:
        # The value pointed outside the avatar directory.
        return None
    return resolved
|
|
|
|
|
|
def _avatar_manifest_metadata(avatar_path: Path) -> dict[str, Any]:
    """Return a copy of the manifest's ``metadata`` dict, or ``{}`` if it is not a dict."""
    metadata = _avatar_manifest(avatar_path).metadata
    if not isinstance(metadata, dict):
        return {}
    return dict(metadata or {})
|
|
|
|
|
|
def _quicktalk_manifest_section(avatar_path: Path) -> dict[str, Any]:
    """Return a copy of the ``quicktalk`` entry of the manifest metadata, or ``{}``."""
    section = _avatar_manifest_metadata(avatar_path).get("quicktalk")
    if isinstance(section, dict):
        return dict(section)
    return {}
|
|
|
|
|
|
def _quicktalk_video_config(avatar_path: Path) -> dict[str, int]:
    """Return the QuickTalk video config derived from the avatar manifest.

    ``width`` and ``height`` are included only when the manifest holds a
    positive integer for them; ``fps`` is always 25.
    """
    manifest = _avatar_manifest(avatar_path)
    video: dict[str, int] = {}
    for field in ("width", "height"):
        try:
            pixels = int(getattr(manifest, field) or 0)
        except (TypeError, ValueError):
            continue
        if pixels > 0:
            video[field] = pixels
    video["fps"] = 25
    return video
|
|
|
|
|
|
def _settings_or_env_int(settings: object, attr: str, env_names: tuple[str, ...], default: int) -> int:
    """Read an integer from the settings, then from the environment.

    Args:
        settings: Object carrying configuration attributes.
        attr: Attribute consulted first.
        env_names: Environment variables tried in order when the attribute is
            missing, ``None`` or ``""``; the first non-empty one is used.
        default: Returned when nothing is set or the value is not convertible.
    """
    raw: Any = getattr(settings, attr, None)
    if raw in (None, ""):
        for env_name in env_names:
            env_value = os.environ.get(env_name)
            if env_value not in (None, ""):
                raw = env_value
                break
    try:
        value: Any = raw if raw not in (None, "") else default
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() of an infinite float; treat it like any other bad value.
        return default
|
|
|
|
|
|
def _even_video_dim(value: int) -> int:
    """Clamp a dimension to at least 2 and round odd values up to the next even number."""
    clamped = max(2, int(value))
    if clamped % 2:
        return clamped + 1
    return clamped
|
|
|
|
|
|
def _quicktalk_cache_video_size(settings: object, avatar_path: Path) -> tuple[int, int] | None:
    """Compute the (width, height) used in prepared QuickTalk cache file names.

    Starts from the manifest size and scales it down so the longer edge does
    not exceed ``quicktalk_max_long_edge`` (setting, then environment, default
    900). Returns ``None`` when the manifest provides no usable size.
    """
    config = _quicktalk_video_config(avatar_path)
    width = int(config.get("width") or 0)
    height = int(config.get("height") or 0)
    if width <= 0 or height <= 0:
        return None
    max_long_edge = _settings_or_env_int(
        settings,
        "quicktalk_max_long_edge",
        ("OPENTALKING_QUICKTALK_MAX_LONG_EDGE", "OMNIRT_QUICKTALK_MAX_LONG_EDGE"),
        900,
    )
    if max_long_edge <= 0:
        max_long_edge = 900
    long_edge = max(width, height)
    if long_edge > max_long_edge:
        scale = float(max_long_edge) / float(long_edge)
        width = max(2, int(round(width * scale)))
        height = max(2, int(round(height * scale)))
        # Scaled sizes are rounded DOWN to even ...
        width -= width % 2
        height -= height % 2
    else:
        # ... whereas unscaled sizes are rounded UP to even.
        # NOTE(review): presumably this mirrors how the cache files are named
        # when they are produced; confirm before unifying the two branches.
        width = _even_video_dim(width)
        height = _even_video_dim(height)
    return width, height
|
|
|
|
|
|
def _prepared_quicktalk_path(settings: object, avatar_path: Path, prefix: str, suffix: str) -> Path | None:
    """Locate ``quicktalk/<prefix>_<width>x<height>.<suffix>`` for the avatar.

    Returns ``None`` when the ``quicktalk`` directory or the cache size is
    unavailable, when the file resolves outside the avatar directory, or when
    it does not exist.
    """
    cache_dir = avatar_path / "quicktalk"
    size = _quicktalk_cache_video_size(settings, avatar_path)
    if size is None or not cache_dir.is_dir():
        return None
    width, height = size
    candidate = (cache_dir / f"{prefix}_{width}x{height}.{suffix}").resolve()
    try:
        candidate.relative_to(avatar_path.resolve())
    except ValueError:
        return None
    if candidate.is_file():
        return candidate
    return None
|
|
|
|
|
|
def _quicktalk_declared_video_source(avatar_path: Path) -> Path | None:
    """Return the first existing video declared in the avatar manifest.

    The top-level metadata is searched before its ``quicktalk`` section, and
    ``source_video`` before ``template_video`` within each.
    """
    sections = (
        _avatar_manifest_metadata(avatar_path),
        _quicktalk_manifest_section(avatar_path),
    )
    for section in sections:
        for field in ("source_video", "template_video"):
            resolved = _resolve_avatar_relative_path(avatar_path, section.get(field))
            if resolved is None or not resolved.is_file():
                continue
            return resolved
    return None
|
|
|
|
|
|
def _quicktalk_prepared_template_video(settings: object, avatar_path: Path) -> Path | None:
    """Find a template video for the avatar on disk, most specific first.

    Search order:
      1. the size-specific prepared file ``quicktalk/template_<w>x<h>.mp4``;
      2. ``quicktalk/template_900.mp4``;
      3. any ``quicktalk/template_*.mp4`` that resolves inside the avatar directory;
      4. ``idle.*`` / ``source.mp4`` in the avatar directory;
      5. the first video file (sorted per pattern) in ``source/``.

    Returns ``None`` when none of these exists.
    """
    prepared = _prepared_quicktalk_path(settings, avatar_path, "template", "mp4")
    if prepared is not None:
        return prepared

    quicktalk_dir = avatar_path / "quicktalk"
    preferred = quicktalk_dir / "template_900.mp4"
    if preferred.is_file():
        return preferred.resolve()
    if quicktalk_dir.is_dir():
        for candidate in sorted(quicktalk_dir.glob("template_*.mp4")):
            candidate = candidate.resolve()
            # Skip entries that resolve outside the avatar directory.
            try:
                candidate.relative_to(avatar_path.resolve())
            except ValueError:
                continue
            if candidate.is_file():
                return candidate

    for name in ("idle.mp4", "idle.mov", "idle.webm", "idle.avi", "source.mp4"):
        path = avatar_path / name
        if path.is_file():
            return path.resolve()
    source_dir = avatar_path / "source"
    if source_dir.is_dir():
        # Patterns are tried in order, so any .mp4 wins over .mov, and so on.
        for pattern in ("*.mp4", "*.mov", "*.webm", "*.avi"):
            for candidate in sorted(source_dir.glob(pattern)):
                if candidate.is_file():
                    return candidate.resolve()
    return None
|
|
|
|
|
|
def _quicktalk_template_video(settings: object, avatar_path: Path) -> Path | None:
    """Return the manifest-declared video if present, else a prepared or discovered one."""
    declared = _quicktalk_declared_video_source(avatar_path)
    return declared if declared is not None else _quicktalk_prepared_template_video(settings, avatar_path)
|
|
|
|
|
|
def _quicktalk_template_frame_dir(avatar_path: Path) -> Path | None:
    """Return the avatar's frame directory when the manifest uses frame references.

    Applies only when the metadata's ``reference_mode`` is "frames"; the
    directory name comes from ``frame_dir`` and defaults to "frames".
    """
    metadata = _avatar_manifest_metadata(avatar_path)
    mode = str(metadata.get("reference_mode") or "").strip().lower()
    if mode != "frames":
        return None
    dir_name = str(metadata.get("frame_dir") or "frames").strip() or "frames"
    resolved = _resolve_avatar_relative_path(avatar_path, dir_name)
    if resolved is not None and resolved.is_dir():
        return resolved
    return None
|
|
|
|
|
|
def _quicktalk_face_cache(settings: object, avatar_path: Path) -> Path | None:
    """Find the QuickTalk face cache (``.npz``) for the avatar.

    Search order:
      1. the size-specific prepared file ``quicktalk/face_cache_v3_<w>x<h>.npz``;
      2. the ``face_cache`` path declared in the manifest's ``quicktalk`` section;
      3. ``quicktalk/face_cache_v3_900.npz``, then any
         ``quicktalk/face_cache_v3_*.npz`` in sorted order.

    Returns ``None`` when nothing is found.
    """
    prepared = _prepared_quicktalk_path(settings, avatar_path, "face_cache_v3", "npz")
    if prepared is not None:
        return prepared

    quicktalk = _quicktalk_manifest_section(avatar_path)
    path = _resolve_avatar_relative_path(avatar_path, quicktalk.get("face_cache"))
    if path is not None and path.is_file():
        return path

    quicktalk_dir = avatar_path / "quicktalk"
    if not quicktalk_dir.is_dir():
        return None
    # The preferred name goes first; the glob may list it again, hence ``seen``.
    candidates = [quicktalk_dir / "face_cache_v3_900.npz", *sorted(quicktalk_dir.glob("face_cache_v3_*.npz"))]
    seen: set[Path] = set()
    for candidate in candidates:
        candidate = candidate.resolve()
        if candidate in seen:
            continue
        seen.add(candidate)
        # Skip entries that resolve outside the avatar directory.
        try:
            candidate.relative_to(avatar_path.resolve())
        except ValueError:
            continue
        if candidate.is_file():
            return candidate
    return None
|
|
|
|
|
|
def _quicktalk_init_session_kwargs(settings: object, avatar_path: Path) -> dict[str, object]:
    """Build the QuickTalk-specific session keyword arguments.

    Always sets ``video_config`` and ``template_mode``. The mode is "video"
    when a declared or prepared template video exists, "frames" when the
    manifest points at a frame directory, and "image" otherwise.
    """
    kwargs: dict[str, object] = {"video_config": _quicktalk_video_config(avatar_path)}
    declared_video = _quicktalk_declared_video_source(avatar_path)
    template_video = declared_video or _quicktalk_prepared_template_video(settings, avatar_path)
    template_frame_dir = _quicktalk_template_frame_dir(avatar_path)
    if template_video is not None:
        kwargs["template_mode"] = "video"
        kwargs["template_video"] = template_video
    elif template_frame_dir is not None:
        kwargs["template_mode"] = "frames"
        kwargs["template_frame_dir"] = template_frame_dir
    else:
        kwargs["template_mode"] = "image"
    # NOTE(review): indentation was lost in this copy of the file; this branch
    # is assumed to sit at function level (a face cache is attached whenever
    # no video is declared in the manifest) — confirm against upstream.
    if declared_video is None:
        face_cache = _quicktalk_face_cache(settings, avatar_path)
        if face_cache is not None:
            kwargs["quicktalk_face_cache"] = face_cache
    return kwargs
|
|
|
|
|
|
def _init_session_kwargs(
    *,
    settings: object,
    model: str,
    avatar_path: Path,
    backend: object,
    fasterliveportrait_config: Mapping[str, object] | None,
) -> dict[str, object]:
    """Assemble the session keyword arguments for a model/backend pair.

    ``avatar_path`` is always present and QuickTalk adds its template
    arguments. Remote backends additionally receive ``ref_image``, and
    FasterLivePortrait on a remote backend receives ``video_config``.
    """
    session: dict[str, object] = {"avatar_path": avatar_path}
    if model == "quicktalk":
        session.update(_quicktalk_init_session_kwargs(settings, avatar_path))
    if not _remote_audio2video_backend(backend):
        return session

    session["ref_image"] = _reference_image_path(avatar_path)
    if model == "fasterliveportrait":
        video_config = _fasterliveportrait_video_config(fasterliveportrait_config)
        if video_config:
            session["video_config"] = video_config
    return session
|
|
|
|
|
|
def _device_for_model(settings: object, model: str) -> str:
    """Pick the torch device string for a locally run model.

    A model-specific setting (``quicktalk_device`` / ``wav2lip_device``) wins
    over ``torch_device``; the last resort is "cuda:0" for quicktalk and
    "cuda" for everything else.
    """
    overrides = {
        "quicktalk": ("quicktalk_device", "cuda:0"),
        "wav2lip": ("wav2lip_device", "cuda"),
    }
    specific_attr, last_resort = overrides.get(model, (None, "cuda"))
    if specific_attr is not None:
        specific = getattr(settings, specific_attr, "")
        if specific:
            return str(specific)
    return str(getattr(settings, "torch_device", "") or last_resort)
|
|
|
|
|
|
def _frame_array(frame: VideoFrameData | Any) -> np.ndarray | None:
    """Convert a frame (or an object with a ``data`` attribute) to a contiguous uint8 image.

    Returns ``None`` unless the data is a 3-D array with at least three
    channels. At most four channels are kept.
    """
    pixels = np.asarray(getattr(frame, "data", frame))
    if pixels.ndim != 3 or pixels.shape[2] < 3:
        return None
    keep = min(int(pixels.shape[2]), 4)
    trimmed = pixels[:, :, :keep].astype(np.uint8, copy=False)
    return np.ascontiguousarray(trimmed)
|
|
|
|
|
|
def _write_wav(path: Path, pcm: np.ndarray, sample_rate: int = 16000) -> None:
    """Write ``pcm`` as a mono 16-bit WAV file, creating parent directories as needed."""
    samples = np.asarray(pcm, dtype="<i2").reshape(-1)
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = samples.tobytes()
    with wave.open(str(path), "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(int(sample_rate))
        wav_file.writeframes(payload)
|
|
|
|
|
|
def _write_video_only(path: Path, frames: list[np.ndarray], fps: float) -> None:
    """Write ``frames`` to a silent "mp4v"-encoded video file.

    The first frame fixes the output size; frames of another size are
    resized to it, and channels beyond the third are dropped before writing.

    Raises:
        RuntimeError: ``frames`` is empty or the video writer cannot be opened.
    """
    if not frames:
        raise RuntimeError("video creation produced zero frames")
    first = np.asarray(frames[0], dtype=np.uint8)
    height, width = first.shape[:2]
    path.parent.mkdir(parents=True, exist_ok=True)
    # Looked up by name rather than as a plain attribute access.
    fourcc = getattr(cv2, "VideoWriter_fourcc")
    writer = cv2.VideoWriter(
        str(path),
        fourcc(*"mp4v"),
        max(1.0, float(fps)),  # frame rate is never allowed below 1
        (int(width), int(height)),
    )
    if not writer.isOpened():
        raise RuntimeError(f"cannot open video writer: {path}")
    try:
        for frame in frames:
            arr = np.asarray(frame, dtype=np.uint8)
            if arr.shape[:2] != (height, width):
                resized = cv2.resize(arr, (width, height), interpolation=cv2.INTER_AREA)
                arr = np.asarray(resized, dtype=np.uint8)
            if arr.ndim == 3 and arr.shape[2] >= 4:
                # Keep only the first three channels.
                arr = arr[:, :, :3]
            writer.write(arr)
    finally:
        # Release the writer even if a frame fails.
        writer.release()
|
|
|
|
|
|
async def _ffmpeg_mux(ffmpeg_bin: str, video_in: Path, audio_in: Path, out_mp4: Path) -> None:
    """Mux a video file and an audio file into ``out_mp4`` with ffmpeg.

    The video is encoded with libx264 (yuv420p) and the audio with AAC; the
    output is cut to the shorter input and overwritten if it already exists.

    Raises:
        RuntimeError: ffmpeg exits with a non-zero status; the message
            carries up to 1200 characters of its stderr.
    """
    out_mp4.parent.mkdir(parents=True, exist_ok=True)
    command = [
        ffmpeg_bin,
        "-y",
        "-hide_banner",
        "-loglevel",
        "error",
        "-i",
        str(video_in),
        "-i",
        str(audio_in),
        "-c:v",
        "libx264",
        "-pix_fmt",
        "yuv420p",
        "-c:a",
        "aac",
        "-shortest",
        str(out_mp4),
    ]
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await process.communicate()
    if process.returncode == 0:
        return
    detail = (stderr or b"").decode("utf-8", errors="replace")[:1200]
    raise RuntimeError(f"ffmpeg mux failed ({process.returncode}): {detail}")
|
|
|
|
|
|
class VideoCreationService:
|
|
def __init__(self, settings: object) -> None:
    """Store the settings object that the creation methods read from."""
    self.settings = settings
|
|
|
|
async def create_from_audio_file(
    self,
    *,
    model: str,
    avatar_id: str,
    upload_path: Path,
    title: str,
    mime_type: str | None = None,
    fasterliveportrait_config: Mapping[str, object] | None = None,
    composition_config: Mapping[str, object] | None = None,
) -> dict[str, Any]:
    """Create a video driven by an uploaded audio file.

    The upload is decoded to int16 PCM and handed to ``_create_from_pcm``
    with source "upload". ``mime_type`` is accepted but not used here.

    Raises:
        ValueError: the audio decodes to no samples.
    """
    samples = await decode_audio_file_to_pcm_i16(upload_path)
    if samples.size == 0:
        raise ValueError("audio decoded to empty PCM")
    request = {
        "model": model,
        "avatar_id": avatar_id,
        "pcm": samples,
        "title": title,
        "source": "upload",
        "fasterliveportrait_config": fasterliveportrait_config,
        "composition_config": composition_config,
    }
    return await self._create_from_pcm(**request)
|
|
|
|
async def create_from_tts_text(
    self,
    *,
    model: str,
    avatar_id: str,
    text: str,
    title: str,
    tts_provider: str | None,
    tts_model: str | None,
    voice: str | None,
    source: str = "tts_text",
    fasterliveportrait_config: Mapping[str, object] | None = None,
    indextts_config: Mapping[str, object] | None = None,
    composition_config: Mapping[str, object] | None = None,
) -> dict[str, Any]:
    """Synthesize ``text`` with TTS and create a video from the resulting audio.

    The TTS stream is collected into one int16 buffer, resampled when its
    sample rate is not 16000, and handed to ``_create_from_pcm``.

    Raises:
        ValueError: ``text`` is blank.
        RuntimeError: the TTS stream yields no audio.
    """
    text_value = text.strip()
    if not text_value:
        raise ValueError("text is required")
    sample_rate = int(getattr(self.settings, "tts_sample_rate", 16000) or 16000)
    tts = build_tts_adapter(
        sample_rate=sample_rate,
        chunk_ms=40.0,
        settings=self.settings,
        default_voice=voice,
        tts_provider=tts_provider,
        tts_model=tts_model,
        indextts_config=indextts_config,
    )
    chunks: list[np.ndarray] = []
    try:
        async for chunk in tts.synthesize_stream(text_value, voice=voice):
            arr = np.asarray(chunk.data, dtype=np.int16).reshape(-1)
            if arr.size:
                chunks.append(arr.copy())
                # NOTE(review): indentation was lost in this copy of the file;
                # the sample-rate update is assumed to belong to the
                # non-empty-chunk branch — confirm against upstream.
                sample_rate = int(chunk.sample_rate or sample_rate)
    finally:
        # Close the adapter if it offers ``aclose``, even when synthesis failed.
        close = getattr(tts, "aclose", None)
        if close is not None:
            await close()
    if not chunks:
        raise RuntimeError("TTS returned no audio")
    pcm = np.concatenate(chunks).astype(np.int16, copy=False)
    if sample_rate != 16000:
        pcm = await self._resample_pcm(pcm, sample_rate)
    return await self._create_from_pcm(
        model=model,
        avatar_id=avatar_id,
        pcm=pcm,
        title=title,
        source=source,
        fasterliveportrait_config=fasterliveportrait_config,
        composition_config=composition_config,
    )
|
|
|
|
async def create_reference_video(
|
|
self,
|
|
*,
|
|
model: str,
|
|
avatar_id: str,
|
|
duration_sec: int | None,
|
|
title: str,
|
|
composition_config: Mapping[str, object] | None = None,
|
|
) -> dict[str, Any]:
|
|
model_value = _normalize_model(model)
|
|
if model_value != "flashtalk":
|
|
raise ValueError("reference video generation only supports flashtalk")
|
|
duration = _validate_reference_duration(self.settings, duration_sec)
|
|
sample_rate = 16000
|
|
total_samples = duration * sample_rate
|
|
pcm = await _load_reference_driver_pcm(self.settings, total_samples)
|
|
if pcm is None:
|
|
level = _settings_float(self.settings, "video_creation_reference_driver_level", 480.0)
|
|
pcm = _build_reference_driver_pcm(total_samples, level=level)
|
|
return await self._create_from_pcm(
|
|
model=model_value,
|
|
avatar_id=avatar_id,
|
|
pcm=pcm,
|
|
title=title,
|
|
source="reference_video",
|
|
composition_config=composition_config,
|
|
)
|
|
|
|
async def _resample_pcm(self, pcm: np.ndarray, sample_rate: int) -> np.ndarray:
|
|
with tempfile.TemporaryDirectory(prefix="opentalking_vc_resample_") as tmp:
|
|
tmpdir = Path(tmp)
|
|
src = tmpdir / "src.wav"
|
|
out = tmpdir / "out.wav"
|
|
_write_wav(src, pcm, sample_rate)
|
|
proc = await asyncio.create_subprocess_exec(
|
|
str(getattr(self.settings, "ffmpeg_bin", "ffmpeg") or "ffmpeg"),
|
|
"-y",
|
|
"-hide_banner",
|
|
"-loglevel",
|
|
"error",
|
|
"-i",
|
|
str(src),
|
|
"-ac",
|
|
"1",
|
|
"-ar",
|
|
"16000",
|
|
"-f",
|
|
"wav",
|
|
str(out),
|
|
stdout=asyncio.subprocess.DEVNULL,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
_, stderr = await proc.communicate()
|
|
if proc.returncode != 0:
|
|
detail = (stderr or b"").decode("utf-8", errors="replace")[:1200]
|
|
raise RuntimeError(f"ffmpeg resample failed ({proc.returncode}): {detail}")
|
|
with wave.open(str(out), "rb") as wf:
|
|
raw = wf.readframes(wf.getnframes())
|
|
return np.frombuffer(raw, dtype="<i2").copy()
|
|
|
|
async def _create_from_pcm(
|
|
self,
|
|
*,
|
|
model: str,
|
|
avatar_id: str,
|
|
pcm: np.ndarray,
|
|
title: str,
|
|
source: str,
|
|
fasterliveportrait_config: Mapping[str, object] | None = None,
|
|
composition_config: Mapping[str, object] | None = None,
|
|
) -> dict[str, Any]:
|
|
model_value = _normalize_model(model)
|
|
avatar_path = _avatar_dir(self.settings, avatar_id)
|
|
normalized_composition_config = _normalize_video_composition_config(self.settings, avatar_path, composition_config)
|
|
job_id = uuid.uuid4().hex
|
|
work_dir = _settings_path(self.settings, "exports_dir", "./data/exports") / "video_creation_jobs" / job_id
|
|
work_dir.mkdir(parents=True, exist_ok=False)
|
|
pcm = np.asarray(pcm, dtype=np.int16).reshape(-1)
|
|
sample_rate = 16000
|
|
audio_wav = work_dir / "audio.wav"
|
|
_write_wav(audio_wav, pcm, sample_rate)
|
|
|
|
backend = resolve_model_backend(model_value, self.settings)
|
|
backend_name = str(getattr(backend, "backend", "") or "").strip().lower()
|
|
ws_url = ""
|
|
if backend_name in {"omnirt", "direct_ws"}:
|
|
ws_url = (
|
|
str(getattr(backend, "ws_url", "") or "")
|
|
if backend_name == "direct_ws"
|
|
else resolve_synthesis_ws_url(model_value, self.settings)
|
|
)
|
|
log.info(
|
|
"video creation audio2video backend: job=%s model=%s avatar=%s backend=%s ws_url=%s",
|
|
job_id,
|
|
model_value,
|
|
avatar_id,
|
|
backend_name or "unknown",
|
|
ws_url,
|
|
)
|
|
client = _audio2video_client(self.settings, model_value, sample_rate, backend=backend)
|
|
preroll_samples = _fasterliveportrait_preroll_samples(self.settings, model_value, sample_rate)
|
|
render_source_pcm = pcm
|
|
if preroll_samples:
|
|
render_source_pcm = np.concatenate([np.zeros(preroll_samples, dtype=np.int16), pcm])
|
|
|
|
frames: list[np.ndarray] = []
|
|
try:
|
|
await client.init_session(
|
|
**_init_session_kwargs(
|
|
model=model_value,
|
|
avatar_path=avatar_path,
|
|
settings=self.settings,
|
|
backend=backend,
|
|
fasterliveportrait_config=fasterliveportrait_config,
|
|
)
|
|
)
|
|
await client.prewarm()
|
|
chunk_samples = max(1, int(client.audio_chunk_samples or round(sample_rate / max(1, client.fps))))
|
|
pad_len = (-len(render_source_pcm)) % chunk_samples
|
|
render_pcm = render_source_pcm if not pad_len else np.concatenate([
|
|
render_source_pcm,
|
|
np.zeros(pad_len, dtype=np.int16),
|
|
])
|
|
for start in range(0, len(render_pcm), chunk_samples):
|
|
chunk = render_pcm[start:start + chunk_samples]
|
|
for frame in await client.generate(chunk):
|
|
arr = _frame_array(frame)
|
|
if arr is not None:
|
|
frames.append(arr)
|
|
fps = float(client.fps or 25)
|
|
finally:
|
|
await client.close()
|
|
|
|
if preroll_samples:
|
|
drop_frames = max(0, int(round(float(preroll_samples) * fps / float(sample_rate))))
|
|
if drop_frames:
|
|
frames = frames[drop_frames:]
|
|
target_frames = max(1, int(round(float(pcm.size) * fps / float(sample_rate))))
|
|
if len(frames) > target_frames:
|
|
frames = frames[:target_frames]
|
|
frames = _apply_video_composition(frames, config=normalized_composition_config)
|
|
|
|
video_only = work_dir / "video_only.mp4"
|
|
_write_video_only(video_only, frames, fps)
|
|
output_mp4 = work_dir / "result.mp4"
|
|
await _ffmpeg_mux(str(getattr(self.settings, "ffmpeg_bin", "ffmpeg") or "ffmpeg"), video_only, audio_wav, output_mp4)
|
|
content = output_mp4.read_bytes()
|
|
duration = float(pcm.size) / float(sample_rate) if sample_rate else None
|
|
item = create_video_export(
|
|
_settings_path(self.settings, "exports_dir", "./data/exports"),
|
|
content=content,
|
|
mime_type="video/mp4",
|
|
kind="video_creation",
|
|
title=_safe_title(title, model=model_value, avatar_id=avatar_id),
|
|
duration_sec=duration,
|
|
session_id=None,
|
|
avatar_id=avatar_id,
|
|
model=model_value,
|
|
max_bytes=_settings_int(self.settings, "export_max_bytes", 1024 * 1024 * 1024),
|
|
)
|
|
log.info(
|
|
"video creation export complete: job=%s export_id=%s model=%s avatar=%s path=%s",
|
|
job_id,
|
|
item.get("id"),
|
|
model_value,
|
|
avatar_id,
|
|
item.get("path"),
|
|
)
|
|
return {
|
|
"job_id": job_id,
|
|
"status": "done",
|
|
"source": source,
|
|
"export_video": _export_with_download_url(item),
|
|
}
|