Files
PYTV/api/routers/schedule.py
2026-03-09 08:26:45 -04:00

144 lines
5.1 KiB
Python

from ninja import Router, Schema
from typing import List, Optional
from datetime import date, time
from core.models import ScheduleTemplate, Channel, ScheduleBlock
from django.shortcuts import get_object_or_404
from datetime import date
# Router that every endpoint in this module registers on; created with
# tags=["schedule"].
router = Router(tags=["schedule"])
class ScheduleTemplateSchema(Schema):
    """Response shape used by this router when returning a schedule template."""

    # Identity and descriptive text.
    id: int
    name: str
    description: Optional[str] = None

    # NOTE(review): presumably an IANA zone name (e.g. "America/New_York");
    # nothing in this module validates it — confirm against the model.
    timezone_name: str

    # Optional validity dates; both default to None.
    # NOTE(review): presumably None means "open-ended" — confirm with the
    # scheduler that consumes these.
    valid_from_date: Optional[date] = None
    valid_to_date: Optional[date] = None

    priority: int
    is_active: bool

    # Id of the channel the template belongs to.
    channel_id: int
class ScheduleTemplateCreateSchema(Schema):
    """Request body accepted when creating a schedule template.

    ``name``, ``timezone_name`` and ``channel_id`` are required; every other
    field has a default.
    """

    name: str
    description: Optional[str] = None

    # NOTE(review): presumably an IANA zone name; not validated in this
    # module — confirm against the model.
    timezone_name: str

    # Optional validity dates; omitted values are stored as None.
    valid_from_date: Optional[date] = None
    valid_to_date: Optional[date] = None

    # Defaults applied when the client omits these fields.
    priority: int = 0
    is_active: bool = True

    # Id of an existing channel; the create endpoint looks it up by this value.
    channel_id: int
class ScheduleBlockSchema(Schema):
    """Response shape used by this router when returning a schedule block."""

    id: int

    # Id of the schedule template that owns this block.
    schedule_template_id: int

    name: str
    block_type: str

    # Wall-clock bounds of the block (time values without a date).
    start_local_time: time
    end_local_time: time

    # Integer bitmask of weekdays. The default block created by this module
    # uses 127 (all seven low bits set).
    # NOTE(review): which bit maps to which weekday is not visible here.
    day_of_week_mask: int
    spills_past_midnight: bool

    # Optional attributes; None when not set.
    target_content_rating: Optional[int] = None
    default_genre_id: Optional[int] = None
class ScheduleBlockCreateSchema(Schema):
    """Request body accepted when creating a schedule block.

    NOTE(review): unlike the block response schema, this payload has no
    ``default_genre_id`` field, so a genre cannot be supplied at creation
    time — confirm whether that is intentional.
    """

    # Id of an existing schedule template the new block will belong to.
    schedule_template_id: int

    name: str
    block_type: str

    # Wall-clock bounds of the block (time values without a date).
    start_local_time: time
    end_local_time: time

    # Integer bitmask of weekdays.
    # NOTE(review): bit-to-weekday mapping is not visible in this module.
    day_of_week_mask: int

    # Defaults applied when the client omits these fields.
    spills_past_midnight: bool = False
    target_content_rating: Optional[int] = None
@router.get("/template/", response=List[ScheduleTemplateSchema])
def list_schedule_templates(request):
    """Return every schedule template, serialized as ScheduleTemplateSchema.

    Args:
        request: The incoming HTTP request (unused).

    Returns:
        An unfiltered queryset over all ScheduleTemplate rows.
    """
    all_templates = ScheduleTemplate.objects.all()
    return all_templates
@router.get("/template/{template_id}", response=ScheduleTemplateSchema)
def get_schedule_template(request, template_id: int):
    """Return the schedule template whose id equals ``template_id``.

    Args:
        request: The incoming HTTP request (unused).
        template_id: Id taken from the URL path.

    Returns:
        The matching ScheduleTemplate; ``get_object_or_404`` ends the request
        with a 404 when there is no match.
    """
    found = get_object_or_404(ScheduleTemplate, id=template_id)
    return found
@router.post("/template/", response={201: ScheduleTemplateSchema})
def create_schedule_template(request, payload: ScheduleTemplateCreateSchema):
    """Create a schedule template together with its default all-day block.

    The channel named by ``payload.channel_id`` is looked up first; a missing
    channel ends the request with a 404 via ``get_object_or_404``. The
    template and its default block are then written inside a single database
    transaction, so a failure while creating the block rolls the template
    back instead of leaving a template that has no blocks.

    Args:
        request: The incoming HTTP request (unused).
        payload: Field values for the new template.

    Returns:
        A ``(201, template)`` tuple carrying the newly created template.
    """
    # Imported locally so this endpoint carries its own dependency.
    from django.db import transaction

    channel = get_object_or_404(Channel, id=payload.channel_id)

    with transaction.atomic():
        template = ScheduleTemplate.objects.create(
            channel=channel,
            name=payload.name,
            description=payload.description,
            timezone_name=payload.timezone_name,
            valid_from_date=payload.valid_from_date,
            valid_to_date=payload.valid_to_date,
            priority=payload.priority,
            is_active=payload.is_active,
        )

        # Create a default 24/7 programming block automatically to avoid
        # complex block management in the UI.
        ScheduleBlock.objects.create(
            schedule_template=template,
            name="Default 24/7 Block",
            block_type=ScheduleBlock.BlockType.PROGRAMMING,
            start_local_time=time(0, 0, 0),
            # datetime.time cannot express 24:00, so the block ends on the
            # last whole second of the day.
            end_local_time=time(23, 59, 59),
            # 127 == 0b1111111: all seven low bits set, i.e. every weekday.
            day_of_week_mask=127,
        )

    return 201, template
