"""
Cache service — reusable download/prune logic used by both:
  - python manage.py cache_upcoming
  - POST /api/sources/cache-upcoming
"""
import logging
import pathlib
from datetime import datetime, timedelta

from django.utils import timezone

from core.models import Airing, MediaItem, MediaSource
from core.services.youtube import download_for_airing, YOUTUBE_SOURCE_TYPES

logger = logging.getLogger(__name__)

# Airings starting sooner than this are never downloaded on demand; an
# uncached airing inside the window is handed to the scheduler for
# replacement instead.
_SAFETY_WINDOW_SECONDS = 3600


def run_cache(hours: int = 24, prune_only: bool = False, channel_id: int | None = None) -> dict:
    """
    Scan Airings in the next `hours` hours, download any uncached YouTube
    videos, and prune stale local files.

    Pruning always runs first. When `prune_only` is true the function returns
    right after pruning, with every download counter at zero.

    If `channel_id` is provided, only airings for that specific channel are
    processed (pruning is not restricted by channel).

    For each selected airing whose media item belongs to a YouTube source:
      * if a cached file exists on disk, it is reported as ``"cached"``;
      * otherwise, if the airing starts in less than one hour (or is already
        running), no download is attempted and the scheduler is asked to
        replace the airing (``"replaced"``, or ``"failed"`` if that raises);
      * otherwise the video is downloaded (``"downloaded"`` / ``"failed"``).

    Returns a summary dict suitable for JSON serialization with the keys
    ``pruned``, ``downloaded``, ``already_cached``, ``failed`` and ``items``.
    """
    now = timezone.now()
    window_end = now + timedelta(hours=hours)

    # ── Prune first ────────────────────────────────────────────────────────
    pruned = _prune(now)

    if prune_only:
        return {"pruned": pruned, "downloaded": 0, "already_cached": 0, "failed": 0, "items": []}

    # ── Find upcoming and currently playing YouTube-backed airings ──────────
    qs = Airing.objects.filter(ends_at__gt=now, starts_at__lte=window_end)
    if channel_id is not None:
        qs = qs.filter(channel_id=channel_id)
    upcoming = qs.select_related("media_item__media_source")

    downloaded = already_cached = failed = 0
    items_status = []

    # Paths downloaded during this run, keyed by MediaItem pk. Each airing row
    # carries its own MediaItem instance, so a later airing of an item that was
    # just downloaded would otherwise still look uncached and be downloaded
    # again (or needlessly replaced).
    fetched_paths: dict[int, str] = {}

    for airing in upcoming:
        item = airing.media_item
        source = item.media_source
        if not source or source.source_type not in YOUTUBE_SOURCE_TYPES:
            continue

        # Skip if already cached (before this run, or earlier in this run)
        cached_path = fetched_paths.get(item.pk) or item.cached_file_path
        if cached_path and pathlib.Path(cached_path).exists():
            already_cached += 1
            items_status.append({
                "id": item.pk,
                "title": item.title,
                "status": "cached",
                "path": cached_path,
            })
            continue

        # If in the 1-hour safety valve window, DO NOT download. Replace the airing.
        seconds_until_start = (airing.starts_at - now).total_seconds()
        if seconds_until_start < _SAFETY_WINDOW_SECONDS:
            logger.warning(
                "Airing %s (%s) is < 1h away and not cached! Triggering emergency replacement.",
                airing.id,
                item.title,
            )
            # Imported at the point of use, as in the original code
            # (presumably to avoid an import cycle — TODO confirm).
            from core.services.scheduler import ScheduleGenerator

            try:
                generator = ScheduleGenerator(channel=airing.channel)
                generator.replace_undownloaded_airings([airing])
            except Exception as exc:
                # Surface the failure to the caller: this airing is neither
                # cached nor replaced, which is the worst outcome.
                failed += 1
                items_status.append({
                    "id": item.pk,
                    "title": item.title,
                    "status": "failed",
                    "error": f"Emergency replacement failed: {exc}",
                })
                logger.exception(
                    "Emergency replacement failed for airing %s: %s", airing.id, exc
                )
            else:
                items_status.append({
                    "id": item.pk,
                    "title": item.title,
                    "status": "replaced",
                    "error": "Not downloaded in time",
                })
            continue

        # Otherwise, attempt download normally
        try:
            local_path = download_for_airing(item)
        except Exception as exc:
            failed += 1
            items_status.append({
                "id": item.pk,
                "title": item.title,
                "status": "failed",
                "error": str(exc),
            })
            logger.error("download_for_airing(%s) failed: %s", item.pk, exc)
        else:
            downloaded += 1
            fetched_paths[item.pk] = str(local_path)
            items_status.append({
                "id": item.pk,
                "title": item.title,
                "status": "downloaded",
                "path": str(local_path),
            })

    logger.info(
        "run_cache(hours=%d): pruned=%d downloaded=%d cached=%d failed=%d",
        hours, pruned, downloaded, already_cached, failed,
    )
    return {
        "pruned": pruned,
        "downloaded": downloaded,
        "already_cached": already_cached,
        "failed": failed,
        "items": items_status,
    }


def _prune(now: datetime) -> int:
    """
    Delete local cache files whose airings have all ended.

    Selects every MediaItem that has a non-null `cached_file_path` and no
    airing ending at or after `now`, removes its file from disk, and clears
    `cached_file_path` / `cache_expires_at` on the row.

    If the file cannot be deleted, the row is left untouched so the file is
    still tracked and deletion is retried on the next run.

    Returns the number of files actually removed from disk.
    """
    pruned = 0
    stale = MediaItem.objects.filter(cached_file_path__isnull=False).exclude(
        airing__ends_at__gte=now
    )
    for item in stale:
        # An empty path would resolve to the current directory; there is no
        # file to remove, so only the DB fields are normalised below.
        if item.cached_file_path:
            p = pathlib.Path(item.cached_file_path)
            try:
                p.unlink()
            except FileNotFoundError:
                # Already gone: nothing was deleted, but the stale DB
                # reference is still cleared below.
                pass
            except OSError as exc:
                logger.warning("Could not delete %s: %s", p, exc)
                # Keep the DB reference so the file is not orphaned on disk.
                continue
            else:
                pruned += 1
        item.cached_file_path = None
        item.cache_expires_at = None
        item.save(update_fields=["cached_file_path", "cache_expires_at"])
    return pruned


def get_download_status() -> dict:
    """
    Return a snapshot of all YouTube MediaItems and their cache status,
    useful for rendering the Downloads UI.

    An item counts as cached only when it has a `cached_file_path` and that
    path currently exists on disk. The result has the keys ``items`` (one
    dict per item, ordered by source name then title), ``total`` and
    ``cached`` (number of items whose file is present).
    """
    items = (
        MediaItem.objects
        .filter(media_source__source_type__in=YOUTUBE_SOURCE_TYPES)
        .select_related("media_source")
        .order_by("media_source__name", "title")
    )
    result = []
    cached_total = 0
    for item in items:
        path = item.cached_file_path
        cached = bool(path and pathlib.Path(path).exists())
        if cached:
            cached_total += 1
        result.append({
            "id": item.pk,
            "title": item.title,
            "source_name": item.media_source.name,
            "source_id": item.media_source.id,
            "youtube_video_id": item.youtube_video_id,
            "runtime_seconds": item.runtime_seconds,
            "cached": cached,
            # Only expose the path when the file is really there.
            "cached_path": path if cached else None,
        })
    return {"items": result, "total": len(result), "cached": cached_total}