feat(main): initial commit

This commit is contained in:
2026-03-08 11:28:59 -04:00
commit 458ceb31b1
66 changed files with 7885 additions and 0 deletions

0
api/__init__.py Normal file
View File

3
api/admin.py Normal file
View File

@@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

13
api/api.py Normal file
View File

@@ -0,0 +1,13 @@
from ninja import NinjaAPI
api = NinjaAPI(title="PYTV API")
from api.routers.library import router as library_router
from api.routers.channel import router as channel_router
from api.routers.schedule import router as schedule_router
from api.routers.user import router as user_router
api.add_router("/library/", library_router)
api.add_router("/channel/", channel_router)
api.add_router("/schedule/", schedule_router)
api.add_router("/user/", user_router)

5
api/apps.py Normal file
View File

@@ -0,0 +1,5 @@
from django.apps import AppConfig
class ApiConfig(AppConfig):
name = "api"

View File

3
api/models.py Normal file
View File

@@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

0
api/routers/__init__.py Normal file
View File

47
api/routers/channel.py Normal file
View File

@@ -0,0 +1,47 @@
from ninja import Router, Schema
from typing import List, Optional
from core.models import Channel, AppUser, Library
from django.shortcuts import get_object_or_404
router = Router(tags=["channel"])
class ChannelSchema(Schema):
id: int
name: str
slug: str
channel_number: Optional[int] = None
description: Optional[str] = None
scheduling_mode: str
library_id: int
owner_user_id: int
class ChannelCreateSchema(Schema):
name: str
slug: str
channel_number: Optional[int] = None
description: Optional[str] = None
library_id: int
owner_user_id: int # Mock Auth User
@router.get("/", response=List[ChannelSchema])
def list_channels(request):
return Channel.objects.all()
@router.get("/{channel_id}", response=ChannelSchema)
def get_channel(request, channel_id: int):
return get_object_or_404(Channel, id=channel_id)
@router.post("/", response={201: ChannelSchema})
def create_channel(request, payload: ChannelCreateSchema):
owner = get_object_or_404(AppUser, id=payload.owner_user_id)
library = get_object_or_404(Library, id=payload.library_id)
channel = Channel.objects.create(
owner_user=owner,
library=library,
name=payload.name,
slug=payload.slug,
channel_number=payload.channel_number,
description=payload.description
)
return 201, channel

38
api/routers/library.py Normal file
View File

@@ -0,0 +1,38 @@
from ninja import Router, Schema
from typing import List, Optional
from core.models import Library, AppUser
from django.shortcuts import get_object_or_404
router = Router(tags=["library"])
class LibrarySchema(Schema):
id: int
name: str
description: Optional[str] = None
visibility: str
owner_user_id: int
class LibraryCreateSchema(Schema):
name: str
description: Optional[str] = None
visibility: Optional[str] = 'private'
owner_user_id: int # In a real app with auth, this would come from request.user
@router.get("/", response=List[LibrarySchema])
def list_libraries(request):
return Library.objects.all()
@router.get("/{library_id}", response=LibrarySchema)
def get_library(request, library_id: int):
return get_object_or_404(Library, id=library_id)
@router.post("/", response={201: LibrarySchema})
def create_library(request, payload: LibraryCreateSchema):
owner = get_object_or_404(AppUser, id=payload.owner_user_id)
library = Library.objects.create(
owner_user=owner,
name=payload.name,
description=payload.description,
visibility=payload.visibility
)
return 201, library

65
api/routers/schedule.py Normal file
View File

@@ -0,0 +1,65 @@
from ninja import Router, Schema
from typing import List, Optional
from core.models import ScheduleTemplate, Channel
from django.shortcuts import get_object_or_404
from datetime import date
router = Router(tags=["schedule"])
class ScheduleTemplateSchema(Schema):
id: int
name: str
description: Optional[str] = None
timezone_name: str
valid_from_date: Optional[date] = None
valid_to_date: Optional[date] = None
priority: int
is_active: bool
channel_id: int
class ScheduleTemplateCreateSchema(Schema):
name: str
description: Optional[str] = None
timezone_name: str
valid_from_date: Optional[date] = None
valid_to_date: Optional[date] = None
priority: int = 0
is_active: bool = True
channel_id: int
@router.get("/template/", response=List[ScheduleTemplateSchema])
def list_schedule_templates(request):
return ScheduleTemplate.objects.all()
@router.get("/template/{template_id}", response=ScheduleTemplateSchema)
def get_schedule_template(request, template_id: int):
return get_object_or_404(ScheduleTemplate, id=template_id)
@router.post("/template/", response={201: ScheduleTemplateSchema})
def create_schedule_template(request, payload: ScheduleTemplateCreateSchema):
channel = get_object_or_404(Channel, id=payload.channel_id)
template = ScheduleTemplate.objects.create(
channel=channel,
name=payload.name,
description=payload.description,
timezone_name=payload.timezone_name,
valid_from_date=payload.valid_from_date,
valid_to_date=payload.valid_to_date,
priority=payload.priority,
is_active=payload.is_active
)
return 201, template
class GenerateScheduleSchema(Schema):
target_date: date
@router.post("/generate/{channel_id}")
def generate_schedule(request, channel_id: int, payload: GenerateScheduleSchema):
channel = get_object_or_404(Channel, id=channel_id)
# Inline import to prevent circular dependency issues during initialization
from core.services.scheduler import ScheduleGenerator
generator = ScheduleGenerator(channel=channel)
airings_created = generator.generate_for_date(payload.target_date)
return {"status": "success", "airings_created": airings_created}

