117 lines
4.5 KiB
Python
117 lines
4.5 KiB
Python
import pytest
|
|
from datetime import date, timedelta, datetime, timezone
|
|
from django.utils import timezone as django_timezone
|
|
|
|
from core.models import AppUser, Library, Channel, ScheduleTemplate, ScheduleBlock, MediaSource, MediaItem, ChannelSourceRule, Airing
|
|
from core.services.scheduler import ScheduleGenerator
|
|
|
|
@pytest.mark.django_db
|
|
def test_repeat_gap_hours():
|
|
# Setup user, library, channel
|
|
user = AppUser.objects.create_user(username="rule_tester", password="pw")
|
|
library = Library.objects.create(owner_user=user, name="Rule Test Lib")
|
|
channel = Channel.objects.create(
|
|
owner_user=user, library=library, name="Rule TV", slug="r-tv", channel_number=99, timezone_name="UTC"
|
|
)
|
|
|
|
source = MediaSource.objects.create(
|
|
library=library,
|
|
name="Gap Source",
|
|
source_type="local_directory",
|
|
uri="/tmp/fake",
|
|
min_repeat_gap_hours=5
|
|
)
|
|
|
|
# Create 6 items (each 1 hour long) -> 6 hours total duration
|
|
# This means a 5 hour gap is mathematically satisfiable without breaking cooldowns.
|
|
for i in range(6):
|
|
MediaItem.objects.create(
|
|
media_source=source,
|
|
title=f"Vid {i}",
|
|
item_kind="movie",
|
|
runtime_seconds=3600, # 1 hour
|
|
file_path=f"/tmp/fake/{i}.mp4",
|
|
cached_file_path=f"/tmp/fake/{i}.mp4",
|
|
is_active=True
|
|
)
|
|
|
|
ChannelSourceRule.objects.create(channel=channel, media_source=source, rule_mode="allow", weight=1.0)
|
|
template = ScheduleTemplate.objects.create(channel=channel, name="T", priority=10, is_active=True)
|
|
block = ScheduleBlock.objects.create(
|
|
schedule_template=template, name="All Day", block_type="programming",
|
|
start_local_time="00:00:00", end_local_time="12:00:00", day_of_week_mask=127
|
|
)
|
|
|
|
target_date = django_timezone.now().date()
|
|
gen = ScheduleGenerator(channel)
|
|
created = gen.generate_for_date(target_date)
|
|
|
|
assert created == 12 # 12 1-hour slots
|
|
|
|
airings = list(Airing.objects.filter(channel=channel).order_by('starts_at'))
|
|
|
|
# Assert that NO item is repeated within 5 hours of itself
|
|
last_played = {}
|
|
for a in airings:
|
|
if a.media_item_id in last_played:
|
|
gap = (a.starts_at - last_played[a.media_item_id]).total_seconds() / 3600.0
|
|
assert gap >= 5.0, f"Item {a.media_item_id} played too soon. Gap: {gap} hours"
|
|
last_played[a.media_item_id] = a.starts_at
|
|
|
|
@pytest.mark.django_db
|
|
def test_youtube_age_filter():
|
|
from unittest.mock import patch
|
|
from core.services.youtube import sync_source
|
|
user = AppUser.objects.create_user(username="rule_tester2", password="pw")
|
|
library = Library.objects.create(owner_user=user, name="Rule Test Lib 2")
|
|
|
|
source = MediaSource.objects.create(
|
|
library=library,
|
|
name="Old Videos Only",
|
|
source_type="youtube_playlist",
|
|
uri="https://www.youtube.com/playlist?list=fake_playlist",
|
|
max_age_days=10 # Anything older than 10 days is skipped
|
|
)
|
|
|
|
# Mock yt-dlp extract_info to return 3 videos
|
|
# 1. 2 days old (keep)
|
|
# 2. 15 days old (skip)
|
|
# 3. 5 days old (keep)
|
|
from datetime import date, timedelta
|
|
today = date.today()
|
|
|
|
with patch("core.services.youtube.yt_dlp.YoutubeDL") as mock_ydl:
|
|
mock_instance = mock_ydl.return_value.__enter__.return_value
|
|
mock_instance.extract_info.return_value = {
|
|
"entries": [
|
|
{
|
|
"id": "vid1",
|
|
"title": "New Video",
|
|
"duration": 600,
|
|
"upload_date": (today - timedelta(days=2)).strftime("%Y%m%d")
|
|
},
|
|
{
|
|
"id": "vid2",
|
|
"title": "Old Video",
|
|
"duration": 600,
|
|
"upload_date": (today - timedelta(days=15)).strftime("%Y%m%d")
|
|
},
|
|
{
|
|
"id": "vid3",
|
|
"title": "Medium Video",
|
|
"duration": 600,
|
|
"upload_date": (today - timedelta(days=5)).strftime("%Y%m%d")
|
|
}
|
|
]
|
|
}
|
|
|
|
res = sync_source(source, max_videos=5)
|
|
|
|
assert res['created'] == 2 # Only vid1 and vid3
|
|
assert res['skipped'] == 1 # vid2 skipped due to age
|
|
|
|
# Verify the items in DB
|
|
items = list(MediaItem.objects.filter(media_source=source).order_by('title'))
|
|
assert len(items) == 2
|
|
assert "vid2" not in [item.title for item in items]
|