feat(main): main
This commit is contained in:
56
core/management/commands/run_cache_worker.py
Normal file
56
core/management/commands/run_cache_worker.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""
|
||||
management command: run_cache_worker
|
||||
|
||||
Runs continuously in the background to automatically download and cache
|
||||
upcoming programming for the next 24 hours. Intended to run as a daemon
|
||||
or Docker service.
|
||||
"""
|
||||
|
||||
import time
|
||||
import logging
|
||||
from django.core.management.base import BaseCommand
|
||||
from core.services.cache import run_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class Command(BaseCommand):
    """Continuously download and cache upcoming programming.

    Runs an infinite loop: every ``--interval`` seconds it invokes
    ``run_cache`` to prune stale files and download media for airings in
    the next ``--hours`` hours. Intended to run as a daemon or a Docker
    service. Exits cleanly on Ctrl-C / SIGINT instead of dumping a
    traceback.
    """

    help = "Run the 24-hour ahead cache worker continuously in the background."

    def add_arguments(self, parser):
        """Register the --interval and --hours command-line options."""
        parser.add_argument(
            "--interval",
            type=int,
            default=600,
            help="Interval in seconds between cache runs (default: 600s/10m).",
        )
        parser.add_argument(
            "--hours",
            type=int,
            default=24,
            help="How many hours ahead to scan for upcoming airings (default: 24).",
        )

    def handle(self, *args, **options):
        """Loop forever: run one cache pass, report, sleep, repeat."""
        interval = options["interval"]
        hours = options["hours"]

        self.stdout.write(self.style.SUCCESS(f"Starting continuous cache worker (interval: {interval}s, ahead: {hours}h)"))

        try:
            while True:
                try:
                    self.stdout.write(f"▶ Running background cache worker (window: {hours}h)")
                    result = run_cache(hours=hours, prune_only=False)

                    # Only print a summary when something actually happened,
                    # to keep daemon logs quiet on idle passes.
                    if result["downloaded"] > 0 or result["pruned"] > 0 or result["failed"] > 0:
                        self.stdout.write(self.style.SUCCESS(f" 🗑 Pruned: {result['pruned']}"))
                        self.stdout.write(self.style.SUCCESS(f" ↓ Downloaded: {result['downloaded']}"))
                        self.stdout.write(self.style.SUCCESS(f" ✓ Already cached: {result['already_cached']}"))
                        if result["failed"]:
                            self.stderr.write(self.style.ERROR(f" ✗ Failed: {result['failed']}"))

                except Exception as e:
                    # Broad catch is deliberate at this daemon boundary: one bad
                    # pass must not kill the worker. logger.exception records
                    # the full traceback for post-mortem debugging.
                    self.stderr.write(self.style.ERROR(f"Error in cache worker loop: {e}"))
                    logger.exception("Error in cache worker loop: %s", e)

                # Sleep until next interval regardless of success or failure.
                time.sleep(interval)
        except KeyboardInterrupt:
            # Graceful shutdown: also covers an interrupt raised during sleep.
            self.stdout.write(self.style.WARNING("Cache worker stopped."))
|
||||
@@ -0,0 +1,34 @@
|
||||
# Generated by Django 6.0.3 on 2026-03-08 22:07
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
    # Adds two optional fields:
    # - ChannelSourceRule.schedule_block_label: free-text label that scopes a
    #   source rule to schedule blocks whose name matches it.
    # - ScheduleBlock.target_content_rating: optional TV Parental Guidelines
    #   rating (1=TV-Y .. 6=TV-MA) the block should target.
    # Auto-generated by `makemigrations`; do not hand-edit applied migrations.

    dependencies = [
        ("core", "0002_mediaitem_cache_expires_at_and_more"),
    ]

    operations = [
        migrations.AddField(
            model_name="channelsourcerule",
            name="schedule_block_label",
            field=models.CharField(blank=True, max_length=255, null=True),
        ),
        migrations.AddField(
            model_name="scheduleblock",
            name="target_content_rating",
            field=models.IntegerField(
                blank=True,
                choices=[
                    (1, "TV-Y / All Children"),
                    (2, "TV-Y7 / Directed to Older Children"),
                    (3, "TV-G / General Audience"),
                    (4, "TV-PG / Parental Guidance Suggested"),
                    (5, "TV-14 / Parents Strongly Cautioned"),
                    (6, "TV-MA / Mature Audience Only"),
                ],
                null=True,
            ),
        ),
    ]
|
||||
@@ -255,6 +255,7 @@ class ChannelSourceRule(models.Model):
|
||||
channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
|
||||
media_source = models.ForeignKey(MediaSource, on_delete=models.CASCADE, blank=True, null=True)
|
||||
media_collection = models.ForeignKey(MediaCollection, on_delete=models.CASCADE, blank=True, null=True)
|
||||
schedule_block_label = models.CharField(max_length=255, blank=True, null=True)
|
||||
|
||||
class RuleMode(models.TextChoices):
|
||||
ALLOW = 'allow', 'Allow'
|
||||
@@ -347,6 +348,16 @@ class ScheduleBlock(models.Model):
|
||||
end_local_time = models.TimeField()
|
||||
day_of_week_mask = models.SmallIntegerField() # 1 to 127
|
||||
spills_past_midnight = models.BooleanField(default=False)
|
||||
|
||||
    class TargetRating(models.IntegerChoices):
        # US TV Parental Guidelines ratings, ordered from least (1) to most
        # (6) restrictive, so integer comparison reflects restrictiveness.
        TV_Y = 1, 'TV-Y / All Children'
        TV_Y7 = 2, 'TV-Y7 / Directed to Older Children'
        TV_G = 3, 'TV-G / General Audience'
        TV_PG = 4, 'TV-PG / Parental Guidance Suggested'
        TV_14 = 5, 'TV-14 / Parents Strongly Cautioned'
        TV_MA = 6, 'TV-MA / Mature Audience Only'
|
||||
|
||||
target_content_rating = models.IntegerField(choices=TargetRating.choices, blank=True, null=True)
|
||||
default_genre = models.ForeignKey(Genre, on_delete=models.SET_NULL, blank=True, null=True)
|
||||
min_content_rating = models.ForeignKey(ContentRating, on_delete=models.SET_NULL, blank=True, null=True, related_name='+')
|
||||
max_content_rating = models.ForeignKey(ContentRating, on_delete=models.SET_NULL, blank=True, null=True, related_name='+')
|
||||
|
||||
@@ -40,44 +40,66 @@ def run_cache(hours: int = 24, prune_only: bool = False) -> dict:
|
||||
)
|
||||
|
||||
youtube_items: dict[int, MediaItem] = {}
|
||||
for airing in upcoming:
|
||||
item = airing.media_item
|
||||
if item.media_source and item.media_source.source_type in YOUTUBE_SOURCE_TYPES:
|
||||
youtube_items[item.pk] = item
|
||||
|
||||
downloaded = already_cached = failed = 0
|
||||
items_status = []
|
||||
|
||||
for airing in upcoming:
|
||||
item = airing.media_item
|
||||
|
||||
# Determine if we are inside the 1-hour critical safety window
|
||||
time_until_airing = airing.starts_at - now
|
||||
in_safety_window = time_until_airing.total_seconds() < 3600
|
||||
|
||||
if item.media_source and item.media_source.source_type in YOUTUBE_SOURCE_TYPES:
|
||||
youtube_items[item.pk] = item
|
||||
|
||||
# Skip if already cached
|
||||
if item.cached_file_path and pathlib.Path(item.cached_file_path).exists():
|
||||
already_cached += 1
|
||||
items_status.append({
|
||||
"id": item.pk,
|
||||
"title": item.title,
|
||||
"status": "cached",
|
||||
"path": item.cached_file_path,
|
||||
})
|
||||
continue
|
||||
|
||||
# If in the 1-hour safety valve window, DO NOT download. Replace the airing.
|
||||
if in_safety_window:
|
||||
logger.warning(f"Airing {airing.id} ({item.title}) is < 1h away and not cached! Triggering emergency replacement.")
|
||||
from core.services.scheduler import ScheduleGenerator
|
||||
generator = ScheduleGenerator(channel=airing.channel)
|
||||
try:
|
||||
generator.replace_undownloaded_airings([airing])
|
||||
items_status.append({
|
||||
"id": item.pk,
|
||||
"title": item.title,
|
||||
"status": "replaced",
|
||||
"error": "Not downloaded in time",
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Emergency replacement failed for airing {airing.id}: {e}")
|
||||
continue
|
||||
|
||||
for item in youtube_items.values():
|
||||
# Skip if already cached
|
||||
if item.cached_file_path and pathlib.Path(item.cached_file_path).exists():
|
||||
already_cached += 1
|
||||
items_status.append({
|
||||
"id": item.pk,
|
||||
"title": item.title,
|
||||
"status": "cached",
|
||||
"path": item.cached_file_path,
|
||||
})
|
||||
continue
|
||||
|
||||
try:
|
||||
local_path = download_for_airing(item)
|
||||
downloaded += 1
|
||||
items_status.append({
|
||||
"id": item.pk,
|
||||
"title": item.title,
|
||||
"status": "downloaded",
|
||||
"path": str(local_path),
|
||||
})
|
||||
except Exception as exc:
|
||||
failed += 1
|
||||
items_status.append({
|
||||
"id": item.pk,
|
||||
"title": item.title,
|
||||
"status": "failed",
|
||||
"error": str(exc),
|
||||
})
|
||||
logger.error("download_for_airing(%s) failed: %s", item.pk, exc)
|
||||
# Otherwise, attempt download normally
|
||||
try:
|
||||
local_path = download_for_airing(item)
|
||||
downloaded += 1
|
||||
items_status.append({
|
||||
"id": item.pk,
|
||||
"title": item.title,
|
||||
"status": "downloaded",
|
||||
"path": str(local_path),
|
||||
})
|
||||
except Exception as exc:
|
||||
failed += 1
|
||||
items_status.append({
|
||||
"id": item.pk,
|
||||
"title": item.title,
|
||||
"status": "failed",
|
||||
"error": str(exc),
|
||||
})
|
||||
logger.error("download_for_airing(%s) failed: %s", item.pk, exc)
|
||||
|
||||
logger.info(
|
||||
"run_cache(hours=%d): pruned=%d downloaded=%d cached=%d failed=%d",
|
||||
|
||||
@@ -46,7 +46,7 @@ class ScheduleGenerator:
|
||||
return 0
|
||||
|
||||
target_weekday_bit = 1 << target_date.weekday()
|
||||
blocks = template.scheduleblock_set.all()
|
||||
blocks = template.scheduleblock_set.all().order_by('start_local_time')
|
||||
airings_created = 0
|
||||
|
||||
for block in blocks:
|
||||
@@ -60,7 +60,7 @@ class ScheduleGenerator:
|
||||
if end_dt <= start_dt:
|
||||
end_dt += timedelta(days=1)
|
||||
|
||||
# Clear existing airings in this window (idempotency)
|
||||
# Clear existing airings whose start time is within this block's window
|
||||
Airing.objects.filter(
|
||||
channel=self.channel,
|
||||
starts_at__gte=start_dt,
|
||||
@@ -71,8 +71,18 @@ class ScheduleGenerator:
|
||||
if not available_items:
|
||||
continue
|
||||
|
||||
# Prevent overlaps: ensure we don't start before the end of the previous block's overrun
|
||||
latest_prior_airing = Airing.objects.filter(
|
||||
channel=self.channel,
|
||||
starts_at__lt=start_dt
|
||||
).order_by('-ends_at').first()
|
||||
|
||||
actual_start_dt = start_dt
|
||||
if latest_prior_airing and latest_prior_airing.ends_at > start_dt:
|
||||
actual_start_dt = latest_prior_airing.ends_at
|
||||
|
||||
airings_created += self._fill_block(
|
||||
template, block, start_dt, end_dt, available_items
|
||||
template, block, actual_start_dt, end_dt, available_items
|
||||
)
|
||||
|
||||
return airings_created
|
||||
@@ -88,14 +98,20 @@ class ScheduleGenerator:
|
||||
).order_by('-priority')
|
||||
return qs.first()
|
||||
|
||||
def _get_weighted_items(self, block: ScheduleBlock) -> list:
|
||||
def _get_weighted_items(self, block: ScheduleBlock, require_downloaded: bool = False) -> list:
|
||||
"""
|
||||
Build a weighted pool of MediaItems respecting ChannelSourceRule.
|
||||
|
||||
If require_downloaded is True, strictly exclude items from YouTube sources
|
||||
that have not yet been downloaded (cached_file_path is null).
|
||||
|
||||
Returns a flat list with items duplicated according to their effective
|
||||
weight (rounded to nearest int, min 1) so random.choice() gives the
|
||||
right probability distribution without needing numpy.
|
||||
"""
|
||||
if block.block_type == ScheduleBlock.BlockType.OFF_AIR:
|
||||
return []
|
||||
|
||||
rules = list(
|
||||
ChannelSourceRule.objects.filter(channel=self.channel)
|
||||
.select_related('media_source')
|
||||
@@ -109,6 +125,10 @@ class ScheduleGenerator:
|
||||
source_weights: dict[int, float] = {}
|
||||
|
||||
for rule in rules:
|
||||
# If a rule has a label, it only applies if this block's name matches
|
||||
if rule.schedule_block_label and rule.schedule_block_label != block.name:
|
||||
continue
|
||||
|
||||
sid = rule.media_source_id
|
||||
mode = rule.rule_mode
|
||||
w = float(rule.weight or 1.0)
|
||||
@@ -148,6 +168,14 @@ class ScheduleGenerator:
|
||||
if block.default_genre:
|
||||
base_qs = base_qs.filter(genres=block.default_genre)
|
||||
|
||||
# Enforce downloaded requirement for emergency replacements
|
||||
if require_downloaded:
|
||||
from django.db.models import Q
|
||||
from core.services.youtube import YOUTUBE_SOURCE_TYPES
|
||||
base_qs = base_qs.exclude(
|
||||
Q(media_source__source_type__in=YOUTUBE_SOURCE_TYPES) & Q(cached_file_path__isnull=True)
|
||||
)
|
||||
|
||||
items = list(base_qs)
|
||||
if not items:
|
||||
return []
|
||||
@@ -208,3 +236,51 @@ class ScheduleGenerator:
|
||||
created += 1
|
||||
|
||||
return created
|
||||
|
||||
def replace_undownloaded_airings(self, airings: list[Airing]):
|
||||
"""
|
||||
Takes a list of specific Airings that failed to download or are
|
||||
too close to airtime without a valid cache file. Replaces the
|
||||
underlying media_item with one guaranteed to be playable, and
|
||||
ripple-shifts all following airings on the channel by the duration diff.
|
||||
"""
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
for original_airing in airings:
|
||||
# 1. Fetch available downloaded items for this block
|
||||
safe_items = self._get_weighted_items(original_airing.schedule_block, require_downloaded=True)
|
||||
if not safe_items:
|
||||
logger.error(f"Cannot replace airing {original_airing.id}: No downloaded items available for block {original_airing.schedule_block.name}")
|
||||
continue
|
||||
|
||||
|
||||
# 2. Pick a random valid fallback item
|
||||
fallback_item = random.choice(safe_items)
|
||||
old_duration = original_airing.ends_at - original_airing.starts_at
|
||||
|
||||
# Update the original airing to reference the new item
|
||||
original_airing.media_item = fallback_item
|
||||
original_airing.source_reason = 'recovery'
|
||||
|
||||
new_duration = timedelta(seconds=max(fallback_item.runtime_seconds or 1800, 1))
|
||||
original_airing.ends_at = original_airing.starts_at + new_duration
|
||||
original_airing.save(update_fields=['media_item', 'source_reason', 'ends_at'])
|
||||
|
||||
logger.info(f"Replaced airing {original_airing.id} with '{fallback_item.title}' (diff: {new_duration - old_duration})")
|
||||
|
||||
# 3. Ripple shift downstream airings accurately
|
||||
delta = new_duration - old_duration
|
||||
|
||||
if delta.total_seconds() != 0:
|
||||
# Find all airings strictly after this one on the same channel
|
||||
downstream = Airing.objects.filter(
|
||||
channel=self.channel,
|
||||
starts_at__gte=original_airing.starts_at + old_duration
|
||||
).exclude(id=original_airing.id).order_by('starts_at')
|
||||
|
||||
# Apply shift
|
||||
for later_airing in downstream:
|
||||
later_airing.starts_at += delta
|
||||
later_airing.ends_at += delta
|
||||
later_airing.save(update_fields=['starts_at', 'ends_at'])
|
||||
|
||||
@@ -238,7 +238,36 @@ def download_for_airing(media_item: MediaItem) -> Path:
|
||||
|
||||
# Persist the cache location on the model
|
||||
media_item.cached_file_path = str(downloaded_path)
|
||||
media_item.save(update_fields=["cached_file_path"])
|
||||
|
||||
# Extract exact runtime from the cached file using ffprobe-static via Node.js
|
||||
import subprocess
|
||||
import json
|
||||
|
||||
exact_duration = None
|
||||
try:
|
||||
# Resolve ffprobe path from the npm package
|
||||
node_cmd = ["node", "-e", "console.log(require('ffprobe-static').path)"]
|
||||
result = subprocess.run(node_cmd, capture_output=True, text=True, check=True)
|
||||
ffprobe_cmd = result.stdout.strip()
|
||||
|
||||
probe_cmd = [
|
||||
ffprobe_cmd,
|
||||
"-v", "error",
|
||||
"-show_entries", "format=duration",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
str(downloaded_path)
|
||||
]
|
||||
probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
|
||||
exact_duration = float(probe_result.stdout.strip())
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to extract exact runtime for {video_id} using ffprobe: {e}")
|
||||
|
||||
if exact_duration:
|
||||
# Round up to nearest integer to be safe on bounds
|
||||
import math
|
||||
media_item.runtime_seconds = int(math.ceil(exact_duration))
|
||||
|
||||
media_item.save(update_fields=["cached_file_path", "runtime_seconds"])
|
||||
|
||||
logger.info("downloaded %s -> %s", video_id, downloaded_path)
|
||||
logger.info("downloaded %s -> %s (exact runtime: %s)", video_id, downloaded_path, exact_duration)
|
||||
return downloaded_path
|
||||
|
||||
Reference in New Issue
Block a user