52
api/routers/user.py Normal file
View File

@@ -0,0 +1,52 @@
from ninja import Router
from core.models import AppUser
from django.shortcuts import get_object_or_404
from ninja import Schema
from typing import List, Optional
from datetime import datetime
router = Router()
class UserSchema(Schema):
id: int
username: str
email: str
is_staff: bool
is_superuser: bool
is_active: bool
date_joined: datetime
class UserCreateSchema(Schema):
username: str
email: str
password: str
is_superuser: bool = False
class UserUpdateSchema(Schema):
username: Optional[str] = None
email: Optional[str] = None
is_active: Optional[bool] = None
@router.get("/", response=List[UserSchema])
def list_users(request):
return AppUser.objects.all()
@router.get("/{user_id}", response=UserSchema)
def get_user(request, user_id: int):
return get_object_or_404(AppUser, id=user_id)
@router.post("/", response={201: UserSchema})
def create_user(request, payload: UserCreateSchema):
if payload.is_superuser:
user = AppUser.objects.create_superuser(payload.username, payload.email, payload.password)
else:
user = AppUser.objects.create_user(payload.username, payload.email, payload.password)
return 201, user
@router.patch("/{user_id}", response=UserSchema)
def update_user(request, user_id: int, payload: UserUpdateSchema):
user = get_object_or_404(AppUser, id=user_id)
for attr, value in payload.dict(exclude_unset=True).items():
setattr(user, attr, value)
user.save()
return user

0
api/tests/__init__.py Normal file
View File

67
api/tests/test_channel.py Normal file
View File

@@ -0,0 +1,67 @@
import pytest
from django.test import Client
from core.models import AppUser, Library, Channel
@pytest.fixture
def client():
return Client()
@pytest.fixture
def user(db):
return AppUser.objects.create_user(username="apiuser2", email="api2@example.com", password="password")
@pytest.fixture
def library(db, user):
return Library.objects.create(owner_user=user, name="Network Lib")
@pytest.fixture
def channel(db, user, library):
return Channel.objects.create(
owner_user=user,
library=library,
name="Test Channel",
slug="test-ch",
channel_number=1,
scheduling_mode="template_driven"
)
@pytest.mark.django_db
def test_list_channels(client, channel):
response = client.get("/api/channel/")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["name"] == "Test Channel"
assert data[0]["slug"] == "test-ch"
@pytest.mark.django_db
def test_get_channel(client, channel):
response = client.get(f"/api/channel/{channel.id}")
assert response.status_code == 200
data = response.json()
assert data["name"] == "Test Channel"
@pytest.mark.django_db
def test_get_channel_not_found(client):
response = client.get("/api/channel/999")
assert response.status_code == 404
@pytest.mark.django_db
def test_create_channel(client, user, library):
payload = {
"name": "New API Channel",
"slug": "new-api-ch",
"channel_number": 5,
"description": "Created via API",
"library_id": library.id,
"owner_user_id": user.id
}
response = client.post("/api/channel/", data=payload, content_type="application/json")
assert response.status_code == 201
data = response.json()
assert data["name"] == "New API Channel"
assert data["slug"] == "new-api-ch"
# Verify it hit the DB
assert Channel.objects.count() == 1
assert Channel.objects.get(id=data["id"]).name == "New API Channel"

54
api/tests/test_library.py Normal file
View File

@@ -0,0 +1,54 @@
import pytest
from django.test import Client
from core.models import AppUser, Library
@pytest.fixture
def client():
return Client()
@pytest.fixture
def user(db):
return AppUser.objects.create_user(username="apiuser", email="api@example.com", password="password")
@pytest.fixture
def library(db, user):
return Library.objects.create(owner_user=user, name="Test Library", visibility="private")
@pytest.mark.django_db
def test_list_libraries(client, library):
response = client.get("/api/library/")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["name"] == "Test Library"
assert data[0]["owner_user_id"] == library.owner_user.id
@pytest.mark.django_db
def test_get_library(client, library):
response = client.get(f"/api/library/{library.id}")
assert response.status_code == 200
data = response.json()
assert data["name"] == "Test Library"
@pytest.mark.django_db
def test_get_library_not_found(client):
response = client.get("/api/library/999")
assert response.status_code == 404
@pytest.mark.django_db
def test_create_library(client, user):
payload = {
"name": "New API Library",
"description": "Created via API",
"visibility": "public",
"owner_user_id": user.id
}
response = client.post("/api/library/", data=payload, content_type="application/json")
assert response.status_code == 201
data = response.json()
assert data["name"] == "New API Library"
assert data["visibility"] == "public"
# Verify it hit the DB
assert Library.objects.count() == 1
assert Library.objects.get(id=data["id"]).name == "New API Library"

