Files
PYTV/api/routers/channel.py
2026-03-08 16:48:58 -04:00

203 lines
6.8 KiB
Python

from ninja import Router, Schema
from typing import List, Optional
from core.models import Channel, AppUser, Library, ChannelSourceRule, MediaSource, Airing
from django.shortcuts import get_object_or_404
from django.utils import timezone
from datetime import datetime, timedelta
router = Router(tags=["channel"])
class ChannelSchema(Schema):
id: int
name: str
slug: str
channel_number: Optional[int] = None
description: Optional[str] = None
scheduling_mode: str
library_id: int
owner_user_id: int
class ChannelCreateSchema(Schema):
name: str
slug: str
channel_number: Optional[int] = None
description: Optional[str] = None
library_id: int
owner_user_id: int # Mock Auth User
class ChannelUpdateSchema(Schema):
name: Optional[str] = None
description: Optional[str] = None
channel_number: Optional[int] = None
scheduling_mode: Optional[str] = None
visibility: Optional[str] = None
is_active: Optional[bool] = None
class ChannelSourceRuleSchema(Schema):
id: int
source_id: int
source_name: str
rule_mode: str
weight: float
class ChannelSourceAssignSchema(Schema):
source_id: int
rule_mode: str = 'allow' # allow | prefer | avoid | block
weight: float = 1.0
class AiringSchema(Schema):
id: int
media_item_title: str
media_item_path: Optional[str] = None
starts_at: datetime
ends_at: datetime
slot_kind: str
status: str
@staticmethod
def from_airing(airing) -> 'AiringSchema':
media_path = None
if airing.media_item:
raw_path = airing.media_item.cached_file_path or airing.media_item.file_path
if raw_path:
if raw_path.startswith("http://") or raw_path.startswith("https://"):
media_path = raw_path
else:
from django.conf import settings
import os
try:
rel_path = os.path.relpath(raw_path, settings.MEDIA_ROOT)
if not rel_path.startswith("..") and not os.path.isabs(rel_path):
base = settings.MEDIA_URL.rstrip('/')
media_path = f"{base}/{rel_path}"
else:
media_path = raw_path
except ValueError:
media_path = raw_path
return AiringSchema(
id=airing.id,
media_item_title=airing.media_item.title if airing.media_item else 'Unknown',
media_item_path=media_path,
starts_at=airing.starts_at,
ends_at=airing.ends_at,
slot_kind=airing.slot_kind,
status=airing.status,
)
@router.get("/", response=List[ChannelSchema])
def list_channels(request):
return Channel.objects.all()
@router.get("/{channel_id}", response=ChannelSchema)
def get_channel(request, channel_id: int):
return get_object_or_404(Channel, id=channel_id)
@router.post("/", response={201: ChannelSchema})
def create_channel(request, payload: ChannelCreateSchema):
owner = get_object_or_404(AppUser, id=payload.owner_user_id)
library = get_object_or_404(Library, id=payload.library_id)
channel = Channel.objects.create(
owner_user=owner,
library=library,
name=payload.name,
slug=payload.slug,
channel_number=payload.channel_number,
description=payload.description
)
return 201, channel
@router.patch("/{channel_id}", response=ChannelSchema)
def update_channel(request, channel_id: int, payload: ChannelUpdateSchema):
channel = get_object_or_404(Channel, id=channel_id)
for attr, value in payload.dict(exclude_unset=True).items():
setattr(channel, attr, value)
channel.save()
return channel
@router.delete("/{channel_id}", response={204: None})
def delete_channel(request, channel_id: int):
channel = get_object_or_404(Channel, id=channel_id)
channel.delete()
return 204, None
@router.get("/{channel_id}/sources", response=List[ChannelSourceRuleSchema])
def list_channel_sources(request, channel_id: int):
channel = get_object_or_404(Channel, id=channel_id)
rules = ChannelSourceRule.objects.filter(channel=channel, media_source__isnull=False).select_related('media_source')
return [
ChannelSourceRuleSchema(
id=r.id,
source_id=r.media_source.id,
source_name=r.media_source.name,
rule_mode=r.rule_mode,
weight=float(r.weight),
)
for r in rules
]
@router.post("/{channel_id}/sources", response={201: ChannelSourceRuleSchema})
def assign_source_to_channel(request, channel_id: int, payload: ChannelSourceAssignSchema):
channel = get_object_or_404(Channel, id=channel_id)
source = get_object_or_404(MediaSource, id=payload.source_id)
rule = ChannelSourceRule.objects.create(
channel=channel,
media_source=source,
rule_mode=payload.rule_mode,
weight=payload.weight,
)
return 201, ChannelSourceRuleSchema(
id=rule.id,
source_id=source.id,
source_name=source.name,
rule_mode=rule.rule_mode,
weight=float(rule.weight),
)
@router.delete("/{channel_id}/sources/{rule_id}", response={204: None})
def remove_source_from_channel(request, channel_id: int, rule_id: int):
rule = get_object_or_404(ChannelSourceRule, id=rule_id, channel_id=channel_id)
rule.delete()
return 204, None
@router.get("/{channel_id}/now", response=Optional[AiringSchema])
def channel_now_playing(request, channel_id: int):
"""Return the Airing currently on-air for this channel, or null."""
channel = get_object_or_404(Channel, id=channel_id)
# Using a 1-second buffer to handle boundary conditions smoothly
now = timezone.now()
airing = (
Airing.objects
.filter(channel=channel, starts_at__lte=now, ends_at__gt=now)
.select_related('media_item')
.first()
)
if airing is None:
return None
return AiringSchema.from_airing(airing)
@router.get("/{channel_id}/airings", response=List[AiringSchema])
def channel_airings(request, channel_id: int, hours: int = 4):
"""
Return Airings for this channel that overlap with the window:
[now - 2 hours, now + {hours} hours]
"""
channel = get_object_or_404(Channel, id=channel_id)
now = timezone.now()
window_start = now - timedelta(hours=2) # Look back 2h for context
window_end = now + timedelta(hours=hours)
# Logic for overlap: starts_at < window_end AND ends_at > window_start
airings = (
Airing.objects
.filter(
channel=channel,
starts_at__lt=window_end,
ends_at__gt=window_start
)
.select_related('media_item')
.order_by('starts_at')
)
return [AiringSchema.from_airing(a) for a in airings]