Files
PYTV/core/services/scheduler.py
2026-03-09 13:29:23 -04:00

313 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Schedule generator — respects ChannelSourceRule assignments.
Source selection priority:
1. If any rules with rule_mode='prefer' exist, items from those sources
are weighted much more heavily.
2. Items from rule_mode='allow' sources fill the rest.
3. Items from rule_mode='avoid' sources are only used as a last resort
(weight × 0.1).
4. Items from rule_mode='block' sources are NEVER scheduled.
5. If NO ChannelSourceRule rows exist for this channel, falls back to
the old behaviour (all items in the channel's library).
"""
import random
import uuid
from datetime import datetime, timedelta, date, timezone
from core.models import (
Channel, ChannelSourceRule, ScheduleTemplate,
ScheduleBlock, Airing, MediaItem,
)
class ScheduleGenerator:
    """
    Turn a channel's ScheduleTemplate + ScheduleBlock configuration into
    concrete Airing rows, choosing MediaItems according to the channel's
    ChannelSourceRule assignments.
    """

    def __init__(self, channel: Channel):
        # The channel whose schedule this generator produces.
        self.channel = channel
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def generate_for_date(self, target_date: date) -> int:
    """
    Idempotently generate airings for ``target_date``.

    For every ScheduleBlock of the channel's active template that is
    enabled on that weekday, existing airings in the block's window are
    deleted and the window is refilled via ``_fill_block``.

    Returns the number of new Airing rows created.
    """
    from zoneinfo import ZoneInfo

    template = self._get_template()
    if not template:
        return 0

    # Resolve the template's local timezone; fall back to UTC on ANY
    # failure (unknown zone name, missing tzdata, bad value).
    # NOTE: the original caught `(ZoneInfoNotFoundError, Exception)` —
    # the bare `Exception` member already subsumes the other, so the
    # tuple is collapsed here with identical behavior.
    try:
        local_tz = ZoneInfo(template.timezone_name or 'UTC')
    except Exception:
        local_tz = ZoneInfo('UTC')

    target_weekday_bit = 1 << target_date.weekday()
    blocks = template.scheduleblock_set.all().order_by('start_local_time')
    airings_created = 0
    for block in blocks:
        # day_of_week_mask is a 7-bit mask, bit 0 = Monday (weekday()).
        if not (block.day_of_week_mask & target_weekday_bit):
            continue

        # Convert the block's local wall-clock times to UTC-aware datetimes.
        start_local = datetime.combine(target_date, block.start_local_time, tzinfo=local_tz)
        end_local = datetime.combine(target_date, block.end_local_time, tzinfo=local_tz)
        start_dt = start_local.astimezone(timezone.utc)
        end_dt = end_local.astimezone(timezone.utc)
        # Midnight-wrap support (e.g. a 23:00 -> 02:00 local block).
        if end_dt <= start_dt:
            end_dt += timedelta(days=1)

        # Clear existing airings that start within this block's window so
        # regeneration is idempotent.
        Airing.objects.filter(
            channel=self.channel,
            starts_at__gte=start_dt,
            starts_at__lt=end_dt,
        ).delete()

        available_items = self._get_weighted_items(block)
        if not available_items:
            continue

        # Prevent overlaps: if the previous block's last airing overran
        # into this window, start after it ends instead.
        latest_prior_airing = Airing.objects.filter(
            channel=self.channel,
            starts_at__lt=start_dt
        ).order_by('-ends_at').first()
        actual_start_dt = start_dt
        if latest_prior_airing and latest_prior_airing.ends_at > start_dt:
            actual_start_dt = latest_prior_airing.ends_at
        # If the prior block ran all the way through this window, skip it.
        if actual_start_dt >= end_dt:
            continue

        airings_created += self._fill_block(
            template, block, actual_start_dt, end_dt, available_items
        )
    return airings_created
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _get_template(self):
    """Return this channel's highest-priority active ScheduleTemplate, or None."""
    return (
        ScheduleTemplate.objects
        .filter(channel=self.channel, is_active=True)
        .order_by('-priority')
        .first()
    )
def _get_weighted_items(self, block: ScheduleBlock, require_downloaded: bool = False) -> list:
    """
    Build a weighted pool of MediaItems respecting ChannelSourceRule.

    If ``require_downloaded`` is True, strictly exclude items from YouTube
    sources that have not yet been downloaded (``cached_file_path`` is null).

    Returns a flat list with items duplicated according to their effective
    weight (rounded to nearest int, min 1), then shuffled, so that uniform
    random draws — or sequential cycling over the list — approximate the
    intended probability distribution without needing numpy.

    Returns an empty list for OFF_AIR blocks, when rules leave no eligible
    source, or when no matching MediaItems exist.
    """
    # Off-air blocks schedule nothing by definition.
    if block.block_type == ScheduleBlock.BlockType.OFF_AIR:
        return []
    rules = list(
        ChannelSourceRule.objects.filter(channel=self.channel)
        .select_related('media_source')
    )
    if rules:
        # -- Rules exist: build a filtered + weighted pool ---------------
        allowed_source_ids = set()   # sources with mode 'allow' or 'prefer'
        blocked_source_ids = set()   # sources with mode 'block' (never used)
        avoid_source_ids = set()     # sources with mode 'avoid' (last resort)
        source_weights: dict[int, float] = {}
        for rule in rules:
            # A labelled rule only applies when this block's name matches
            # the label; unlabelled rules apply to every block.
            if rule.schedule_block_label and rule.schedule_block_label != block.name:
                continue
            sid = rule.media_source_id
            mode = rule.rule_mode
            # Missing/zero weight defaults to 1.0.
            w = float(rule.weight or 1.0)
            # NOTE: if several rules target the same source, the last one
            # iterated wins the weight entry (sets may still record the
            # source under more than one mode; 'block' is subtracted below).
            if mode == 'block':
                blocked_source_ids.add(sid)
            elif mode == 'avoid':
                avoid_source_ids.add(sid)
                source_weights[sid] = w * 0.1  # heavily discounted
            elif mode == 'prefer':
                allowed_source_ids.add(sid)
                source_weights[sid] = w * 3.0  # boosted
            else:  # 'allow'
                allowed_source_ids.add(sid)
                source_weights[sid] = w
        # Eligible = (allow | prefer | avoid) minus anything blocked;
        # a 'block' rule therefore overrides any other rule on the source.
        eligible_source_ids = (allowed_source_ids | avoid_source_ids) - blocked_source_ids
        if not eligible_source_ids:
            return []
        base_qs = MediaItem.objects.filter(
            media_source_id__in=eligible_source_ids,
            is_active=True,
        ).exclude(item_kind='bumper').select_related('media_source')
    else:
        # -- No rules: fall back to the channel's full library -----------
        base_qs = MediaItem.objects.filter(
            media_source__library=self.channel.library,
            is_active=True,
        ).exclude(item_kind='bumper')
        source_weights = {}
    # Optionally narrow by genre when the block specifies one.
    if block.default_genre:
        base_qs = base_qs.filter(genres=block.default_genre)
    # Enforce the downloaded requirement (used for emergency replacements):
    # drop YouTube-sourced items whose local cache file is missing.
    if require_downloaded:
        from django.db.models import Q
        from core.services.youtube import YOUTUBE_SOURCE_TYPES
        base_qs = base_qs.exclude(
            Q(media_source__source_type__in=YOUTUBE_SOURCE_TYPES) & Q(cached_file_path__isnull=True)
        )
    items = list(base_qs)
    if not items:
        return []
    if not source_weights:
        # No weight information at all -- a plain shuffle is sufficient.
        random.shuffle(items)
        return items
    # Build the weighted list: each item appears round(weight) times, min 1.
    # (The min of 1 means very small 'avoid' weights still yield one copy.)
    weighted: list[MediaItem] = []
    for item in items:
        w = source_weights.get(item.media_source_id, 1.0)
        copies = max(1, round(w))
        weighted.extend([item] * copies)
    random.shuffle(weighted)
    return weighted
def _fill_block(
    self,
    template: ScheduleTemplate,
    block: ScheduleBlock,
    start_dt: datetime,
    end_dt: datetime,
    items: list,
) -> int:
    """
    Fill [start_dt, end_dt) with back-to-back Airings, cycling through
    ``items`` in order, and return how many rows were created.

    All rows created in one call share a single generation batch UUID.
    """
    batch_id = uuid.uuid4()
    n_items = len(items)
    position = 0
    created_count = 0
    cursor = start_dt
    while cursor < end_dt:
        candidate = items[position % n_items]
        position += 1
        # Unknown runtime defaults to 30 minutes; clamp to >= 1 second.
        slot_len = timedelta(seconds=max(candidate.runtime_seconds or 1800, 1))
        # Cap overrun: never let an item run more than one hour past the
        # block's end.
        if cursor + slot_len > end_dt + timedelta(hours=1):
            break
        Airing.objects.create(
            channel=self.channel,
            schedule_template=template,
            schedule_block=block,
            media_item=candidate,
            starts_at=cursor,
            ends_at=cursor + slot_len,
            slot_kind="program",
            status="scheduled",
            source_reason="template",
            generation_batch_uuid=batch_id,
        )
        cursor += slot_len
        created_count += 1
    return created_count
def replace_undownloaded_airings(self, airings: list[Airing]):
    """
    Replace the media_item of each given Airing with one guaranteed to be
    playable, then ripple-shift all later airings on the channel by the
    resulting duration difference.

    Intended for airings whose media failed to download or that are too
    close to airtime without a valid cache file. Airings with no playable
    replacement available are logged and left untouched.
    """
    import logging
    # Reuse the canonical YouTube source-type list (same constant used by
    # _get_weighted_items) instead of a hard-coded duplicate that could drift.
    from core.services.youtube import YOUTUBE_SOURCE_TYPES

    logger = logging.getLogger(__name__)
    for original_airing in airings:
        # 1. Prefer the channel's dedicated error-fallback collection,
        #    excluding YouTube items whose local cache file is missing.
        safe_items = []
        if getattr(self.channel, 'fallback_collection', None):
            safe_items = list(self.channel.fallback_collection.media_items.exclude(
                cached_file_path__isnull=True,
                media_source__source_type__in=YOUTUBE_SOURCE_TYPES,
            ))
        # 2. If no fallback collection (or it yielded nothing), fall back to
        #    the block's own pool restricted to downloaded items.
        if not safe_items:
            safe_items = self._get_weighted_items(original_airing.schedule_block, require_downloaded=True)
            if not safe_items:
                logger.error(
                    "Cannot replace airing %s: No downloaded items available for fallback or block %s",
                    original_airing.id,
                    original_airing.schedule_block.name,
                )
                continue
        # 3. Swap in a random playable item and recompute the end time
        #    from its runtime (30-minute default, clamped to >= 1 second).
        fallback_item = random.choice(safe_items)
        old_duration = original_airing.ends_at - original_airing.starts_at
        original_airing.media_item = fallback_item
        original_airing.source_reason = 'recovery'
        new_duration = timedelta(seconds=max(fallback_item.runtime_seconds or 1800, 1))
        original_airing.ends_at = original_airing.starts_at + new_duration
        original_airing.save(update_fields=['media_item', 'source_reason', 'ends_at'])
        logger.info(
            "Replaced airing %s with '%s' (diff: %s)",
            original_airing.id,
            fallback_item.title,
            new_duration - old_duration,
        )
        # 4. Ripple-shift every airing that starts at/after the original
        #    end so the channel timeline stays contiguous.
        delta = new_duration - old_duration
        if delta.total_seconds() != 0:
            downstream = Airing.objects.filter(
                channel=self.channel,
                starts_at__gte=original_airing.starts_at + old_duration
            ).exclude(id=original_airing.id).order_by('starts_at')
            for later_airing in downstream:
                later_airing.starts_at += delta
                later_airing.ends_at += delta
                later_airing.save(update_fields=['starts_at', 'ends_at'])