106
api/tests/test_schedule.py Normal file
View File

@@ -0,0 +1,106 @@
import pytest
from django.test import Client
from core.models import AppUser, Library, Channel, ScheduleTemplate
@pytest.fixture
def client():
return Client()
@pytest.fixture
def user(db):
return AppUser.objects.create_user(username="apiuser3", email="api3@example.com", password="password")
@pytest.fixture
def library(db, user):
return Library.objects.create(owner_user=user, name="Schedule Lib")
@pytest.fixture
def channel(db, user, library):
return Channel.objects.create(
owner_user=user,
library=library,
name="Schedule Channel",
slug="sch-ch",
channel_number=2
)
@pytest.fixture
def schedule_template(db, channel):
return ScheduleTemplate.objects.create(
channel=channel,
name="Morning Block",
timezone_name="UTC",
priority=10
)
@pytest.mark.django_db
def test_list_schedule_templates(client, schedule_template):
response = client.get("/api/schedule/template/")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["name"] == "Morning Block"
assert data[0]["priority"] == 10
@pytest.mark.django_db
def test_get_schedule_template(client, schedule_template):
response = client.get(f"/api/schedule/template/{schedule_template.id}")
assert response.status_code == 200
data = response.json()
assert data["name"] == "Morning Block"
@pytest.mark.django_db
def test_get_schedule_template_not_found(client):
response = client.get("/api/schedule/template/999")
assert response.status_code == 404
@pytest.mark.django_db
def test_create_schedule_template(client, channel):
payload = {
"name": "Evening Block",
"description": "Late night programming",
"timezone_name": "America/New_York",
"priority": 5,
"is_active": True,
"channel_id": channel.id
}
response = client.post("/api/schedule/template/", data=payload, content_type="application/json")
assert response.status_code == 201
data = response.json()
assert data["name"] == "Evening Block"
assert data["timezone_name"] == "America/New_York"
assert ScheduleTemplate.objects.count() == 1
assert ScheduleTemplate.objects.get(id=data["id"]).name == "Evening Block"
@pytest.mark.django_db
def test_generate_schedule(client, channel, schedule_template):
# Setup some media for it to find
from core.models import MediaSource, MediaItem
source = MediaSource.objects.create(library=channel.library, name="G", uri="/tmp", source_type="local_directory")
MediaItem.objects.create(media_source=source, title="M1", item_kind="movie", runtime_seconds=3600, file_path="1")
MediaItem.objects.create(media_source=source, title="M2", item_kind="movie", runtime_seconds=3600, file_path="2")
# Add a schedule block to the template (created in fixture)
from core.models import ScheduleBlock
from datetime import time
t = schedule_template
ScheduleBlock.objects.create(
schedule_template=t,
name="TestBlock",
block_type="programming",
start_local_time=time(0, 0),
end_local_time=time(23, 59),
day_of_week_mask=127 # All days
)
payload = {"target_date": "2026-03-08"}
response = client.post(f"/api/schedule/generate/{channel.id}", data=payload, content_type="application/json")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
# Should have scheduled both movies back to back until the block ends
assert data["airings_created"] > 0
from core.models import Airing
assert Airing.objects.filter(channel=channel).count() == data["airings_created"]

67
api/tests/test_user.py Normal file
View File

@@ -0,0 +1,67 @@
import pytest
from django.test import Client
from core.models import AppUser
@pytest.fixture
def api_client():
return Client()
@pytest.fixture
def test_user():
return AppUser.objects.create_user(
username="test_api_user",
email="test@api.tv",
password="testpassword123"
)
@pytest.mark.django_db
def test_list_users(api_client, test_user):
response = api_client.get("/api/user/")
assert response.status_code == 200
data = response.json()
assert len(data) >= 1
assert any(u["username"] == "test_api_user" for u in data)
@pytest.mark.django_db
def test_get_user(api_client, test_user):
response = api_client.get(f"/api/user/{test_user.id}")
assert response.status_code == 200
data = response.json()
assert data["username"] == "test_api_user"
assert "password" not in data # Ensure we don't leak passwords
@pytest.mark.django_db
def test_create_user(api_client):
payload = {
"username": "new_viewer",
"email": "viewer@api.tv",
"password": "securepassword",
"is_superuser": False
}
response = api_client.post("/api/user/", data=payload, content_type="application/json")
assert response.status_code == 201
data = response.json()
assert data["username"] == "new_viewer"
# Verify DB
user = AppUser.objects.get(id=data["id"])
assert user.username == "new_viewer"
assert not user.is_superuser
assert user.check_password("securepassword")
@pytest.mark.django_db
def test_update_user(api_client, test_user):
payload = {
"email": "updated@api.tv",
"is_active": False
}
response = api_client.patch(f"/api/user/{test_user.id}", data=payload, content_type="application/json")
assert response.status_code == 200
data = response.json()
assert data["email"] == "updated@api.tv"
assert data["is_active"] is False
# Verify DB
test_user.refresh_from_db()
assert test_user.email == "updated@api.tv"
assert not test_user.is_active

3
api/views.py Normal file
View File

@@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.