"""
YouTube source sync service.

Two-phase design:

Phase 1 — METADATA ONLY (sync_source):
    Crawls a YouTube channel or playlist and upserts MediaItem rows with
    title, duration, thumbnail, etc. No video files are downloaded. A
    max_videos cap keeps this fast for large channels.

Phase 2 — DOWNLOAD ON DEMAND (download_for_airing):
    Called only by `python manage.py cache_upcoming` immediately before a
    scheduled Airing. Downloads only the specific video needed.
"""

import logging
import math
import os
import re
import subprocess
from datetime import date
from pathlib import Path

import yt_dlp
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone

from core.models import MediaItem, MediaSource

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------

YOUTUBE_SOURCE_TYPES = {
    MediaSource.SourceType.YOUTUBE_CHANNEL,
    MediaSource.SourceType.YOUTUBE_PLAYLIST,
}

# Fallback download directory used when settings.MEDIA_ROOT is unset or empty.
_DEFAULT_CACHE_DIR = "/tmp/pytv_cache"

# Per-source-type caps applied by sync_source when max_videos is None.
_DEFAULT_MAX_VIDEOS_CHANNEL = 50  # channels can have 10k+ videos
_DEFAULT_MAX_VIDEOS_PLAYLIST = 200  # playlists are usually curated

# ANSI colour escape sequences that may appear in yt-dlp's progress strings.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*m")

# Seconds a download-progress value lives in the Django cache before expiring.
_PROGRESS_TTL_SECONDS = 300

# Upper bound (seconds) for each external process used to probe runtime, so a
# hung `node` or `ffprobe` cannot block the download caller indefinitely.
_PROBE_TIMEOUT_SECONDS = 60


def _cache_dir() -> Path:
    """Return (and create) the directory where downloaded videos are stored.

    Django's built-in default for MEDIA_ROOT is the empty string, which
    ``Path`` would resolve to the current working directory. An empty or
    missing setting is therefore treated as "not configured" and the
    ``/tmp/pytv_cache`` fallback is used instead.
    """
    root = Path(getattr(settings, "MEDIA_ROOT", None) or _DEFAULT_CACHE_DIR)
    root.mkdir(parents=True, exist_ok=True)
    return root


def _parse_upload_date(upload_date: str | None) -> date | None:
    """Parse a yt-dlp "YYYYMMDD" string into a date, or return None if invalid."""
    if not upload_date or len(upload_date) < 8:
        return None
    try:
        return date(
            int(upload_date[0:4]),
            int(upload_date[4:6]),
            int(upload_date[6:8]),
        )
    except ValueError:
        # Non-numeric characters or an impossible calendar date.
        return None


# ---------------------------------------------------------------------------
# metadata extraction (no download)
# ---------------------------------------------------------------------------