class GenerateScheduleSchema(Schema):
    """Request body for schedule generation: the single date to generate."""

    # Calendar date handed to the schedule generator.
    target_date: date
@router.post("/generate/{channel_id}")
def generate_schedule(request, channel_id: int, payload: GenerateScheduleSchema):
    """Generate the schedule of one channel for ``payload.target_date``.

    Args:
        request: The incoming HTTP request (unused).
        channel_id: Id of the channel, taken from the URL path.
        payload: Body carrying the date to generate.

    Returns:
        A dict with ``"status": "success"`` and, under ``"airings_created"``,
        whatever ``ScheduleGenerator.generate_for_date`` returned.
    """
    target_channel = get_object_or_404(Channel, id=channel_id)

    # Inline import to prevent circular dependency issues during initialization
    from core.services.scheduler import ScheduleGenerator

    created = ScheduleGenerator(channel=target_channel).generate_for_date(
        payload.target_date
    )
    return {"status": "success", "airings_created": created}
@router.delete("/template/{template_id}", response={204: None})
def delete_schedule_template(request, template_id: int):
    """Delete the schedule template whose id equals ``template_id``.

    Args:
        request: The incoming HTTP request (unused).
        template_id: Id taken from the URL path.

    Returns:
        ``(204, None)`` once the row has been deleted; a missing template
        ends the request with a 404 via ``get_object_or_404``.
    """
    get_object_or_404(ScheduleTemplate, id=template_id).delete()
    return 204, None
@router.post("/generate-today/{channel_id}")
def generate_schedule_today(request, channel_id: int):
    """Convenience endpoint: generates today's schedule for a channel.

    Args:
        request: The incoming HTTP request (unused).
        channel_id: Id of the channel, taken from the URL path.

    Returns:
        A dict with ``"status": "success"`` and, under ``"airings_created"``,
        whatever ``ScheduleGenerator.generate_for_date`` returned.
    """
    # Imported here rather than at module level, matching the other
    # generation endpoint, which does so to avoid a circular import.
    from core.services.scheduler import ScheduleGenerator

    target_channel = get_object_or_404(Channel, id=channel_id)

    # NOTE(review): date.today() is the process-local date; confirm that it
    # matches the day the channel's templates consider "today".
    today = date.today()
    created = ScheduleGenerator(channel=target_channel).generate_for_date(today)
    return {"status": "success", "airings_created": created}
@router.get("/template/{template_id}/blocks", response=List[ScheduleBlockSchema])
def list_schedule_blocks(request, template_id: int):
    """Return the blocks of one schedule template, earliest start time first.

    Args:
        request: The incoming HTTP request (unused).
        template_id: Id of the owning template, taken from the URL path.

    Returns:
        The template's blocks ordered by ``start_local_time``; a missing
        template ends the request with a 404 via ``get_object_or_404``.
    """
    owner = get_object_or_404(ScheduleTemplate, id=template_id)
    blocks = owner.scheduleblock_set.all()
    return blocks.order_by('start_local_time')
@router.post("/block/", response={201: ScheduleBlockSchema})
def create_schedule_block(request, payload: ScheduleBlockCreateSchema):
    """Create a schedule block under an existing schedule template.

    Args:
        request: The incoming HTTP request (unused).
        payload: Field values for the new block, including the id of the
            template it belongs to.

    Returns:
        A ``(201, block)`` tuple carrying the newly created block; a missing
        template ends the request with a 404 via ``get_object_or_404``.
    """
    parent_template = get_object_or_404(
        ScheduleTemplate, id=payload.schedule_template_id
    )

    # NOTE(review): values are passed straight to objects.create(), which
    # does not run model field validation — confirm whether block_type and
    # day_of_week_mask need checking before the insert.
    block_fields = {
        "schedule_template": parent_template,
        "name": payload.name,
        "block_type": payload.block_type,
        "start_local_time": payload.start_local_time,
        "end_local_time": payload.end_local_time,
        "day_of_week_mask": payload.day_of_week_mask,
        "spills_past_midnight": payload.spills_past_midnight,
        "target_content_rating": payload.target_content_rating,
    }
    new_block = ScheduleBlock.objects.create(**block_fields)
    return 201, new_block
@router.delete("/block/{block_id}", response={204: None})
def delete_schedule_block(request, block_id: int):
    """Delete the schedule block whose id equals ``block_id``.

    Args:
        request: The incoming HTTP request (unused).
        block_id: Id taken from the URL path.

    Returns:
        ``(204, None)`` once the row has been deleted; a missing block ends
        the request with a 404 via ``get_object_or_404``.
    """
    get_object_or_404(ScheduleBlock, id=block_id).delete()
    return 204, None