Fix discovery to actually use negative affinity signals

Previously the engine was blind to dislikes/dismissals:

- _build_user_tag_profile only used liked/watched videos (positive signals only)
- dismiss_penalty was capped at 80% of the base score, so hated content still surfaced
- _search_and_store had zero affinity filtering; any YouTube result entered the queue
- user_tag_affinity negative scores (written by dismiss/dislike) were never read

Now:

- _build_user_tag_profile reads directly from user_tag_affinity (positive + negative)
- _tag_relevance_score returns negative values, so disliked-tag channels score below zero and get dropped
- _search_and_store skips channels whose indexed videos accumulate 3+ matches against negatively-rated tags (affinity score < -2)
- list_discovery post-filters channels already in the queue using the same neg-affinity check
- Removed the old _dismissed_channel_tags + dismiss_penalty (superseded)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -56,9 +56,32 @@ def list_discovery(
|
||||
ORDER BY dq.score DESC
|
||||
LIMIT :limit OFFSET :offset
|
||||
"""),
|
||||
{"user_id": current_user.id, "limit": limit, "offset": offset},
|
||||
{"user_id": current_user.id, "limit": limit * 3, "offset": offset},
|
||||
).mappings().all()
|
||||
|
||||
# Load negative affinity tags and use them to filter channels already in the queue
|
||||
neg_affinity = {
|
||||
r["tag"] for r in db.execute(
|
||||
text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"),
|
||||
{"user_id": current_user.id},
|
||||
).mappings().all()
|
||||
}
|
||||
if neg_affinity and rows:
|
||||
channel_ids_csv = ",".join(str(r["channel_id"]) for r in rows)
|
||||
vtag_rows = db.execute(
|
||||
text(f"SELECT channel_id, tags FROM videos WHERE channel_id IN ({channel_ids_csv}) AND tags IS NOT NULL LIMIT 1000")
|
||||
).mappings().all()
|
||||
neg_hit: dict[int, int] = {}
|
||||
for vr in vtag_rows:
|
||||
try:
|
||||
for tag in json.loads(vr["tags"] or "[]"):
|
||||
if isinstance(tag, str) and tag.lower().strip() in neg_affinity:
|
||||
neg_hit[vr["channel_id"]] = neg_hit.get(vr["channel_id"], 0) + 1
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
rows = [r for r in rows if neg_hit.get(r["channel_id"], 0) < 3]
|
||||
|
||||
rows = rows[:limit]
|
||||
items = []
|
||||
for row in rows:
|
||||
row = dict(row)
|
||||
|
||||
@@ -100,6 +100,7 @@ def _add_to_discovery(
|
||||
def _search_and_store(
|
||||
db: Session, user_id: int, queries: list[str],
|
||||
followed_yt_ids: set[str], score_multiplier: float, source: str,
|
||||
neg_affinity_tags: frozenset[str] = frozenset(),
|
||||
):
|
||||
"""Run YouTube searches for the given queries and add results to discovery."""
|
||||
discovered: dict[str, dict] = {}
|
||||
@@ -145,6 +146,24 @@ def _search_and_store(
|
||||
uc = db.query(UserChannel).filter_by(user_id=user_id, channel_id=channel.id).first()
|
||||
if uc and uc.status in ("followed", "dismissed"):
|
||||
continue
|
||||
|
||||
# Skip channels whose indexed videos heavily overlap with negatively-rated tags
|
||||
if neg_affinity_tags and not is_new and channel.crawled_at:
|
||||
neg_hit = 0
|
||||
vtags = db.execute(
|
||||
text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"),
|
||||
{"cid": channel.id},
|
||||
).scalars().all()
|
||||
for tags_json in vtags:
|
||||
try:
|
||||
for tag in json.loads(tags_json or "[]"):
|
||||
if isinstance(tag, str) and tag.lower().strip() in neg_affinity_tags:
|
||||
neg_hit += 1
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
if neg_hit >= 3:
|
||||
continue
|
||||
|
||||
preview_json = json.dumps(info["previews"]) if info["previews"] else None
|
||||
_add_to_discovery(
|
||||
db, user_id, channel.id,
|
||||
@@ -245,7 +264,14 @@ def crawl_by_search(db: Session, user_id: int):
|
||||
if not queries:
|
||||
return
|
||||
|
||||
_search_and_store(db, user_id, queries, followed_yt_ids, score_multiplier=5.0, source="search")
|
||||
neg_tags = frozenset(
|
||||
r["tag"] for r in db.execute(
|
||||
text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"),
|
||||
{"user_id": user_id},
|
||||
).mappings().all()
|
||||
)
|
||||
_search_and_store(db, user_id, queries, followed_yt_ids, score_multiplier=5.0, source="search",
|
||||
neg_affinity_tags=neg_tags)
|
||||
|
||||
|
||||
def update_community_signal(db: Session, user_id: int):
|
||||
@@ -355,7 +381,14 @@ def update_liked_signal(db: Session, user_id: int):
|
||||
).scalars().all())
|
||||
|
||||
top_tags = [t for t, _ in sorted(tag_counts.items(), key=lambda x: -x[1])[:6]]
|
||||
_search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=10.0, source="liked")
|
||||
neg_tags = frozenset(
|
||||
r["tag"] for r in db.execute(
|
||||
text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"),
|
||||
{"user_id": user_id},
|
||||
).mappings().all()
|
||||
)
|
||||
_search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=10.0, source="liked",
|
||||
neg_affinity_tags=neg_tags)
|
||||
|
||||
|
||||
def update_watch_signal(db: Session, user_id: int):
|
||||
@@ -408,40 +441,27 @@ def update_watch_signal(db: Session, user_id: int):
|
||||
).scalars().all())
|
||||
|
||||
top_tags = [t for t, _ in sorted(qualified.items(), key=lambda x: -x[1])[:6]]
|
||||
|
||||
_search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=3.0, source="watched")
|
||||
neg_tags = frozenset(
|
||||
r["tag"] for r in db.execute(
|
||||
text("SELECT tag FROM user_tag_affinity WHERE user_id = :user_id AND score < -2"),
|
||||
{"user_id": user_id},
|
||||
).mappings().all()
|
||||
)
|
||||
_search_and_store(db, user_id, top_tags, followed_yt_ids, score_multiplier=3.0, source="watched",
|
||||
neg_affinity_tags=neg_tags)
|
||||
|
||||
|
||||
def _build_user_tag_profile(db: Session, user_id: int) -> dict[str, float]:
|
||||
"""Return a weighted tag dict from liked (weight 3) + watched (weight 1) videos."""
|
||||
"""Return tag affinity dict (positive = liked, negative = disliked/dismissed)."""
|
||||
rows = db.execute(
|
||||
text("""
|
||||
SELECT v.tags, MAX(uv.liked) AS liked
|
||||
FROM user_videos uv
|
||||
JOIN videos v ON uv.video_id = v.id
|
||||
WHERE uv.user_id = :user_id AND (uv.liked = 1 OR uv.watched = 1)
|
||||
AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
|
||||
GROUP BY v.id
|
||||
"""),
|
||||
text("SELECT tag, score FROM user_tag_affinity WHERE user_id = :user_id"),
|
||||
{"user_id": user_id},
|
||||
).mappings().all()
|
||||
|
||||
profile: dict[str, float] = {}
|
||||
for row in rows:
|
||||
weight = 3.0 if row["liked"] else 1.0
|
||||
try:
|
||||
for tag in json.loads(row["tags"]):
|
||||
if isinstance(tag, str):
|
||||
t = tag.lower().strip()
|
||||
if 3 <= len(t) <= 40:
|
||||
profile[t] = profile.get(t, 0.0) + weight
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
return profile
|
||||
return {row["tag"]: row["score"] for row in rows}
|
||||
|
||||
|
||||
def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) -> float:
|
||||
"""Score a candidate channel's tags against the user's interest profile."""
|
||||
"""Score a channel's tags against user affinity — positive means relevant, negative means disliked."""
|
||||
if not tag_profile or not tags_json:
|
||||
return 0.0
|
||||
try:
|
||||
@@ -453,35 +473,7 @@ def _tag_relevance_score(tag_profile: dict[str, float], tags_json: str | None) -
|
||||
if isinstance(tag, str):
|
||||
t = tag.lower().strip()
|
||||
score += tag_profile.get(t, 0.0)
|
||||
return min(score, 50.0)
|
||||
|
||||
|
||||
def _dismissed_channel_tags(db: Session, user_id: int) -> set[str]:
|
||||
"""Collect tags of channels this user explicitly dismissed — used to avoid similar content."""
|
||||
rows = db.execute(
|
||||
text("""
|
||||
SELECT v.tags
|
||||
FROM user_channels uc
|
||||
JOIN videos v ON v.channel_id = uc.channel_id
|
||||
WHERE uc.user_id = :user_id AND uc.status = 'dismissed'
|
||||
AND v.tags IS NOT NULL AND v.tags != '' AND v.tags != '[]'
|
||||
LIMIT 500
|
||||
"""),
|
||||
{"user_id": user_id},
|
||||
).mappings().all()
|
||||
|
||||
bad_tags: dict[str, int] = {}
|
||||
for row in rows:
|
||||
try:
|
||||
for tag in json.loads(row["tags"]):
|
||||
if isinstance(tag, str):
|
||||
t = tag.lower().strip()
|
||||
if 3 <= len(t) <= 40:
|
||||
bad_tags[t] = bad_tags.get(t, 0) + 1
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
# Only include tags that appeared in 3+ dismissed-channel videos (strong signal)
|
||||
return {t for t, c in bad_tags.items() if c >= 3}
|
||||
return max(-100.0, min(score, 50.0))
|
||||
|
||||
|
||||
def update_trending_signal(db: Session, user_id: int, regions: list[str]):
|
||||
@@ -490,7 +482,6 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]):
|
||||
return
|
||||
|
||||
tag_profile = _build_user_tag_profile(db, user_id)
|
||||
dismiss_tags = _dismissed_channel_tags(db, user_id)
|
||||
|
||||
followed_yt_ids = set(db.execute(
|
||||
text("""
|
||||
@@ -556,10 +547,10 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]):
|
||||
if uc and uc.status in ("followed", "dismissed"):
|
||||
continue
|
||||
|
||||
# Score: base ×4 per region × count, boosted by tag relevance, penalised by dismiss-tag overlap
|
||||
base_score = float(info["count"]) * 4.0 * len(info["regions"])
|
||||
|
||||
# Tag relevance boost (requires channel to have indexed videos)
|
||||
# Tag relevance: positive for liked content, negative for dismissed/disliked.
|
||||
# tag_profile comes from user_tag_affinity which tracks both signals.
|
||||
tag_boost = 0.0
|
||||
if not is_new and channel.crawled_at:
|
||||
tag_rows = db.execute(
|
||||
@@ -568,25 +559,8 @@ def update_trending_signal(db: Session, user_id: int, regions: list[str]):
|
||||
).scalars().all()
|
||||
for tags_json in tag_rows:
|
||||
tag_boost += _tag_relevance_score(tag_profile, tags_json)
|
||||
tag_boost = min(tag_boost, 30.0)
|
||||
|
||||
# Dismiss penalty: if channel's tags overlap heavily with dismissed content, reduce score
|
||||
dismiss_penalty = 0.0
|
||||
if dismiss_tags and not is_new:
|
||||
tag_rows2 = db.execute(
|
||||
text("SELECT tags FROM videos WHERE channel_id = :cid AND tags IS NOT NULL LIMIT 20"),
|
||||
{"cid": channel.id},
|
||||
).scalars().all()
|
||||
for tags_json in tag_rows2:
|
||||
try:
|
||||
for tag in json.loads(tags_json or "[]"):
|
||||
if isinstance(tag, str) and tag.lower().strip() in dismiss_tags:
|
||||
dismiss_penalty += 5.0
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
dismiss_penalty = min(dismiss_penalty, base_score * 0.8)
|
||||
|
||||
final_score = base_score + tag_boost - dismiss_penalty
|
||||
final_score = base_score + tag_boost
|
||||
if final_score <= 0:
|
||||
continue
|
||||
|
||||
|
||||
Reference in New Issue
Block a user