fix: stop discovery from bursting dozens of yt-dlp calls inside one task
Each search/graph/trending task was calling _fetch_and_index_channel inline for up to 10-15 newly discovered channels, each making up to 4 yt-dlp calls (1 channel metadata + 3 individual video fetches for dateless entries). This bypassed the 30-90 s worker gap, producing bursts of 40-60 calls in rapid succession and hammering YouTube. Changes: - _fetch_and_index_channel: removed the dateless-video individual fetch loop — one call per channel, videos without published_at are simply skipped at discovery time - _search_and_store and _fetch_graph_for_channel: queue channel indexing as separate worker tasks (3 and 2 respectively) so the 30-90 s gap applies between every yt-dlp call, including channel indexing - update_trending_signal and update_graph_signal (old sync path): removed inline _fetch_and_index_channel loops (15 and 10 channels) - _discovery_task in channels.py: replaced run_full_discovery (old synchronous path) with schedule_discovery so sync-all and follow-by-url go through the queue system Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -23,7 +23,7 @@ _worker_lock = _threading.Lock()
|
||||
|
||||
|
||||
def _fetch_and_index_channel(db: Session, channel: Channel):
|
||||
"""Fetch full metadata + recent videos for a discovered channel."""
|
||||
"""Fetch metadata + recent videos for a discovered channel (one yt-dlp call only)."""
|
||||
try:
|
||||
result = ytdlp.fetch_channel_metadata(channel.youtube_channel_id, max_videos=10, polite=True)
|
||||
if not result:
|
||||
@@ -34,32 +34,9 @@ def _fetch_and_index_channel(db: Session, channel: Channel):
|
||||
setattr(channel, k, v)
|
||||
channel.crawled_at = datetime.utcnow()
|
||||
|
||||
videos = result.get("videos", [])
|
||||
|
||||
# For videos missing a date (RSS didn't cover them or flat-playlist had no timestamp),
|
||||
# do individual fetches — capped at 3 to avoid slow-downs.
|
||||
dateless = [v for v in videos if not v.get("published_at")]
|
||||
individual_fetched: dict[str, dict] = {}
|
||||
for vdata in dateless[:3]:
|
||||
for vdata in result.get("videos", []):
|
||||
yt_id = vdata.get("youtube_video_id")
|
||||
if not yt_id:
|
||||
continue
|
||||
try:
|
||||
meta = ytdlp.fetch_video_metadata(yt_id, polite=True)
|
||||
if meta and meta.get("published_at"):
|
||||
individual_fetched[yt_id] = meta
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for vdata in videos:
|
||||
yt_id = vdata.get("youtube_video_id")
|
||||
if not yt_id:
|
||||
continue
|
||||
# Prefer individually-fetched metadata if we retrieved it
|
||||
if yt_id in individual_fetched:
|
||||
vdata = individual_fetched[yt_id]
|
||||
# Skip videos we still can't date — undated videos break feed ordering
|
||||
if not vdata.get("published_at"):
|
||||
if not yt_id or not vdata.get("published_at"):
|
||||
continue
|
||||
if not db.query(Video).filter_by(youtube_video_id=yt_id).first():
|
||||
db.add(Video(
|
||||
@@ -194,10 +171,9 @@ def _search_and_store(
|
||||
|
||||
db.commit()
|
||||
|
||||
for channel_id in needs_indexing[:10]:
|
||||
channel = db.query(Channel).filter_by(id=channel_id).first()
|
||||
if channel:
|
||||
_fetch_and_index_channel(db, channel)
|
||||
# Queue channel indexing as separate worker tasks (30-90 s gaps apply).
|
||||
for channel_id in needs_indexing[:3]:
|
||||
_task_queue.put((user_id, lambda cid=channel_id: _do_task_index_channel(user_id, cid)))
|
||||
|
||||
|
||||
def crawl_by_search(db: Session, user_id: int):
|
||||
@@ -599,11 +575,6 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]):
|
||||
|
||||
db.commit()
|
||||
|
||||
for channel_id in needs_indexing[:10]:
|
||||
channel = db.query(Channel).filter_by(id=channel_id).first()
|
||||
if channel:
|
||||
_fetch_and_index_channel(db, channel)
|
||||
|
||||
|
||||
def update_graph_signal(db: Session, user_id: int):
|
||||
"""Discover channels featured on followed channels' /channels tab.
|
||||
@@ -664,11 +635,6 @@ def update_graph_signal(db: Session, user_id: int):
|
||||
|
||||
db.commit()
|
||||
|
||||
for channel_id in needs_indexing[:15]:
|
||||
channel = db.query(Channel).filter_by(id=channel_id).first()
|
||||
if channel:
|
||||
_fetch_and_index_channel(db, channel)
|
||||
|
||||
|
||||
def run_full_discovery(db: Session, user_id: int, regions: list[str] | None = None):
|
||||
if regions is None:
|
||||
@@ -788,10 +754,8 @@ def _fetch_graph_for_channel(db: Session, user_id: int, source_yt_id: str):
|
||||
|
||||
db.commit()
|
||||
|
||||
for channel_id in needs_indexing[:3]:
|
||||
ch = db.query(Channel).filter_by(id=channel_id).first()
|
||||
if ch:
|
||||
_fetch_and_index_channel(db, ch)
|
||||
for channel_id in needs_indexing[:2]:
|
||||
_task_queue.put((user_id, lambda cid=channel_id: _do_task_index_channel(user_id, cid)))
|
||||
|
||||
|
||||
def _do_task_graph(user_id: int, source_yt_id: str):
|
||||
@@ -803,6 +767,19 @@ def _do_task_graph(user_id: int, source_yt_id: str):
|
||||
db.close()
|
||||
|
||||
|
||||
def _do_task_index_channel(user_id: int, channel_id: int):
|
||||
"""Index one newly-discovered channel (one yt-dlp call). Queued as a separate
|
||||
worker task so the 30-90 s gap applies rather than bursting inline."""
|
||||
from ..database import SessionLocal
|
||||
db = SessionLocal()
|
||||
try:
|
||||
channel = db.query(Channel).filter_by(id=channel_id).first()
|
||||
if channel:
|
||||
_fetch_and_index_channel(db, channel)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def _worker_loop():
|
||||
while True:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user