def _extract_playlist_info(url: str, max_videos: int | None = None) -> list[dict]:
    """
    Use yt-dlp to extract metadata for up to `max_videos` videos in a
    channel/playlist without downloading any files.

    `extract_flat=True` is crucial — it fetches only a lightweight index
    (title, id, duration) rather than resolving full stream URLs, which makes
    crawling large channels orders of magnitude faster.

    Args:
        url: Channel or playlist URL.
        max_videos: Maximum number of entries to return; None means no cap.

    Returns:
        A list of yt-dlp info dicts (most-recent first for channels). One
        level of nested playlists is flattened into the result. Returns an
        empty list when yt-dlp yields no info.
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,  # metadata only — NO stream/download URLs
        "ignoreerrors": True,
    }
    if max_videos is not None:
        # yt-dlp uses 1-based playlist indices; playlistend limits how many
        # top-level entries are fetched from the source before returning.
        ydl_opts["playlistend"] = max_videos

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)

    if info is None:
        return []

    # Both channels and playlists wrap entries in an "entries" key.
    flat: list[dict] = []
    for entry in info.get("entries") or []:
        if entry is None:  # ignoreerrors leaves None for failed entries
            continue
        if "entries" in entry:
            # Nested playlist (channel -> playlist -> entries). Its "entries"
            # value may itself be None, and may contain None placeholders.
            flat.extend(e for e in (entry["entries"] or []) if e)
        else:
            flat.append(entry)

    # playlistend only limits the top level; flattening nested playlists can
    # push the list past the cap, so enforce it on the final result too.
    if max_videos is not None:
        flat = flat[: max(max_videos, 0)]
    return flat


# ---------------------------------------------------------------------------
# public API
# ---------------------------------------------------------------------------


def sync_source(media_source: MediaSource, max_videos: int | None = None) -> dict:
    """
    Phase 1: Metadata-only sync.

    Crawls a YouTube channel/playlist and upserts MediaItem rows for each
    discovered video. No video files are ever downloaded here.

    Args:
        media_source: The MediaSource to sync.
        max_videos: Maximum number of videos to import. When None the
            defaults are applied:
              - youtube_channel  → 50  (channels can have 10k+ videos)
              - youtube_playlist → 200 (playlists are usually curated)

    Returns:
        {"created": int, "updated": int, "skipped": int}. "skipped" counts
        entries without an id plus entries rejected by the source's length
        or age rules.

    Raises:
        ValueError: If `media_source` is not a YouTube channel/playlist.
    """
    if media_source.source_type not in YOUTUBE_SOURCE_TYPES:
        raise ValueError(f"MediaSource {media_source.id} is not a YouTube source.")

    # Apply sensible defaults per source type
    if max_videos is None:
        if media_source.source_type == MediaSource.SourceType.YOUTUBE_CHANNEL:
            max_videos = _DEFAULT_MAX_VIDEOS_CHANNEL
        else:
            max_videos = _DEFAULT_MAX_VIDEOS_PLAYLIST

    entries = _extract_playlist_info(media_source.uri, max_videos=max_videos)

    min_len = media_source.min_video_length_seconds
    max_len = media_source.max_video_length_seconds
    max_age_days = media_source.max_age_days

    created = updated = skipped = 0
    for entry in entries:
        video_id = entry.get("id")
        if not video_id:
            skipped += 1
            continue

        title = entry.get("title") or f"YouTube Video {video_id}"
        duration = entry.get("duration") or 0  # seconds, may be None for live
        thumbnail = entry.get("thumbnail") or ""
        description = entry.get("description") or ""

        # Enforce source rules. An unknown duration counts as 0 seconds.
        if min_len is not None and duration < min_len:
            skipped += 1
            continue
        if max_len is not None and duration > max_len:
            skipped += 1
            continue

        # An invalid or missing upload date leaves release_year unset and
        # bypasses the age rule.
        video_date = _parse_upload_date(entry.get("upload_date"))
        release_year = video_date.year if video_date is not None else None
        if (
            video_date is not None
            and max_age_days is not None
            and (date.today() - video_date).days > max_age_days
        ):
            skipped += 1
            continue

        # Store the YouTube watch URL in file_path so the scheduler can
        # reference it. The ACTUAL video file will only be downloaded when
        # `cache_upcoming` runs before the airing.
        video_url = entry.get("url") or f"https://www.youtube.com/watch?v={video_id}"

        _obj, was_created = MediaItem.objects.update_or_create(
            media_source=media_source,
            youtube_video_id=video_id,
            defaults={
                "title": title,
                "item_kind": MediaItem.ItemKind.MOVIE,
                "runtime_seconds": max(int(duration), 1),
                "file_path": video_url,
                "thumbnail_path": thumbnail,
                "description": description,
                "release_year": release_year,
                "metadata_json": {
                    "yt_id": video_id,
                    "yt_url": video_url,
                    "uploader": entry.get("uploader", ""),
                },
                "is_active": True,
            },
        )
        if was_created:
            created += 1
        else:
            updated += 1

    # Update last-scanned timestamp
    media_source.last_scanned_at = timezone.now()
    media_source.save(update_fields=["last_scanned_at"])

    logger.info(
        "sync_source(%s): created=%d updated=%d skipped=%d (limit=%s)",
        media_source.id,
        created,
        updated,
        skipped,
        max_videos,
    )
    return {"created": created, "updated": updated, "skipped": skipped}


def _probe_duration_seconds(path: Path, video_id: str) -> float | None:
    """Return the duration of `path` via the ffprobe-static npm binary, or None on failure."""
    try:
        # Resolve the ffprobe path from the `ffprobe-static` npm package.
        node_cmd = ["node", "-e", "console.log(require('ffprobe-static').path)"]
        resolved = subprocess.run(
            node_cmd,
            capture_output=True,
            text=True,
            check=True,
            timeout=_PROBE_TIMEOUT_SECONDS,
        )
        ffprobe_cmd = resolved.stdout.strip()

        probe_cmd = [
            ffprobe_cmd,
            "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            str(path),
        ]
        probed = subprocess.run(
            probe_cmd,
            capture_output=True,
            text=True,
            check=True,
            timeout=_PROBE_TIMEOUT_SECONDS,
        )
        duration = float(probed.stdout.strip())
    except (OSError, subprocess.SubprocessError, ValueError) as exc:
        # Best effort: a missing node/ffprobe binary, a non-zero exit, a
        # timeout, or unparsable output ("N/A") just leaves the runtime as-is.
        logger.warning(
            "Failed to extract exact runtime for %s using ffprobe: %s", video_id, exc
        )
        return None

    if not math.isfinite(duration) or duration <= 0:
        logger.warning(
            "Failed to extract exact runtime for %s using ffprobe: %s",
            video_id,
            f"unusable duration {duration!r}",
        )
        return None
    return duration


def download_for_airing(media_item: MediaItem) -> Path:
    """
    Download a YouTube video to the local cache so it can be served directly
    without network dependency at airing time.

    If `media_item.cached_file_path` points to an existing file, that path is
    returned without downloading again. On a fresh download, the cached path
    (and, when ffprobe succeeds, the exact runtime rounded up to whole
    seconds) is saved back onto `media_item`.

    Returns:
        The local Path of the downloaded (or previously cached) file.

    Raises:
        ValueError: If the item has no youtube_video_id.
        RuntimeError: If yt-dlp returns no info or the expected file is
            missing after download. Errors raised by yt-dlp itself during
            the download are not caught here.
    """
    video_id = media_item.youtube_video_id
    if not video_id:
        raise ValueError(f"MediaItem {media_item.id} has no youtube_video_id.")

    # Reuse a previous download if its file is still on disk. No expiry is
    # applied: any existing file counts as a cache hit.
    if media_item.cached_file_path:
        existing = Path(media_item.cached_file_path)
        if existing.exists():
            logger.info("cache hit: %s already at %s", video_id, existing)
            return existing

    # Name files by video_id so already-cached files are easy to identify.
    output_template = str(_cache_dir() / f"{video_id}.%(ext)s")
    progress_key = f"yt_progress_{video_id}"

    def progress_hook(d: dict) -> None:
        """Publish download progress ("xx.x%") to the Django cache for pollers."""
        status = d.get("status")
        if status == "downloading":
            # _percent_str may carry ANSI colour codes; strip them.
            pct = _ANSI_ESCAPE_RE.sub("", d.get("_percent_str") or "").strip()
            if pct:
                cache.set(progress_key, pct, timeout=_PROGRESS_TTL_SECONDS)
        elif status == "finished":
            cache.set(progress_key, "100%", timeout=_PROGRESS_TTL_SECONDS)

    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "outtmpl": output_template,
        # Only request pre-muxed (progressive) formats — no separate video+audio
        # streams that would require ffmpeg to merge. Falls back through:
        #   1. Best pre-muxed mp4 up to 1080p
        #   2. Any pre-muxed mp4
        #   3. Any pre-muxed webm
        #   4. Anything pre-muxed (no merger needed)
        "format": "best[ext=mp4][height<=1080]/best[ext=mp4]/best[ext=webm]/best",
        "progress_hooks": [progress_hook],
    }

    # sync_source stores the watch URL in file_path; rebuild it if it is empty.
    url = media_item.file_path or f"https://www.youtube.com/watch?v={video_id}"

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        if info is None:
            raise RuntimeError(f"yt-dlp returned no info for {url}")
        downloaded_path = Path(ydl.prepare_filename(info))

    if not downloaded_path.exists():
        # yt-dlp may have merged to .mp4 even if the template said otherwise
        mp4_path = downloaded_path.with_suffix(".mp4")
        if mp4_path.exists():
            downloaded_path = mp4_path
        else:
            raise RuntimeError(f"Expected download at {downloaded_path} but file not found.")

    # Persist the cache location on the model
    media_item.cached_file_path = str(downloaded_path)

    exact_duration = _probe_duration_seconds(downloaded_path, video_id)
    if exact_duration is not None:
        # Round up so the scheduled slot is never shorter than the video.
        media_item.runtime_seconds = int(math.ceil(exact_duration))

    media_item.save(update_fields=["cached_file_path", "runtime_seconds"])
    logger.info(
        "downloaded %s -> %s (exact runtime: %s)", video_id, downloaded_path, exact_duration
    )
    return downloaded_path