diff --git a/backend/routers/discovery.py b/backend/routers/discovery.py index 655b20a..22657bc 100644 --- a/backend/routers/discovery.py +++ b/backend/routers/discovery.py @@ -56,9 +56,32 @@ def list_discovery( ORDER BY dq.score DESC LIMIT :limit OFFSET :offset """), - {"user_id": current_user.id, "limit": limit, "offset": offset}, + {"user_id": current_user.id, "limit": limit * 3, "offset": offset}, ).mappings().all() + # Load negative affinity tags and use them to filter channels already in the queue + neg_affinity = { + r["tag"] for r in db.execute( + text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), + {"user_id": current_user.id}, + ).mappings().all() + } + if neg_affinity and rows: + channel_ids_csv = ",".join(str(r["channel_id"]) for r in rows) + vtag_rows = db.execute( + text(f"SELECT channel_id, tags FROM videos WHERE channel_id IN ({channel_ids_csv}) AND tags IS NOT NULL LIMIT 1000") + ).mappings().all() + neg_hit: dict[int, int] = {} + for vr in vtag_rows: + try: + for tag in json.loads(vr["tags"] or "[]"): + if isinstance(tag, str) and tag.lower().strip() in neg_affinity: + neg_hit[vr["channel_id"]] = neg_hit.get(vr["channel_id"], 0) + 1 + except (json.JSONDecodeError, TypeError): + pass + rows = [r for r in rows if neg_hit.get(r["channel_id"], 0) < 3] + + rows = rows[:limit] items = [] for row in rows: row = dict(row) diff --git a/backend/services/discovery.py b/backend/services/discovery.py index 40fe750..5ac37c1 100644 --- a/backend/services/discovery.py +++ b/backend/services/discovery.py @@ -100,6 +100,7 @@ def _add_to_discovery( def _search_and_store( db: Session, user_id: int, queries: list[str], followed_yt_ids: set[str], score_multiplier: float, source: str, + neg_affinity_tags: frozenset[str] = frozenset(), ): """Run YouTube searches for the given queries and add results to discovery.""" discovered: dict[str, dict] = {} @@ -145,6 +146,24 @@ def _search_and_store( uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first() if uc and uc.status in ("followed", "dismissed"): continue + + # Skip channels whose indexed videos heavily overlap with negatively-rated tags + if neg_affinity_tags and not is_new and channel.crawled_at: + neg_hit = 0 + vtags = db.execute( + text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"), + {"cid": channel.id}, + ).scalars().all() + for tags_json in vtags: + try: + for tag in json.loads(tags_json or "[]"): + if isinstance(tag, str) and tag.lower().strip() in neg_affinity_tags: + neg_hit += 1 + except (json.JSONDecodeError, TypeError): + pass + if neg_hit >= 3: + continue + preview_json = json.dumps(info["previews"]) if info["previews"] else None _add_to_discovery( db, user_id, channel.id, @@ -245,7 +264,14 @@ def crawl_by_search(db: Session, user_id: int): if not queries: return - _search_and_store(db, user_id, queries, followed_yt_ids, score_multiplier=5.0, source="search") + neg_tags = frozenset( + r["tag"] for r in db.execute( + text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), + {"user_id": user_id}, + ).mappings().all() + ) + _search_and_store(db, user_id, queries, followed_yt_ids, score_multiplier=5.0, source="search", + neg_affinity_tags=neg_tags) def update_community_signal(db: Session, user_id: int): @@ -355,7 +381,14 @@ def update_liked_signal(db: Session, user_id: int): ).scalars().all()) top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:6]] - _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=10.0, source="liked") + neg_tags = frozenset( + r["tag"] for r in db.execute( + text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), + {"user_id": user_id}, + ).mappings().all() + ) + _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=10.0, source="liked", + neg_affinity_tags=neg_tags) def update_watch_signal(db: Session, user_id: int): @@ -408,40 +441,27 @@ def update_watch_signal(db: Session, user_id: int): ).scalars().all()) top_tags = [t for t, _ in sorted(qualified.items(), key=lambda x: -x[1])[:6]] - - _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=3.0, source="watched") + neg_tags = frozenset( + r["tag"] for r in db.execute( + text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"), + {"user_id": user_id}, + ).mappings().all() + ) + _search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=3.0, source="watched", + neg_affinity_tags=neg_tags) def _build_user_tag_profile(db: Session, user_id: int) -> dict[str, float]: - """Return a weighted tag dict from liked (weight 3) + watched (weight 1) videos.""" + """Return tag affinity dict (positive = liked, negative = disliked/dismissed).""" rows = db.execute( - text(""" - SELECT v.tags, MAX(uv.liked) AS liked - FROM user_videos uv - JOIN videos v ON uv.video_id = v.id - WHERE uv.user_id = :user_id AND (uv.liked = 1 OR uv.watched = 1) - AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' - GROUP BY v.id - """), + text("SELECT tag, score FROM user_tag_affinity WHERE user_id = :user_id"), {"user_id": user_id}, ).mappings().all() - - profile: dict[str, float] = {} - for row in rows: - weight = 3.0 if row["liked"] else 1.0 - try: - for tag in json.loads(row["tags"]): - if isinstance(tag, str): - t = tag.lower().strip() - if 3 <= len(t) <= 40: - profile[t] = profile.get(t, 0.0) + weight - except (json.JSONDecodeError, TypeError): - pass - return profile + return {row["tag"]: row["score"] for row in rows} def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) -> float: - """Score a candidate channel's tags against the user's interest profile.""" + """Score a channel's tags against user affinity — positive means relevant, negative means disliked.""" if not tag_profile or not tags_json: return 0.0 try: @@ -453,35 +473,7 @@ def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) - if isinstance(tag, str): t = tag.lower().strip() score += tag_profile.get(t, 0.0) - return min(score, 50.0) - - -def _dismissed_channel_tags(db: Session, user_id: int) -> set[str]: - """Collect tags of channels this user explicitly dismissed — used to avoid similar content.""" - rows = db.execute( - text(""" - SELECT v.tags - FROM user_channels uc - JOIN videos v ON v.channel_id = uc.channel_id - WHERE uc.user_id = :user_id AND uc.status = 'dismissed' - AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]' - LIMIT 500 - """), - {"user_id": user_id}, - ).mappings().all() - - bad_tags: dict[str, int] = {} - for row in rows: - try: - for tag in json.loads(row["tags"]): - if isinstance(tag, str): - t = tag.lower().strip() - if 3 <= len(t) <= 40: - bad_tags[t] = bad_tags.get(t, 0) + 1 - except (json.JSONDecodeError, TypeError): - pass - # Only include tags that appeared in 3+ dismissed-channel videos (strong signal) - return {t for t, c in bad_tags.items() if c >= 3} + return max(-100.0, min(score, 50.0)) def update_trending_signal(db: Session, user_id: int, regions: list[str]): @@ -490,7 +482,6 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]): return tag_profile = _build_user_tag_profile(db, user_id) - dismiss_tags = _dismissed_channel_tags(db, user_id) followed_yt_ids = set(db.execute( text(""" @@ -556,10 +547,10 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]): if uc and uc.status in ("followed", "dismissed"): continue - # Score: base ×4 per region × count, boosted by tag relevance, penalised by dismiss-tag overlap base_score = float(info["count"]) * 4.0 * len(info["regions"]) - # Tag relevance boost (requires channel to have indexed videos) + # Tag relevance: positive for liked content, negative for dismissed/disliked. + # tag_profile comes from user_tag_affinity which tracks both signals. tag_boost = 0.0 if not is_new and channel.crawled_at: tag_rows = db.execute( @@ -568,25 +559,8 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]): ).scalars().all() for tags_json in tag_rows: tag_boost += _tag_relevance_score(tag_profile, tags_json) - tag_boost = min(tag_boost, 30.0) - # Dismiss penalty: if channel's tags overlap heavily with dismissed content, reduce score - dismiss_penalty = 0.0 - if dismiss_tags and not is_new: - tag_rows2 = db.execute( - text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"), - {"cid": channel.id}, - ).scalars().all() - for tags_json in tag_rows2: - try: - for tag in json.loads(tags_json or "[]"): - if isinstance(tag, str) and tag.lower().strip() in dismiss_tags: - dismiss_penalty += 5.0 - except (json.JSONDecodeError, TypeError): - pass - dismiss_penalty = min(dismiss_penalty, base_score * 0.8) - - final_score = base_score + tag_boost - dismiss_penalty + final_score = base_score + tag_boost if final_score <= 0: continue