Files
FunguyBot/plugins/lastfm.py
T

2022 lines
73 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
This plugin provides comprehensive Last.fm integration for the bot.
It allows users to register their Last.fm username and access rich music analytics.
Commands:
!register <username> - Register your Last.fm username
!np - Show currently playing track (no collapsible)
!recent [user] [limit] - Show recent tracks (default 10, max 50)
!toptracks [user] [period] - Show top tracks (overall/7day/1month/3month/6month/12month)
!topartists [user] [period] - Show top artists
!topalbums [user] [period] - Show top albums
!loved [user] - Show recently loved tracks
!profile [user] - Detailed user profile
!playcount [user] - Total scrobbles
!scrobbles [user] - Detailed scrobbling statistics
!compare <user1> <user2> - Compare musical tastes
!taste [user] - Top artists with taste-o-meter
!friends [user] - Show Last.fm friends
!recommend [user] - Artist recommendations
!similar <artist> - Find similar artists
!tag <tag> - Top artists for a tag/genre
!charts - Global top tracks chart
!tagcloud [user] - Top genre tags
!now - What are registered users playing?
!decades [user] - Favorite decades analysis
!genres [user] - Top genres/tags
!era <year> - Popular tracks from a year
!weekly [user] - Weekly listening report
!monthly [user] - Monthly listening report
!yearly [user] [year] - Yearly listening report
!first <artist> [user] - Find first scrobble of an artist
!concerts [user] - Upcoming concerts for top artists
!radio <artist> - Generate playlist based on artist
!mashup <artist1> <artist2> - Musical connections between artists
!collage [user] [size] - Album art collage (image) using ImageMagick
!listening [user] - Currently listening with album art
!awards [user] - Milestone achievements
!lastfm - Show this help
"""
import logging
import os
import time
import subprocess
import tempfile
import asyncio
import aiohttp
import aiosqlite
import simplematrixbotlib as botlib
from datetime import datetime, timedelta
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# SQLite file that stores the Matrix user -> Last.fm user mapping.
DB_PATH = "lastfm.db"
# HTTPS endpoint: the API key travels as a query parameter, so it must not be
# sent over plain HTTP.
API_BASE = "https://ws.audioscrobbler.com/2.0/"
# Values accepted for the `period` argument of the top tracks/artists/albums commands.
VALID_PERIODS = ["overall", "7day", "1month", "3month", "6month", "12month"]
# Human-readable label for each period, used in message summaries.
PERIOD_LABELS = {
    "overall": "All Time",
    "7day": "Last 7 Days",
    "1month": "Last Month",
    "3month": "Last 3 Months",
    "6month": "Last 6 Months",
    "12month": "Last Year",
}
# User-Agent to avoid 403/404 from CDNs
HEADERS = {"User-Agent": "FunguyBot/1.0 (Matrix last.fm plugin)"}
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
async def init_db():
    """Create the ``user_lastfm`` table in the plugin database if it is missing."""
    create_table_sql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS user_lastfm (\n"
        "matrix_user TEXT PRIMARY KEY,\n"
        "lastfm_user TEXT NOT NULL\n"
        ")\n"
    )
    async with aiosqlite.connect(DB_PATH) as conn:
        await conn.execute(create_table_sql)
        await conn.commit()
async def get_lastfm_username(matrix_user):
    """Return the Last.fm username stored for ``matrix_user``, or ``None`` if none is stored."""
    query = "SELECT lastfm_user FROM user_lastfm WHERE matrix_user = ?"
    async with aiosqlite.connect(DB_PATH) as conn:
        async with conn.execute(query, (matrix_user,)) as cur:
            record = await cur.fetchone()
    if record:
        return record[0]
    return None
async def set_lastfm_username(matrix_user, lastfm_user):
    """
    Associate a Last.fm username with a Matrix user.

    A single upsert statement replaces the previous SELECT-then-INSERT/UPDATE
    sequence, so two concurrent registrations for the same Matrix user can no
    longer race between the check and the write. The replacement relies on
    ``matrix_user`` being the table's PRIMARY KEY (see the CREATE TABLE in
    ``init_db``).
    """
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT OR REPLACE INTO user_lastfm (matrix_user, lastfm_user) VALUES (?, ?)",
            (matrix_user, lastfm_user),
        )
        await db.commit()
async def get_all_registered_users():
    """Return a dict mapping every registered Matrix user to its Last.fm username."""
    mapping = {}
    async with aiosqlite.connect(DB_PATH) as conn:
        async with conn.execute("SELECT matrix_user, lastfm_user FROM user_lastfm") as cur:
            for record in await cur.fetchall():
                mapping[record[0]] = record[1]
    return mapping
# ---------------------------------------------------------------------------
# Resolve username: registered user or explicit argument
# ---------------------------------------------------------------------------
async def resolve_username(matrix_user, args, bot, room):
    """
    Work out which Last.fm account a command should operate on.

    An explicit first argument wins and is used as both the Last.fm username
    and the display name. Otherwise the sender's registered username is looked
    up and the Matrix user id is used as the display name.

    Returns (lastfm_user, display_name), or (None, None) when nothing could be
    resolved; in that case a hint is posted to the room if both ``bot`` and
    ``room`` are given.
    """
    if args:
        explicit = args[0].strip()
        return explicit, explicit

    registered = await get_lastfm_username(matrix_user)
    if registered:
        return registered, matrix_user

    if bot and room:
        hint = (
            "Please register your Last.fm username first with !register <username>\n"
            "Or specify a username: !command <username>"
        )
        await bot.api.send_text_message(room.room_id, hint)
    return None, None
# ---------------------------------------------------------------------------
# API helper
# ---------------------------------------------------------------------------
def get_api_key():
    """Read LASTFM_API_KEY from the environment; log an error when it is unset or empty."""
    key = os.environ.get("LASTFM_API_KEY")
    if key:
        return key
    logging.error("LASTFM_API_KEY not found in environment variables")
    # Return the falsy value as-is (None when unset, "" when set but empty).
    return key
async def call_lastfm_api(method, params, bot=None, room=None):
    """
    Call the Last.fm API with the given method and params.

    Args:
        method: Last.fm method name, e.g. "user.getInfo".
        params: extra query parameters merged over method/api_key/format.
        bot, room: when both are given, failures are also reported to the room.

    Returns:
        The decoded JSON payload, or None on any error.

    Notes:
        * A JSON body containing an "error" field is reported with its message
          whatever the HTTP status is, so users see the API's own message
          rather than only "HTTP <status>" when the error arrives on a
          non-200 response.
        * Timeouts are handled explicitly; their exception text is empty, so
          the generic handler used to produce a blank error message.
        * Exception text is only logged, never posted to the room: aiohttp
          errors can embed the request URL, which contains the API key.
    """

    async def notify(text):
        # Room notifications are optional and need both objects.
        if bot and room:
            await bot.api.send_text_message(room.room_id, text)

    api_key = get_api_key()
    if not api_key:
        await notify("❌ Last.fm API key not configured. Set LASTFM_API_KEY.")
        return None

    full_params = {
        "method": method,
        "api_key": api_key,
        "format": "json",
        **params,
    }
    try:
        async with aiohttp.ClientSession(headers=HEADERS) as session:
            async with session.get(
                API_BASE,
                params=full_params,
                timeout=aiohttp.ClientTimeout(total=15),
            ) as response:
                status = response.status
                try:
                    # content_type=None: decode the body even if the server
                    # did not label it as JSON (e.g. on error statuses).
                    data = await response.json(content_type=None)
                except ValueError:
                    data = None

        if isinstance(data, dict) and "error" in data:
            msg = data.get("message", "Unknown error")
            logging.error(f"Last.fm API error ({method}): {msg}")
            await notify(f"❌ Last.fm error: {msg}")
            return None
        if status != 200:
            logging.error(f"Last.fm API returned status {status} for {method}")
            await notify(f"❌ Last.fm API error: HTTP {status}")
            return None
        if data is None:
            logging.error(f"Last.fm API returned a non-JSON body for {method}")
            await notify("❌ Last.fm returned an invalid response.")
            return None
        return data
    except asyncio.TimeoutError:
        logging.error(f"Timed out calling Last.fm API ({method})")
        await notify("❌ Last.fm did not respond in time. Please try again later.")
        return None
    except aiohttp.ClientError as e:
        logging.error(f"HTTP error calling Last.fm API ({method}): {e}")
        await notify("❌ Network error contacting Last.fm.")
        return None
    except Exception as e:
        logging.error(f"Error calling Last.fm API ({method}): {e}")
        await notify("❌ Unexpected error while contacting Last.fm.")
        return None
async def get_youtube_link(artist, track_name):
    """
    Search YouTube for "<artist> <track_name>" and return a watch URL.

    Returns None when YOUTUBE_API_KEY is unset, when the search yields no
    video, or on any error. This is a best-effort lookup: failures are logged
    and never raised. The request is bounded by a 10 second timeout so a slow
    YouTube response cannot stall the calling command for aiohttp's default
    of several minutes.
    """
    youtube_api_key = os.getenv("YOUTUBE_API_KEY")
    if not youtube_api_key:
        return None
    params = {
        "part": "snippet",
        "q": f"{artist} {track_name}",
        "type": "video",
        "key": youtube_api_key,
        "maxResults": "1",
    }
    url = "https://www.googleapis.com/youtube/v3/search"
    try:
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    logging.warning(f"YouTube search returned HTTP {response.status}")
                    return None
                data = await response.json()
        items = data.get("items", [])
        if items:
            video_id = items[0].get("id", {}).get("videoId")
            if video_id:
                return f"https://www.youtube.com/watch?v={video_id}"
    except asyncio.TimeoutError:
        logging.error("Timed out searching YouTube")
    except Exception as e:
        logging.error(f"Error searching YouTube: {e}")
    return None
# ---------------------------------------------------------------------------
# Safe extraction helpers
# ---------------------------------------------------------------------------
def safe_text(obj, key, default="Unknown"):
    """
    Safely extract a text value stored under ``key`` in ``obj``.

    Handles the shapes a field can take here:
      * a plain string                     -> returned as-is
      * a dict with "#text"                -> its "#text" value
      * a dict with "name" but no "#text"  -> its "name" value
    The last shape previously collapsed to ``default``, which made artist
    objects of the form {"name": ...} render as "Unknown".

    Returns ``default`` when ``obj`` is not a dict, the key is missing, or the
    value has none of the shapes above.
    """
    if not isinstance(obj, dict):
        return default
    val = obj.get(key, {})
    if isinstance(val, str):
        return val
    if isinstance(val, dict):
        if "#text" in val:
            return val["#text"]
        return val.get("name", default)
    return default
def safe_int(obj, key, default=0):
    """
    Safely extract an integer stored under ``key`` in ``obj``.

    Accepts values that are already numeric (JSON numbers) as well as the
    string / {"#text": ...} forms understood by ``safe_text``. Numeric values
    previously fell through to ``default`` because ``safe_text`` only returns
    strings and dict contents.

    Returns ``default`` when the value is missing or cannot be converted.
    """
    try:
        if isinstance(obj, dict):
            raw = obj.get(key)
            # bool is a subclass of int; do not treat True/False as counts.
            if isinstance(raw, (int, float)) and not isinstance(raw, bool):
                return int(raw)
        return int(safe_text(obj, key, str(default)))
    except (ValueError, TypeError, OverflowError):
        return default
# ---------------------------------------------------------------------------
# Collapsible wrapper
# ---------------------------------------------------------------------------
def wrap_collapsible(summary, body):
    """Build an HTML <details> element with a bold ``summary`` line followed by ``body``."""
    template = "<details><summary><strong>{}</strong></summary>{}</details>"
    return template.format(summary, body)
# ---------------------------------------------------------------------------
# COLLAGE HELPERS: download album art to a temp file (path is None on failure)
# ---------------------------------------------------------------------------
# ------------------------------------------------------------
# Helper: safely get artist name from album or track object
# ------------------------------------------------------------
def album_artist_name(album):
    """Return the artist name from an album/track dict whose "artist" is a string or a dict."""
    credit = album.get("artist", {})
    if isinstance(credit, dict):
        # Album API returns 'name', track API returns '#text'
        if "name" in credit:
            return credit["name"]
        return credit.get("#text", "Unknown")
    return credit if isinstance(credit, str) else "Unknown"
# ------------------------------------------------------------
# Download an image to a temp file: tries the direct URL first,
# then falls back to album.getInfo + download.
# ------------------------------------------------------------
async def download_album_art_to_file(session, album_data):
    """
    Download album art for one album into a temporary file.

    Args:
        session: an open aiohttp client session used for all requests.
        album_data: the raw album dict from user.getTopAlbums.

    Returns:
        (artist, album_name, filepath), with filepath None when no usable
        image could be fetched. The caller owns (and must delete) the file.

    Strategy: try the image URL embedded in ``album_data`` first; if that
    fails, look the album up via album.getInfo and try the image listed there.
    The fallback is skipped when the artist is unknown or no API key is set.
    """
    album_name = safe_text(album_data, "name", "Unknown Album")
    artist = album_artist_name(album_data)

    # 1. Try direct image from the album object (extralarge or any)
    direct_url = _pick_album_image_url(album_data.get("image", []))
    if direct_url:
        try:
            tmp_path = await _save_image_to_tempfile(session, direct_url)
            if tmp_path:
                logging.info(f"Downloaded '{album_name}' from direct URL")
                return (artist, album_name, tmp_path)
        except Exception as e:
            logging.warning(f"Direct URL failed for '{album_name}': {e}")

    # 2. Fallback: album.getInfo
    api_key = get_api_key()
    if artist != "Unknown" and api_key:
        try:
            params = {
                "method": "album.getInfo",
                "artist": artist,
                "album": album_name,
                "autocorrect": "1",
                "api_key": api_key,
                "format": "json",
            }
            data = {}
            async with session.get(
                API_BASE, params=params, timeout=aiohttp.ClientTimeout(total=10)
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
            album_info = data.get("album", {})
            image_url = _pick_album_image_url(album_info.get("image", [])) if album_info else None
            if image_url:
                tmp_path = await _save_image_to_tempfile(session, image_url)
                if tmp_path:
                    logging.info(f"Downloaded '{album_name}' via album.getInfo")
                    return (artist, album_name, tmp_path)
        except Exception as e:
            logging.warning(f"album.getInfo fallback failed for '{album_name}': {e}")

    return (artist, album_name, None)


def _pick_album_image_url(images):
    """Return the 'extralarge' image URL if non-empty, else the first non-empty URL, else None."""
    if not isinstance(images, list):
        return None
    entries = [img for img in images if isinstance(img, dict)]
    for img in entries:
        if img.get("size") == "extralarge":
            if img.get("#text"):
                return img.get("#text")
            break
    for img in entries:
        if img.get("#text"):
            return img.get("#text")
    return None


async def _save_image_to_tempfile(session, url):
    """Fetch ``url`` and write it to a new temp file; return its path, or None if unusable."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as resp:
        if resp.status != 200:
            return None
        content = await resp.read()
    # Bodies under 500 bytes are treated as placeholders/errors, not real art.
    if len(content) < 500:
        return None
    suffix = ".png" if url.endswith(".png") else ".jpg"
    fd, tmp_path = tempfile.mkstemp(suffix=suffix)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(content)
    except OSError:
        # Do not leave a half-written temp file behind.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
    return tmp_path
# ===================================================================
# COMMAND HANDLERS
# ===================================================================
# ---- !register ---------------------------------------------------
async def cmd_register(room, message, bot, args):
    """Handle !register <lastfm_username>: store the sender's Last.fm username."""
    if not args:
        await bot.api.send_text_message(room.room_id, "Usage: !register <lastfm_username>")
        return

    sender = str(message.sender)
    username = args[0].strip()
    await set_lastfm_username(sender, username)

    confirmation = f"✅ Registered Last.fm user **{username}** for {sender}"
    await bot.api.send_text_message(room.room_id, confirmation)
    logging.info(f"Registered Last.fm user {username} for {sender}")
# ---- !np ---------------------------------------------------------
async def cmd_np(room, message, bot, args):
    """Handle !np: post the user's now-playing (or last played) track as a single line."""
    sender = str(message.sender)
    lastfm_user, display_name = await resolve_username(sender, args, bot, room)
    if not lastfm_user:
        return

    data = await call_lastfm_api(
        "user.getRecentTracks", {"user": lastfm_user, "limit": "1"}, bot, room
    )
    if not data:
        return

    tracks = data.get("recenttracks", {}).get("track", [])
    if not tracks:
        await bot.api.send_text_message(
            room.room_id, f"🔍 No recent tracks found for {lastfm_user}."
        )
        return

    # The track field may be a list or a single track object.
    track = tracks if not isinstance(tracks, list) else tracks[0]
    is_live = track.get("@attr", {}).get("nowplaying", "false") == "true"
    artist = safe_text(track, "artist")
    title = safe_text(track, "name")
    album = safe_text(track, "album", "")

    prefix_icon = "🎵"
    action = "is currently playing" if is_live else "last played"
    message_text = f"{prefix_icon} **{display_name}** {action}: **{title}** by **{artist}**"
    if album:
        message_text += f" from *{album}*"

    youtube_link = await get_youtube_link(artist, title)
    if youtube_link:
        message_text += f" | [YouTube]({youtube_link})"

    await bot.api.send_markdown_message(room.room_id, message_text)
    logging.info(f"Sent now playing for {lastfm_user}")
# ---- !recent -----------------------------------------------------
async def cmd_recent(room, message, bot, args):
    """
    Handle !recent [user] [limit]

    The limit may be given as ``--limit N`` or as a trailing number; a
    trailing number wins if both are present. It is clamped to 1..50. A
    non-numeric limit produces a usage message instead of an unhandled
    ValueError.
    """
    matrix_user = str(message.sender)
    limit = 10
    requested_limit = None

    # Parse --limit if present
    user_arg = []
    i = 0
    while i < len(args):
        if args[i] == "--limit" and i + 1 < len(args):
            requested_limit = args[i + 1]
            i += 2
        else:
            user_arg.append(args[i])
            i += 1
    # Allow limit as last argument if numeric
    if user_arg and user_arg[-1].isdigit():
        requested_limit = user_arg.pop()

    if requested_limit is not None:
        try:
            limit = max(1, min(int(requested_limit), 50))
        except ValueError:
            await bot.api.send_text_message(
                room.room_id, "Usage: !recent [user] [limit] (limit must be a number, max 50)"
            )
            return

    lastfm_user, display_name = await resolve_username(matrix_user, user_arg, bot, room)
    if not lastfm_user:
        return
    data = await call_lastfm_api(
        "user.getRecentTracks", {"user": lastfm_user, "limit": str(limit)}, bot, room
    )
    if not data:
        return
    recent = data.get("recenttracks", {})
    tracks = recent.get("track", [])
    # A single track may arrive as one object rather than a list (cmd_np guards this too).
    if isinstance(tracks, dict):
        tracks = [tracks]
    if not tracks:
        await bot.api.send_text_message(room.room_id, f"🔍 No recent tracks for {lastfm_user}.")
        return

    total = safe_int(recent.get("@attr", {}), "total")
    summary = f"🎵 {display_name} — Recent Tracks ({min(limit, len(tracks))} of {total})"
    lines = []
    for idx, track in enumerate(tracks[:limit], 1):
        artist = safe_text(track, "artist")
        name = safe_text(track, "name")
        album = safe_text(track, "album", "")
        now = "🔊" if track.get("@attr", {}).get("nowplaying") == "true" else ""
        date = track.get("date")
        # Separate the date from the preceding field; it used to be glued on.
        date_str = f" | {date['#text']}" if isinstance(date, dict) and "#text" in date else ""
        album_str = f" | *{album}*" if album else ""
        lines.append(f" {idx}. {now}**{name}** by {artist}{album_str}{date_str}")
    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !toptracks --------------------------------------------------
async def cmd_toptracks(room, message, bot, args):
    """
    Handle !toptracks [user] [period]

    ``period`` is any value from VALID_PERIODS (default "overall"); every
    other argument is treated as the username.
    """
    matrix_user = str(message.sender)
    period = "overall"
    user_arg = list(args)
    if user_arg and user_arg[-1] in VALID_PERIODS:
        period = user_arg.pop()
    else:
        # Period given somewhere other than last: the last match wins.
        for token in args:
            if token in VALID_PERIODS:
                period = token
        user_arg = [token for token in args if token not in VALID_PERIODS]

    lastfm_user, display_name = await resolve_username(matrix_user, user_arg, bot, room)
    if not lastfm_user:
        return
    data = await call_lastfm_api(
        "user.getTopTracks",
        {"user": lastfm_user, "period": period, "limit": "10"},
        bot, room,
    )
    if not data:
        return
    tracks = data.get("toptracks", {}).get("track", [])
    if isinstance(tracks, dict):
        # A lone result may arrive as one object rather than a list.
        tracks = [tracks]
    if not tracks:
        await bot.api.send_text_message(room.room_id, f"🔍 No top tracks for {lastfm_user}.")
        return

    period_label = PERIOD_LABELS.get(period, period)
    summary = f"🏆 {display_name} — Top Tracks ({period_label})"
    lines = []
    for rank, track in enumerate(tracks[:10], 1):
        # album_artist_name understands both {"name": ...} and {"#text": ...}
        # artist objects; reading "#text" only rendered "Unknown" here.
        artist = album_artist_name(track)
        name = safe_text(track, "name")
        playcount = safe_int(track, "playcount")
        lines.append(f" {rank}. **{name}** by {artist} — *{playcount} plays*")
    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !topartists -------------------------------------------------
async def cmd_topartists(room, message, bot, args):
    """Handle !topartists [user] [period]: list the user's ten most played artists."""
    sender = str(message.sender)

    # A trailing period is consumed directly; otherwise scan every argument.
    period = "overall"
    remaining = list(args)
    if remaining and remaining[-1] in VALID_PERIODS:
        period = remaining.pop()
    else:
        for token in args:
            if token in VALID_PERIODS:
                period = token
        remaining = [token for token in args if token not in VALID_PERIODS]

    lastfm_user, display_name = await resolve_username(sender, remaining, bot, room)
    if not lastfm_user:
        return

    request = {"user": lastfm_user, "period": period, "limit": "10"}
    data = await call_lastfm_api("user.getTopArtists", request, bot, room)
    if not data:
        return

    artists = data.get("topartists", {}).get("artist", [])
    if not artists:
        await bot.api.send_text_message(room.room_id, f"🔍 No top artists for {lastfm_user}.")
        return

    period_label = PERIOD_LABELS.get(period, period)
    summary = f"🎤 {display_name} — Top Artists ({period_label})"
    rows = [
        f" {rank}. **{safe_text(entry, 'name')}** — *{safe_int(entry, 'playcount')} plays*"
        for rank, entry in enumerate(artists[:10], 1)
    ]
    await bot.api.send_markdown_message(
        room.room_id, wrap_collapsible(summary, "<br>".join(rows))
    )
# ---- !topalbums --------------------------------------------------
async def cmd_topalbums(room, message, bot, args):
    """
    Handle !topalbums [user] [period]

    ``period`` is any value from VALID_PERIODS (default "overall"); every
    other argument is treated as the username.
    """
    matrix_user = str(message.sender)
    period = "overall"
    user_arg = list(args)
    if user_arg and user_arg[-1] in VALID_PERIODS:
        period = user_arg.pop()
    else:
        # Period given somewhere other than last: the last match wins.
        for token in args:
            if token in VALID_PERIODS:
                period = token
        user_arg = [token for token in args if token not in VALID_PERIODS]

    lastfm_user, display_name = await resolve_username(matrix_user, user_arg, bot, room)
    if not lastfm_user:
        return
    data = await call_lastfm_api(
        "user.getTopAlbums",
        {"user": lastfm_user, "period": period, "limit": "10"},
        bot, room,
    )
    if not data:
        return
    albums = data.get("topalbums", {}).get("album", [])
    if isinstance(albums, dict):
        # A lone result may arrive as one object rather than a list.
        albums = [albums]
    if not albums:
        await bot.api.send_text_message(room.room_id, f"🔍 No top albums for {lastfm_user}.")
        return

    period_label = PERIOD_LABELS.get(period, period)
    summary = f"💿 {display_name} — Top Albums ({period_label})"
    lines = []
    for rank, album in enumerate(albums[:10], 1):
        # The album API returns the artist as {"name": ...}; reading "#text"
        # only rendered every artist as "Unknown".
        artist = album_artist_name(album)
        name = safe_text(album, "name")
        playcount = safe_int(album, "playcount")
        lines.append(f" {rank}. **{name}** by {artist} — *{playcount} plays*")
    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !loved ------------------------------------------------------
async def cmd_loved(room, message, bot, args):
    """Handle !loved [user]: list up to ten recently loved tracks."""
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return
    data = await call_lastfm_api(
        "user.getLovedTracks", {"user": lastfm_user, "limit": "10"}, bot, room
    )
    if not data:
        return
    loved = data.get("lovedtracks", {})
    tracks = loved.get("track", [])
    if isinstance(tracks, dict):
        # A lone result may arrive as one object rather than a list.
        tracks = [tracks]
    if not tracks:
        await bot.api.send_text_message(room.room_id, f"💔 No loved tracks for {lastfm_user}.")
        return

    shown = tracks[:10]
    # safe_int tolerates a missing or malformed total instead of raising.
    total = safe_int(loved.get("@attr", {}), "total")
    summary = f"❤️ {display_name} — Loved Tracks ({len(shown)} of {total})"
    lines = []
    for rank, track in enumerate(shown, 1):
        # Handles both {"name": ...} and {"#text": ...} artist objects.
        artist = album_artist_name(track)
        name = safe_text(track, "name")
        date = track.get("date")
        # Separate the date from the artist; it used to be glued on.
        date_str = f" | {date['#text']}" if isinstance(date, dict) and "#text" in date else ""
        lines.append(f" {rank}. **{name}** by {artist}{date_str}")
    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !profile ----------------------------------------------------
async def cmd_profile(room, message, bot, args):
    """
    Handle !profile [user]: show a summary of the user's Last.fm account.

    Fixes over the previous version: the subscriber line now shows Yes/No
    (both branches used to be the empty string), and the registration date is
    rendered as a date instead of echoing the raw "#text" value.
    """
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return
    data = await call_lastfm_api("user.getInfo", {"user": lastfm_user}, bot, room)
    if not data:
        return
    user_info = data.get("user", {})
    if not user_info:
        await bot.api.send_text_message(room.room_id, f"🔍 User {lastfm_user} not found.")
        return

    real_name = user_info.get("realname", "")
    country = user_info.get("country", "Unknown")
    playcount = safe_int(user_info, "playcount")
    playlists = safe_int(user_info, "playlists")
    registered = _profile_registered_label(user_info.get("registered"))
    url = user_info.get("url", "")
    subscriber = "Yes" if user_info.get("subscriber", "0") == "1" else "No"

    summary = f"👤 Profile: {display_name} ({lastfm_user})"
    lines = [
        f" • **Last.fm:** [{lastfm_user}]({url})" if url else f" • **Last.fm:** {lastfm_user}",
        f" • **Real Name:** {real_name}" if real_name else "",
        f" • **Country:** {country}",
        f" • **Registered:** {registered}",
        f" • **Total Plays:** {playcount:,}",
        f" • **Playlists:** {playlists}",
        f" • **Subscriber:** {subscriber}",
    ]
    body = "<br>".join(line for line in lines if line)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))


def _profile_registered_label(registered):
    """Render a user.getInfo ``registered`` field as 'DD Mon YYYY' (UTC), or 'Unknown'."""
    from datetime import timezone

    if not isinstance(registered, dict):
        return "Unknown"
    # NOTE(review): user.getInfo is expected to carry a unix timestamp in
    # "unixtime" and/or "#text" — confirm against the API reference.
    for key in ("unixtime", "#text"):
        try:
            stamp = int(registered.get(key))
            return datetime.fromtimestamp(stamp, tz=timezone.utc).strftime("%d %b %Y")
        except (TypeError, ValueError, OverflowError, OSError):
            continue
    text = registered.get("#text")
    return str(text) if text else "Unknown"
# ---- !playcount --------------------------------------------------
async def cmd_playcount(room, message, bot, args):
    """Handle !playcount [user]: one-line total scrobble count, no collapsible block."""
    sender = str(message.sender)
    lastfm_user, display_name = await resolve_username(sender, args, bot, room)
    if not lastfm_user:
        return

    info = await call_lastfm_api("user.getInfo", {"user": lastfm_user}, bot, room)
    if not info:
        return

    total_plays = safe_int(info.get("user", {}), "playcount")
    reply = f"🔢 **{display_name}** has scrobbled **{total_plays:,}** tracks total."
    await bot.api.send_markdown_message(room.room_id, reply)
# ---- !scrobbles --------------------------------------------------
async def cmd_scrobbles(room, message, bot, args):
    """
    Handle !scrobbles [user] - detailed scrobbling stats

    Shows total scrobbles, unique artists (when reported), registration date,
    average scrobbles per day since registration, and today's scrobbles.

    Fixes over the previous version:
      * the registration date is read from the unix timestamp instead of being
        parsed as "DD Mon YYYY" text, which failed and left the average as "?";
      * "today" is decided from each track's "uts" timestamp (UTC), falling
        back to the old text match only when no timestamp is present;
      * timezone-aware datetimes replace the deprecated ``datetime.utcnow()``.
    """
    from datetime import timezone

    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return
    info_data = await call_lastfm_api("user.getInfo", {"user": lastfm_user}, bot, room)
    if not info_data:
        return
    user_info = info_data.get("user", {})
    playcount = safe_int(user_info, "playcount")
    artist_count = safe_int(user_info, "artist_count", 0)

    now_utc = datetime.now(timezone.utc)
    day_start = now_utc.replace(hour=0, minute=0, second=0, microsecond=0).timestamp()
    today_text = now_utc.strftime("%d %b %Y")

    # Only the most recent 200 tracks are inspected, so the count tops out there.
    recent_data = await call_lastfm_api(
        "user.getRecentTracks", {"user": lastfm_user, "limit": "200"}, bot, room
    )
    today_count = 0
    if recent_data:
        tracks = recent_data.get("recenttracks", {}).get("track", [])
        if isinstance(tracks, dict):
            tracks = [tracks]
        for track in tracks:
            date = track.get("date") if isinstance(track, dict) else None
            if not isinstance(date, dict):
                # Entries without a date (e.g. now playing) are not counted.
                continue
            try:
                is_today = int(date.get("uts")) >= day_start
            except (TypeError, ValueError):
                is_today = today_text in str(date.get("#text", ""))
            if is_today:
                today_count += 1

    registered_at = _scrobbles_registered_datetime(user_info.get("registered"))
    if registered_at is not None:
        registered = registered_at.strftime("%d %b %Y")
        days_since = max((now_utc - registered_at).days, 1)
        avg_per_day = round(playcount / days_since, 1)
    else:
        registered = "Unknown"
        avg_per_day = "?"

    summary = f"📊 {display_name} — Scrobbling Stats"
    lines = [
        f" • **Total Scrobbles:** {playcount:,}",
        f" • **Unique Artists:** {artist_count:,}" if artist_count else "",
        f" • **Registered:** {registered}",
        f" • **Avg Scrobbles/Day:** {avg_per_day}",
        f" • **Today's Scrobbles:** {today_count}",
    ]
    body = "<br>".join(line for line in lines if line)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))


def _scrobbles_registered_datetime(registered):
    """Convert a user.getInfo ``registered`` field to an aware UTC datetime, or None."""
    from datetime import timezone

    if isinstance(registered, dict):
        # NOTE(review): expected to carry a unix timestamp in "unixtime"
        # and/or "#text" — confirm against the API reference.
        for key in ("unixtime", "#text"):
            try:
                return datetime.fromtimestamp(int(registered.get(key)), tz=timezone.utc)
            except (TypeError, ValueError, OverflowError, OSError):
                continue
        text = registered.get("#text")
    else:
        text = registered
    # Last resort: the textual "DD Mon YYYY" form the old code assumed.
    try:
        return datetime.strptime(str(text), "%d %b %Y").replace(tzinfo=timezone.utc)
    except ValueError:
        return None
# ---- !compare ----------------------------------------------------
async def cmd_compare(room, message, bot, args):
    """Handle !compare <user1> <user2>: compare the two users' top-50 artists."""
    if len(args) < 2:
        usage = "Usage: !compare <user1> <user2>\nExample: !compare alice bob"
        await bot.api.send_text_message(room.room_id, usage)
        return

    user1 = args[0].strip()
    user2 = args[1].strip()

    # Both lookups are issued before either result is checked.
    data1 = await call_lastfm_api(
        "user.getTopArtists", {"user": user1, "period": "overall", "limit": "50"}, bot, room
    )
    data2 = await call_lastfm_api(
        "user.getTopArtists", {"user": user2, "period": "overall", "limit": "50"}, bot, room
    )
    if not (data1 and data2):
        return

    # Lower-cased artist name -> play count, one dict per user.
    plays1 = {}
    for entry in data1.get("topartists", {}).get("artist", []):
        plays1[safe_text(entry, "name").lower()] = safe_int(entry, "playcount")
    plays2 = {}
    for entry in data2.get("topartists", {}).get("artist", []):
        plays2[safe_text(entry, "name").lower()] = safe_int(entry, "playcount")

    names1 = set(plays1)
    names2 = set(plays2)
    shared = names1 & names2
    union = names1 | names2

    if union:
        similarity = round(len(shared) / max(len(union), 1) * 100, 1)
    else:
        similarity = 0

    summary = f"🔄 Musical Taste Comparison: {user1} vs {user2}"
    lines = [
        f" • **Taste Similarity:** {similarity}%",
        f" • **Common Artists:** {len(shared)}",
        f" • **Unique to {user1}:** {len(names1 - names2)}",
        f" • **Unique to {user2}:** {len(names2 - names1)}",
    ]
    if shared:
        # Rank shared artists by combined play count and keep the top five.
        ranked = sorted(
            shared, key=lambda artist: plays1[artist] + plays2.get(artist, 0), reverse=True
        )
        lines.append(f" • **Top Shared:** {', '.join(ranked[:5])}")

    await bot.api.send_markdown_message(
        room.room_id, wrap_collapsible(summary, "<br>".join(lines))
    )
# ---- !taste ------------------------------------------------------
async def cmd_taste(room, message, bot, args):
    """
    Handle !taste [user] - top artists with taste-o-meter

    Each of the top 15 artists is shown with its share of the plays among
    those 15 and a bar of up to 20 blocks (two blocks per percentage point).
    The bar previously multiplied an empty string, so it never rendered.
    """
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return
    data = await call_lastfm_api(
        "user.getTopArtists", {"user": lastfm_user, "period": "overall", "limit": "15"}, bot, room
    )
    if not data:
        return
    artists = data.get("topartists", {}).get("artist", [])
    if not artists:
        await bot.api.send_text_message(room.room_id, f"🔍 No artists found for {lastfm_user}.")
        return

    # "or 1" avoids division by zero when every play count is 0.
    total_plays = sum(safe_int(a, "playcount") for a in artists) or 1
    summary = f"🎯 {display_name} — Taste-o-Meter"
    lines = []
    for rank, artist in enumerate(artists[:15], 1):
        name = safe_text(artist, "name")
        pct = round(safe_int(artist, "playcount") / total_plays * 100, 1)
        bar = "█" * min(int(pct * 2), 20)
        lines.append(f" {rank:2}. **{name}** {bar} {pct}%")
    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !friends ----------------------------------------------------
async def cmd_friends(room, message, bot, args):
    """
    Handle !friends [user]: list up to 15 friends with their most recent track.

    The summary reports how many friends are actually listed; it previously
    reported the number fetched (up to 20) while only 15 were shown.
    """
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return
    data = await call_lastfm_api(
        "user.getFriends", {"user": lastfm_user, "recenttracks": "1", "limit": "20"}, bot, room
    )
    if not data:
        return
    friends_info = data.get("friends", {})
    friends = friends_info.get("user", [])
    if isinstance(friends, dict):
        # A lone friend may arrive as one object rather than a list.
        friends = [friends]
    if not friends:
        await bot.api.send_text_message(room.room_id, f"👥 No friends found for {lastfm_user}.")
        return

    shown = friends[:15]
    # safe_int tolerates a missing or malformed total instead of raising.
    total = safe_int(friends_info.get("@attr", {}), "total")
    summary = f"👥 {display_name} — Friends ({len(shown)} of {total})"
    lines = []
    for friend in shown:
        fname = safe_text(friend, "name")
        realname = friend.get("realname", "")
        now = ""
        recent = friend.get("recenttrack")
        if isinstance(recent, dict):
            # Handles both {"name": ...} and {"#text": ...} artist objects.
            now = f" — 🎵 {album_artist_name(recent)} - {safe_text(recent, 'name')}"
        display = f"{fname} ({realname})" if realname else fname
        lines.append(f" • **{display}**{now}")
    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !recommend --------------------------------------------------
async def cmd_recommend(room, message, bot, args):
    """Handle !recommend [user]: suggest artists similar to the user's recent top artists."""
    sender = str(message.sender)
    lastfm_user, display_name = await resolve_username(sender, args, bot, room)
    if not lastfm_user:
        return

    top_data = await call_lastfm_api(
        "user.getTopArtists", {"user": lastfm_user, "period": "3month", "limit": "5"}, bot, room
    )
    if not top_data:
        return

    top_artists = []
    for entry in top_data.get("topartists", {}).get("artist", []):
        top_artists.append(safe_text(entry, "name"))
    if not top_artists:
        await bot.api.send_text_message(room.room_id, f"🔍 Not enough data for {lastfm_user}.")
        return

    # Names already known to the user (or already suggested), lower-cased.
    known = {name.lower() for name in top_artists}
    candidates = []
    for seed in top_artists[:3]:
        # No room is passed, so failures of these lookups are not posted.
        similar = await call_lastfm_api(
            "artist.getSimilar", {"artist": seed, "limit": "5", "autocorrect": "1"}, bot
        )
        if not similar:
            continue
        for entry in similar.get("similarartists", {}).get("artist", []):
            candidate = safe_text(entry, "name")
            score = float(entry.get("match", "0"))
            folded = candidate.lower()
            if folded in known:
                continue
            known.add(folded)
            candidates.append((candidate, score, seed))

    # Best matches first; keep at most 15.
    recommendations = sorted(candidates, key=lambda rec: rec[1], reverse=True)[:15]
    if not recommendations:
        await bot.api.send_text_message(room.room_id, "No recommendations found.")
        return

    summary = f"💡 Recommendations for {display_name} (based on top artists)"
    rows = [
        f" {rank}. **{candidate}** — {round(score * 100)}% match (via {seed})"
        for rank, (candidate, score, seed) in enumerate(recommendations, 1)
    ]
    await bot.api.send_markdown_message(
        room.room_id, wrap_collapsible(summary, "<br>".join(rows))
    )
# ---- !similar ----------------------------------------------------
async def cmd_similar(room, message, bot, args):
    """Handle !similar <artist>: list up to 15 artists similar to the given one."""
    if not args:
        await bot.api.send_text_message(room.room_id, "Usage: !similar <artist>")
        return

    # All arguments together form the (possibly multi-word) artist name.
    artist_name = " ".join(args)
    request = {"artist": artist_name, "limit": "15", "autocorrect": "1"}
    data = await call_lastfm_api("artist.getSimilar", request, bot, room)
    if not data:
        return

    matches = data.get("similarartists", {}).get("artist", [])
    if not matches:
        await bot.api.send_text_message(
            room.room_id, f"🔍 No similar artists found for **{artist_name}**."
        )
        return

    summary = f"🔗 Similar to {artist_name}"
    rows = []
    for rank, entry in enumerate(matches[:15], 1):
        percent = round(float(entry.get("match", "0")) * 100)
        rows.append(f" {rank}. **{safe_text(entry, 'name')}** — {percent}% match")
    await bot.api.send_markdown_message(
        room.room_id, wrap_collapsible(summary, "<br>".join(rows))
    )
# ---- !tag --------------------------------------------------------
async def cmd_tag(room, message, bot, args):
    """Handle !tag <tag>: list up to 15 top artists for a tag/genre."""
    if not args:
        await bot.api.send_text_message(
            room.room_id, "Usage: !tag <tag/genre>\nExample: !tag metal"
        )
        return

    # All arguments together form the (possibly multi-word) tag.
    tag = " ".join(args)
    data = await call_lastfm_api("tag.getTopArtists", {"tag": tag, "limit": "15"}, bot, room)
    if not data:
        return

    tagged = data.get("topartists", {}).get("artist", [])
    if not tagged:
        await bot.api.send_text_message(room.room_id, f"🔍 No artists found for tag **{tag}**.")
        return

    summary = f"🏷️ Top Artists tagged '{tag}'"
    rows = [
        f" {rank}. **{safe_text(entry, 'name')}** — *{safe_int(entry, 'count')} taggings*"
        for rank, entry in enumerate(tagged[:15], 1)
    ]
    await bot.api.send_markdown_message(
        room.room_id, wrap_collapsible(summary, "<br>".join(rows))
    )
# ---- !charts -----------------------------------------------------
async def cmd_charts(room, message, bot, args):
    """Reply to ``!charts`` with the global top-tracks chart.

    Fetches ``chart.getTopTracks`` (limit 10) and lists each track with its
    artist and listener count (thousands-separated) in a collapsible
    message. ``args`` is ignored.
    """
    data = await call_lastfm_api("chart.getTopTracks", {"limit": "10"}, bot, room)
    if not data:
        return

    tracks = data.get("tracks", {}).get("track", [])
    if not tracks:
        await bot.api.send_text_message(room.room_id, "No chart data available.")
        return

    rows = []
    for rank, entry in enumerate(tracks[:10], 1):
        artist = safe_text(entry, "artist")
        title = safe_text(entry, "name")
        listeners = safe_int(entry, "listeners")
        rows.append(f" {rank}. **{title}** by {artist} — *{listeners:,} listeners*")

    await bot.api.send_markdown_message(
        room.room_id, wrap_collapsible("🌍 Global Top Tracks", "<br>".join(rows))
    )
# ---- !tagcloud ---------------------------------------------------
async def cmd_tagcloud(room, message, bot, args):
    """Reply to ``!tagcloud [user]`` with the user's top tags on one line.

    The target user is resolved from ``args`` (or the sender's registration)
    via ``resolve_username``. Up to 30 tags from ``user.getTopTags`` are
    rendered as ``name(count)`` entries inside a collapsible message.
    """
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return

    data = await call_lastfm_api(
        "user.getTopTags", {"user": lastfm_user, "limit": "30"}, bot, room
    )
    if not data:
        return

    tags = data.get("toptags", {}).get("tag", [])
    if not tags:
        await bot.api.send_text_message(room.room_id, f"🔍 No tags found for {lastfm_user}.")
        return

    tag_strs = []
    for tag in tags[:30]:
        name = safe_text(tag, "name")
        count = safe_int(tag, "count")
        tag_strs.append(f"{name}({count})")

    # Separate the entries; joining with "" ran them together
    # ("rock(12)metal(8)") and made the cloud unreadable.
    body = " " + " · ".join(tag_strs)
    summary = f"☁️ {display_name} — Tag Cloud"
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !now --------------------------------------------------------
async def cmd_now(room, message, bot, args):
    """Reply to ``!now`` with what every registered user is playing.

    For each registered Last.fm account the most recent track is fetched
    (one request per user, performed sequentially). Only tracks flagged
    ``nowplaying == "true"`` are listed; if nobody is playing anything a
    single placeholder line is shown instead. ``args`` is ignored.
    """
    registered = await get_all_registered_users()
    if not registered:
        await bot.api.send_text_message(room.room_id, "No users registered yet.")
        return

    playing = []
    for lfm_user in registered.values():
        # No room is passed here, unlike the user-facing lookups elsewhere.
        data = await call_lastfm_api(
            "user.getRecentTracks", {"user": lfm_user, "limit": "1"}, bot
        )
        tracks = data.get("recenttracks", {}).get("track", []) if data else None
        if not tracks:
            continue

        # The API may hand back a single object instead of a list.
        latest = tracks[0] if isinstance(tracks, list) else tracks
        if latest.get("@attr", {}).get("nowplaying") != "true":
            continue

        artist = safe_text(latest, "artist")
        title = safe_text(latest, "name")
        playing.append(f" • **{lfm_user}**: {title} by {artist}")

    if not playing:
        playing.append(" • Nobody is currently scrobbling.")

    await bot.api.send_markdown_message(
        room.room_id,
        wrap_collapsible("🎵 Now Playing Across Registered Users", "<br>".join(playing)),
    )
# ---- !decades ----------------------------------------------------
async def cmd_decades(room, message, bot, args):
    """Reply to ``!decades [user]`` with a play-weighted decade breakdown.

    The user's overall top artists are fetched and, for the first ten, their
    top tags are scanned for decade-style names ("80s" or "1980s"). Each
    matching tag is credited with the artist's playcount; the eight largest
    decades are shown with a bar and their share of the credited plays.
    """
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return

    top_data = await call_lastfm_api(
        "user.getTopArtists",
        {"user": lastfm_user, "period": "overall", "limit": "20"},
        bot, room,
    )
    if not top_data:
        return

    artists = top_data.get("topartists", {}).get("artist", [])
    if not artists:
        await bot.api.send_text_message(room.room_id, f"Not enough data for {lastfm_user}.")
        return

    decade_counts = {}
    for artist_obj in artists[:10]:
        artist_name = safe_text(artist_obj, "name")
        playcount = safe_int(artist_obj, "playcount")
        tag_data = await call_lastfm_api(
            "artist.getTopTags", {"artist": artist_name, "autocorrect": "1"}, bot
        )
        if not tag_data:
            continue
        for tag in tag_data.get("toptags", {}).get("tag", []):
            tag_name = safe_text(tag, "name").lower()
            # Decade tags are two or four digits followed by "s".
            # NOTE(review): "80s" and "1980s" are still tallied as separate
            # decades, as before; merging them needs a century rule.
            if (
                tag_name.endswith("s")
                and len(tag_name) in (3, 5)
                and tag_name[:-1].isdigit()
            ):
                decade_counts[tag_name] = decade_counts.get(tag_name, 0) + playcount

    if not decade_counts:
        await bot.api.send_text_message(
            room.room_id, f"Could not determine decade preferences for {lastfm_user}."
        )
        return

    sorted_decades = sorted(decade_counts.items(), key=lambda x: x[1], reverse=True)
    total = sum(decade_counts.values())
    summary = f"📅 {display_name} — Favorite Decades"
    lines = []
    for decade, count in sorted_decades[:8]:
        pct = round(count / total * 100, 1) if total else 0
        # The bar glyph was an empty string, so no bar was ever drawn.
        # Scaling is unchanged: two cells per percent, capped at 20 cells.
        bar = "█" * min(int(pct * 2), 20)
        lines.append(f" • **{decade}** {bar} {pct}%")

    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !genres -----------------------------------------------------
async def cmd_genres(room, message, bot, args):
    """Reply to ``!genres [user]`` with a numbered list of the user's top tags.

    The target user is resolved from ``args`` (or the sender) via
    ``resolve_username``; up to 15 entries from ``user.getTopTags`` are shown
    with their counts in a collapsible message.
    """
    sender = str(message.sender)
    lastfm_user, display_name = await resolve_username(sender, args, bot, room)
    if not lastfm_user:
        return

    request = {"user": lastfm_user, "limit": "15"}
    data = await call_lastfm_api("user.getTopTags", request, bot, room)
    if not data:
        return

    tags = data.get("toptags", {}).get("tag", [])
    if not tags:
        await bot.api.send_text_message(
            room.room_id, f"🔍 No genre tags for {lastfm_user}."
        )
        return

    rows = []
    for rank, entry in enumerate(tags[:15], 1):
        genre = safe_text(entry, "name")
        uses = safe_int(entry, "count")
        rows.append(f" {rank}. **{genre}** — *{uses}×*")

    await bot.api.send_markdown_message(
        room.room_id,
        wrap_collapsible(f"🎶 {display_name} — Top Genres", "<br>".join(rows)),
    )
# ---- !era --------------------------------------------------------
async def cmd_era(room, message, bot, args):
    """Reply to ``!era <year>`` with popular tracks tagged with that year.

    The first argument must be a four-digit year. Years ending in "0" are
    looked up as a decade tag (e.g. "1990s") first and fall back to the bare
    year if that lookup yields nothing. Up to ten tracks from
    ``tag.getTopTracks`` are listed in a collapsible message.
    """
    if not args:
        await bot.api.send_text_message(room.room_id, "Usage: !era <year>\nExample: !era 1994")
        return

    year = args[0].strip()
    if not year.isdigit() or len(year) != 4:
        await bot.api.send_text_message(room.room_id, "Please specify a valid 4-digit year.")
        return

    tag = f"{year}s" if year.endswith("0") else year
    data = await call_lastfm_api(
        "tag.getTopTracks", {"tag": tag, "limit": "10"}, bot, room
    )
    # Only retry with the bare year when the first attempt used a different
    # (decade) tag; otherwise the retry would repeat the identical request.
    if not data and tag != year:
        data = await call_lastfm_api(
            "tag.getTopTracks", {"tag": year, "limit": "10"}, bot, room
        )
    if not data:
        return

    tracks = data.get("tracks", {}).get("track", [])
    if not tracks:
        await bot.api.send_text_message(room.room_id, f"🔍 No tracks found for era **{year}**.")
        return

    summary = f"🕰️ Popular Tracks — {year}"
    lines = []
    for i, track in enumerate(tracks[:10], 1):
        artist = safe_text(track, "artist")
        name = safe_text(track, "name")
        lines.append(f" {i}. **{name}** by {artist}")

    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !weekly -----------------------------------------------------
async def cmd_weekly(room, message, bot, args):
    """Reply to ``!weekly [user]`` with the user's weekly track chart.

    Shows the first ten tracks of ``user.getWeeklyTrackChart`` with their
    playcounts, preceded by a header giving the number of tracks in the
    chart and the total plays across the whole chart.
    """
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return

    data = await call_lastfm_api(
        "user.getWeeklyTrackChart", {"user": lastfm_user}, bot, room
    )
    if not data:
        return

    tracks = data.get("weeklytrackchart", {}).get("track", [])
    if not tracks:
        await bot.api.send_text_message(room.room_id, f"📊 No weekly chart for {lastfm_user}.")
        return
    if isinstance(tracks, dict):
        # A one-entry chart may arrive as a bare object rather than a list.
        tracks = [tracks]

    # Sum over the full chart: the header reports the week's total, not
    # just the plays of the ten tracks listed below it.
    total_plays = sum(safe_int(track, "playcount") for track in tracks)

    lines = []
    for i, track in enumerate(tracks[:10], 1):
        artist = safe_text(track, "artist")
        name = safe_text(track, "name")
        playcount = safe_int(track, "playcount")
        lines.append(f" {i}. **{name}** by {artist} — *{playcount} plays*")

    summary = f"📅 {display_name} — Weekly Report"
    header = f" • **Total unique tracks:** {len(tracks)} | **Total plays this week:** {total_plays}<br>"
    body = header + "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !monthly ----------------------------------------------------
async def cmd_monthly(room, message, bot, args):
    """Reply to ``!monthly [user]`` with a report on the last 30 days.

    Fetches up to 200 scrobbles from ``user.getRecentTracks`` within the
    window and tallies them per track and per artist. The report shows
    totals, the ten most-played tracks and the five most-played artists.
    All figures are computed from the fetched page only, so they are capped
    at 200 scrobbles.
    """
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return

    # Derive the window from the epoch clock. Calling .timestamp() on a naive
    # utcnow() value interprets it as local time and shifts the window by the
    # host's UTC offset.
    to_ts = int(time.time())
    from_ts = to_ts - 30 * 24 * 60 * 60

    data = await call_lastfm_api(
        "user.getRecentTracks",
        {"user": lastfm_user, "from": str(from_ts), "to": str(to_ts), "limit": "200"},
        bot, room,
    )
    if not data:
        return

    tracks = data.get("recenttracks", {}).get("track", [])
    if not tracks:
        await bot.api.send_text_message(room.room_id, f"📊 No tracks in the last 30 days for {lastfm_user}.")
        return
    if isinstance(tracks, dict):
        # A single scrobble may arrive as a bare object rather than a list.
        tracks = [tracks]

    # Key tracks by (name, artist) tuples so titles containing any separator
    # text cannot corrupt the split later on.
    track_counts = {}
    artist_counts = {}
    for track in tracks:
        name = safe_text(track, "name")
        artist = safe_text(track, "artist")
        key = (name, artist)
        track_counts[key] = track_counts.get(key, 0) + 1
        artist_counts[artist] = artist_counts.get(artist, 0) + 1

    total = len(tracks)
    unique_tracks = len(track_counts)
    unique_artists = len(artist_counts)
    top_tracks = sorted(track_counts.items(), key=lambda x: x[1], reverse=True)[:10]
    top_artists = sorted(artist_counts.items(), key=lambda x: x[1], reverse=True)[:5]

    summary = f"📆 {display_name} — Monthly Report (Last 30 Days)"
    lines = [
        f" • **Total Scrobbles:** {total} | **Unique Tracks:** {unique_tracks} | **Unique Artists:** {unique_artists}",
        "<br><strong>Top Tracks:</strong>",
    ]
    for i, ((name, artist), count) in enumerate(top_tracks, 1):
        lines.append(f" {i}. **{name}** by {artist} — *{count} plays*")
    lines.append("<br><strong>Top Artists:</strong>")
    for i, (artist, count) in enumerate(top_artists, 1):
        lines.append(f" {i}. **{artist}** — *{count} plays*")

    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !yearly -----------------------------------------------------
async def cmd_yearly(room, message, bot, args):
    """Reply to ``!yearly [user] [year]`` with a yearly listening report.

    A trailing four-digit argument selects a calendar year (1 Jan to 31 Dec
    in the host's local time); without it the report covers the last 365
    days. Up to 200 scrobbles from ``user.getRecentTracks`` are tallied per
    track and per artist, so all figures are capped at that sample size.
    """
    matrix_user = str(message.sender)
    year = None
    user_arg = list(args)
    # isdecimal() (not isdigit()) guarantees int() can parse the token.
    if user_arg and len(user_arg[-1]) == 4 and user_arg[-1].isdecimal():
        year = int(user_arg.pop())

    lastfm_user, display_name = await resolve_username(matrix_user, user_arg, bot, room)
    if not lastfm_user:
        return

    # "is not None" so that "0000" is rejected as an invalid year instead of
    # silently being treated as "no year given".
    if year is not None:
        try:
            from_ts = int(datetime(year, 1, 1).timestamp())
            to_ts = int(datetime(year, 12, 31, 23, 59, 59).timestamp())
        except (ValueError, OverflowError, OSError):
            # Out-of-range years, or dates the platform cannot convert.
            await bot.api.send_text_message(room.room_id, "Invalid year.")
            return
        report_label = str(year)
        empty_period = str(year)
    else:
        # Derive the rolling window from the epoch clock. Calling .timestamp()
        # on a naive utcnow() value would be read as local time and skew it.
        to_ts = int(time.time())
        from_ts = to_ts - 365 * 24 * 60 * 60
        # Label the rolling window as such rather than as the current year.
        report_label = "Last 365 Days"
        empty_period = "the last 365 days"

    data = await call_lastfm_api(
        "user.getRecentTracks",
        {"user": lastfm_user, "from": str(from_ts), "to": str(to_ts), "limit": "200"},
        bot, room,
    )
    if not data:
        return

    tracks = data.get("recenttracks", {}).get("track", [])
    if not tracks:
        await bot.api.send_text_message(
            room.room_id, f"📊 No tracks in {empty_period} for {lastfm_user}."
        )
        return
    if isinstance(tracks, dict):
        # A single scrobble may arrive as a bare object rather than a list.
        tracks = [tracks]

    # Tuple keys avoid mis-splitting names that contain a separator string.
    track_counts = {}
    artist_counts = {}
    for track in tracks:
        name = safe_text(track, "name")
        artist = safe_text(track, "artist")
        key = (name, artist)
        track_counts[key] = track_counts.get(key, 0) + 1
        artist_counts[artist] = artist_counts.get(artist, 0) + 1

    top_tracks = sorted(track_counts.items(), key=lambda x: x[1], reverse=True)[:10]
    top_artists = sorted(artist_counts.items(), key=lambda x: x[1], reverse=True)[:5]

    summary = f"📆 {display_name} — Yearly Report ({report_label})"
    lines = [
        f" • **Total Scrobbles:** {len(tracks)} | **Unique Tracks:** {len(track_counts)} | **Unique Artists:** {len(artist_counts)}",
        "<br><strong>Top Tracks:</strong>",
    ]
    for i, ((name, artist), count) in enumerate(top_tracks, 1):
        lines.append(f" {i}. **{name}** by {artist} — *{count} plays*")
    lines.append("<br><strong>Top Artists:</strong>")
    for i, (artist, count) in enumerate(top_artists, 1):
        lines.append(f" {i}. **{artist}** — *{count} plays*")

    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !first ------------------------------------------------------
async def cmd_first(room, message, bot, args):
    """Reply to ``!first <artist> [user]`` with the oldest matching scrobble.

    When two or more words are given, the last one is taken as the username
    and the rest as the artist name. Only one page of up to 200 entries from
    ``user.getRecentTracks`` is searched, so "first" means the last match in
    that page. Artist matching is a case-insensitive substring test.
    """
    sender = str(message.sender)
    if not args:
        await bot.api.send_text_message(room.room_id, "Usage: !first <artist> [username]")
        return

    words = list(args)
    trailing = words[-1]
    if len(words) >= 2 and " " not in trailing:
        user_arg = [words.pop()]
    else:
        user_arg = []
    artist_name = " ".join(words)

    lastfm_user, display_name = await resolve_username(sender, user_arg, bot, room)
    if not lastfm_user:
        return

    request = {"user": lastfm_user, "limit": "200", "from": "0"}
    data = await call_lastfm_api("user.getRecentTracks", request, bot, room)
    if not data:
        return

    tracks = data.get("recenttracks", {}).get("track", [])
    if not tracks:
        await bot.api.send_text_message(
            room.room_id, f"No scrobbles found for {lastfm_user}."
        )
        return

    needle = artist_name.lower()
    matches = []
    for entry in tracks:
        if needle not in safe_text(entry, "artist").lower():
            continue
        # Entries without a date field keep an empty date string.
        has_date = "date" in entry and "#text" in entry["date"]
        matches.append((entry, entry["date"]["#text"] if has_date else ""))

    if not matches:
        await bot.api.send_text_message(
            room.room_id,
            f"🔍 No scrobbles of **{artist_name}** found for {display_name} (within recent history).",
        )
        return

    # The last match in the fetched page is reported as the first scrobble.
    oldest_track, oldest_date = matches[-1]
    title = safe_text(oldest_track, "name")
    scrobbled_artist = safe_text(oldest_track, "artist")
    reply = (
        f"🔍 **{display_name}** first scrobbled **{artist_name}** with:\n"
        f" • **{title}** by {scrobbled_artist}\n"
        f" • 📅 {oldest_date or 'Unknown date'}"
    )
    await bot.api.send_markdown_message(room.room_id, reply)
# ---- !concerts ---------------------------------------------------
async def cmd_concerts(room, message, bot, args):
    """Reply to ``!concerts [user]`` with upcoming events for top artists.

    Takes the user's top artists of the last three months, queries
    ``artist.getEvents`` for the first five (up to three events each) and
    lists at most fifteen events with venue, location and start date.
    """
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return

    top_data = await call_lastfm_api(
        "user.getTopArtists", {"user": lastfm_user, "period": "3month", "limit": "10"}, bot, room
    )
    if not top_data:
        return

    top_artists = top_data.get("topartists", {}).get("artist", [])
    if isinstance(top_artists, dict):
        # A single result may arrive as a bare object rather than a list.
        top_artists = [top_artists]
    artists = [safe_text(a, "name") for a in top_artists]
    if not artists:
        # Tell the user why nothing happens instead of returning silently.
        await bot.api.send_text_message(room.room_id, f"Not enough data for {lastfm_user}.")
        return

    await bot.api.send_text_message(room.room_id, "🔍 Searching for upcoming concerts...")

    all_events = []
    for artist_name in artists[:5]:
        ev_data = await call_lastfm_api(
            "artist.getEvents", {"artist": artist_name, "limit": "3", "autocorrect": "1"}, bot
        )
        if not ev_data:
            continue
        events = ev_data.get("events", {}).get("event", [])
        if isinstance(events, dict):
            # Slicing a dict raises TypeError, so wrap a lone event first.
            events = [events]
        for ev in events[:3]:
            venue = ev.get("venue", {})
            venue_location = venue.get("location", {})
            title = safe_text(ev, "title")
            venue_name = safe_text(venue, "name", "Unknown Venue")
            city = safe_text(venue_location, "city", "")
            country = safe_text(venue_location, "country", "")
            start_date = safe_text(ev, "startDate", "TBD")
            all_events.append((title, artist_name, venue_name, city, country, start_date))

    if not all_events:
        await bot.api.send_text_message(
            room.room_id, f"🎫 No upcoming concerts found for {display_name}'s top artists."
        )
        return

    summary = f"🎫 Upcoming Concerts for {display_name}'s Top Artists ({len(all_events)} found)"
    lines = []
    for title, artist, venue_name, city, country, date in all_events[:15]:
        # Join only the parts that are present so a missing city/country
        # does not leave a dangling ", " after the venue.
        place = ", ".join(part for part in (venue_name, city, country) if part)
        lines.append(f" • **{artist}** — {title}")
        lines.append(f" 📍 {place} | 📅 {date}")

    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !radio ------------------------------------------------------
async def cmd_radio(room, message, bot, args):
    """Reply to ``!radio <artist>`` with a playlist built from similar artists.

    Looks up artists similar to the given one, takes the top track of each
    of the first eight, and lists them (with a YouTube link when
    ``get_youtube_link`` finds one) in a collapsible message.
    """
    if not args:
        await bot.api.send_text_message(room.room_id, "Usage: !radio <artist>")
        return

    artist_name = " ".join(args)
    sim_query = {"artist": artist_name, "limit": "10", "autocorrect": "1"}
    sim_data = await call_lastfm_api("artist.getSimilar", sim_query, bot, room)
    if not sim_data:
        return

    similar = sim_data.get("similarartists", {}).get("artist", [])
    if not similar:
        await bot.api.send_text_message(
            room.room_id, f"No similar artists for **{artist_name}**."
        )
        return

    playlist = []
    for candidate in similar[:8]:
        sim_name = safe_text(candidate, "name")
        top_data = await call_lastfm_api(
            "artist.getTopTracks",
            {"artist": sim_name, "limit": "1", "autocorrect": "1"},
            bot,
        )
        tracks = top_data.get("toptracks", {}).get("track", []) if top_data else None
        if not tracks:
            continue
        # A single track may come back as an object rather than a list.
        top_track = tracks[0] if isinstance(tracks, list) else tracks
        playlist.append((sim_name, safe_text(top_track, "name")))

    if not playlist:
        await bot.api.send_text_message(room.room_id, "Could not generate playlist.")
        return

    rows = []
    for position, (performer, title) in enumerate(playlist, 1):
        link = await get_youtube_link(performer, title)
        suffix = f" | [▶️ YouTube]({link})" if link else ""
        rows.append(f" {position}. **{title}** by {performer}{suffix}")

    summary = f"📻 Radio: {artist_name} — Similar Artists Playlist ({len(playlist)} tracks)"
    await bot.api.send_markdown_message(
        room.room_id, wrap_collapsible(summary, "<br>".join(rows))
    )
# ---- !mashup -----------------------------------------------------
async def cmd_mashup(room, message, bot, args):
    """Reply to ``!mashup <artist1> <artist2>`` with shared connections.

    The two artists are separated by a comma when one is present; otherwise
    the argument words are split in half. The reply lists similar artists
    the two have in common (ranked by combined match score) and the top
    tags they share.
    """
    full = " ".join(args)
    if "," in full:
        first, second = full.split(",", 1)
        artist1, artist2 = first.strip(), second.strip()
    else:
        mid = len(args) // 2
        artist1 = " ".join(args[:mid])
        artist2 = " ".join(args[mid:])

    # Validate after parsing so "a," (empty second artist) is rejected and a
    # single comma-separated token such as "Blur,Oasis" is accepted.
    if not artist1 or not artist2:
        await bot.api.send_text_message(
            room.room_id, "Usage: !mashup <artist1> <artist2>"
        )
        return

    data1 = await call_lastfm_api(
        "artist.getSimilar", {"artist": artist1, "limit": "20", "autocorrect": "1"}, bot, room
    )
    data2 = await call_lastfm_api(
        "artist.getSimilar", {"artist": artist2, "limit": "20", "autocorrect": "1"}, bot, room
    )
    if not data1 or not data2:
        return

    # Lower-cased artist name -> match score, for case-insensitive overlap.
    sim1 = {safe_text(a, "name").lower(): float(a.get("match", 0))
            for a in data1.get("similarartists", {}).get("artist", [])}
    sim2 = {safe_text(a, "name").lower(): float(a.get("match", 0))
            for a in data2.get("similarartists", {}).get("artist", [])}
    common = set(sim1) & set(sim2)

    # The two names used to be concatenated with nothing between them.
    summary = f"🔀 Mashup: {artist1} × {artist2}"
    lines = []
    if common:
        # Name as tie-breaker keeps the ordering stable between runs.
        shared = sorted(common, key=lambda a: (-(sim1[a] + sim2[a]), a))[:10]
        lines.append(f" • **Shared similar artists:** {len(common)}")
        lines.append(" • **Top connections:**")
        for a in shared:
            avg = round((sim1[a] + sim2[a]) / 2 * 100)
            lines.append(f" - **{a}** ({avg}% avg match)")
    else:
        lines.append(" • No direct musical connections found between these artists.")

    tags1_data = await call_lastfm_api(
        "artist.getTopTags", {"artist": artist1, "autocorrect": "1"}, bot
    )
    tags2_data = await call_lastfm_api(
        "artist.getTopTags", {"artist": artist2, "autocorrect": "1"}, bot
    )
    tags1 = set()
    tags2 = set()
    if tags1_data:
        tags1 = {safe_text(t, "name").lower() for t in tags1_data.get("toptags", {}).get("tag", [])}
    if tags2_data:
        tags2 = {safe_text(t, "name").lower() for t in tags2_data.get("toptags", {}).get("tag", [])}

    common_tags = tags1 & tags2
    if common_tags:
        lines.append(f" • **Shared genres:** {', '.join(sorted(common_tags)[:8])}")

    body = "<br>".join(lines)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ===================================================================
# !collage: new ImageMagick-based implementation
# ===================================================================
# ------------------------------------------------------------
# !collage command (using ImageMagick)
# ------------------------------------------------------------
async def cmd_collage(room, message, bot, args):
    """Handle ``!collage [user] [size]``: build an album-art collage.

    A trailing numeric argument sets the grid edge length, clamped to 2..5
    (default 3). The user's overall top albums are fetched, their covers are
    downloaded concurrently, and ImageMagick (``convert`` + ``montage``)
    stitches them into a ``size`` x ``size`` grid with white tiles where a
    cover is missing. The image is sent to the room followed by a
    collapsible text list of the albums. If ImageMagick fails, the first
    downloaded cover is sent instead. All temporary files are removed
    afterwards, including when an error occurs.
    """
    matrix_user = str(message.sender)
    size = 3
    user_arg = list(args)
    if user_arg and user_arg[-1].isdigit():
        size = max(2, min(5, int(user_arg[-1])))
        user_arg.pop()

    lastfm_user, display_name = await resolve_username(matrix_user, user_arg, bot, room)
    if not lastfm_user:
        return

    tile_count = size * size

    # 1. Get the top albums.
    data = await call_lastfm_api(
        "user.getTopAlbums",
        {"user": lastfm_user, "period": "overall", "limit": str(tile_count)},
        bot, room,
    )
    if not data:
        return
    albums = data.get("topalbums", {}).get("album", [])
    if not albums:
        await bot.api.send_text_message(room.room_id, f"No albums for {lastfm_user}.")
        return
    albums = albums[:tile_count]

    # 2. Download all covers concurrently.
    # Each result is an (artist, album_name, filepath) tuple.
    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession(timeout=timeout, headers=HEADERS) as session:
        results = await asyncio.gather(
            *(download_album_art_to_file(session, alb) for alb in albums)
        )
    downloaded = [r for r in results if r[2]]
    if not downloaded:
        await bot.api.send_text_message(room.room_id, "Could not download any album art.")
        return

    def run_tool(cmd):
        """Run one ImageMagick command, raising on failure or timeout."""
        subprocess.run(cmd, check=True, timeout=30)

    # Every file created for this request is tracked here, so the finally
    # block can clean up no matter where the flow exits.
    temp_paths = [r[2] for r in downloaded]
    try:
        # 3. Unique temp files. A fixed placeholder name would be shared (and
        # deleted) across concurrent collages, and embedding the user name in
        # a path would let user input shape a filesystem path.
        fd, placeholder_path = tempfile.mkstemp(prefix="lastfm_placeholder_", suffix=".png")
        os.close(fd)
        temp_paths.append(placeholder_path)
        fd, collage_path = tempfile.mkstemp(prefix="lastfm_collage_", suffix=".png")
        os.close(fd)
        temp_paths.append(collage_path)

        # Ordered tile list: placeholder where a cover is missing, padded to
        # a full grid.
        file_list = [path or placeholder_path for _, _, path in results]
        file_list.extend([placeholder_path] * (tile_count - len(file_list)))

        placeholder_cmd = ["convert", "-size", "200x200", "xc:white", placeholder_path]
        montage_cmd = (
            ["montage", "-geometry", "200x200+2+2", "-tile", f"{size}x{size}"]
            + file_list
            + [collage_path]
        )

        # 4. Run ImageMagick in a worker thread so the event loop stays
        # responsive while the external process works.
        loop = asyncio.get_running_loop()
        image_path = collage_path
        try:
            await loop.run_in_executor(None, run_tool, placeholder_cmd)
            await loop.run_in_executor(None, run_tool, montage_cmd)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
            # OSError covers a missing ImageMagick binary.
            logging.error(f"ImageMagick collage failed: {e}")
            # Fallback: send the first downloaded cover on its own.
            image_path = downloaded[0][2]

        # 5. Send the image.
        await bot.api.send_image_message(room_id=room.room_id, image_filepath=image_path)

        # 6. Collapsible text details.
        summary = f"🖼️ {display_name} — Album Collage ({size}×{size})"
        lines = [f"Top {size*size} albums for {display_name}"]
        for i, album in enumerate(albums, 1):
            artist = album_artist_name(album)
            name = safe_text(album, "name")
            playcount = safe_int(album, "playcount")
            lines.append(f" {i}. **{name}** by {artist} — *{playcount} plays*")
        body = "<br>".join(lines)
        await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
    finally:
        # Best-effort cleanup: a file that is already gone is not an error.
        for path in temp_paths:
            try:
                os.remove(path)
            except OSError:
                pass
# ---- !listening --------------------------------------------------
async def cmd_listening(room, message, bot, args):
    """Reply to ``!listening [user]`` with the latest track and its artwork.

    Fetches the single most recent entry from ``user.getRecentTracks`` and
    reports it as "is listening to" when flagged ``nowplaying``, otherwise
    as "last listened to". The collapsible body carries the album art (the
    "extralarge" image if it has a URL, else the first image that does), the
    track, the album when known, and a YouTube link when one is found.
    """
    sender = str(message.sender)
    lastfm_user, display_name = await resolve_username(sender, args, bot, room)
    if not lastfm_user:
        return

    request = {"user": lastfm_user, "limit": "1"}
    data = await call_lastfm_api("user.getRecentTracks", request, bot, room)
    if not data:
        return

    tracks = data.get("recenttracks", {}).get("track", [])
    if not tracks:
        await bot.api.send_text_message(
            room.room_id, f"No recent tracks for {lastfm_user}."
        )
        return

    # A single track may come back as an object rather than a list.
    latest = tracks[0] if isinstance(tracks, list) else tracks
    now_playing = latest.get("@attr", {}).get("nowplaying", "false") == "true"
    artist = safe_text(latest, "artist")
    title = safe_text(latest, "name")
    album = safe_text(latest, "album", "")

    # Prefer the first "extralarge" entry; otherwise take the first image
    # that actually has a URL.
    image_url = next(
        (img.get("#text", "") for img in latest.get("image", [])
         if img.get("size") == "extralarge"),
        "",
    )
    if not image_url:
        candidates = (img.get("#text", "") for img in latest.get("image", []))
        image_url = next((url for url in candidates if url), "")

    if now_playing:
        action = "is listening to"
    else:
        action = "last listened to"
    summary = f"🎧 {display_name} {action}: {title} by {artist}"

    parts = []
    if image_url:
        parts.append(f" ![Album Art]({image_url})")
    parts.append(f" **{title}** by **{artist}**")
    if album:
        parts.append(f" Album: *{album}*")
    link = await get_youtube_link(artist, title)
    if link:
        parts.append(f" [▶️ YouTube]({link})")

    await bot.api.send_markdown_message(
        room.room_id, wrap_collapsible(summary, "<br>".join(parts))
    )
# ---- !awards -----------------------------------------------------
async def cmd_awards(room, message, bot, args):
    """Reply to ``!awards [user]`` with milestone achievements.

    Achievements come from ``user.getInfo``: one line per scrobble threshold
    reached, one per artist-count threshold reached, account-age badges and
    a subscriber badge. A placeholder line is shown when none apply.
    """
    matrix_user = str(message.sender)
    lastfm_user, display_name = await resolve_username(matrix_user, args, bot, room)
    if not lastfm_user:
        return

    info_data = await call_lastfm_api("user.getInfo", {"user": lastfm_user}, bot, room)
    if not info_data:
        return

    user_info = info_data.get("user", {})
    playcount = safe_int(user_info, "playcount")
    artist_count = safe_int(user_info, "artist_count", 0)
    reg_info = user_info.get("registered", {})
    registered = reg_info.get("#text", "Unknown")

    achievements = []

    scrobble_milestones = [
        (100, "🎧 Newcomer"),
        (1000, "🎶 Listener"),
        (5000, "🎵 Collector"),
        (10000, "💿 Music Fanatic"),
        (25000, "🎸 Audiophile"),
        (50000, "🎹 Music Scholar"),
        (100000, "🏆 Scrobble Master"),
        (250000, "👑 Scrobble King/Queen"),
        (500000, "🌟 Scrobble Legend"),
        (1000000, "🌌 Scrobble Galaxy"),
    ]
    for threshold, title in scrobble_milestones:
        if playcount >= threshold:
            # Same " • title — detail" shape as every other achievement; the
            # title and the number used to run together.
            achievements.append(f" • {title} — {threshold:,}+ scrobbles")

    artist_milestones = [
        (10, "🌿 Explorer"),
        (50, "🌳 Curator"),
        (100, "🌍 Globetrotter"),
        (500, "🌌 Universe Explorer"),
        (1000, "🚀 Cosmopolitan"),
    ]
    for threshold, title in artist_milestones:
        if artist_count >= threshold:
            achievements.append(f" • {title} — {threshold:,}+ artists")

    # Registration date: try the textual form first, then a unix timestamp.
    # NOTE(review): Last.fm appears to report registration as epoch seconds
    # ("unixtime", and possibly "#text" too) — confirm against live data.
    reg_date = None
    try:
        reg_date = datetime.strptime(registered, "%d %b %Y")
    except (ValueError, TypeError):
        try:
            reg_date = datetime.utcfromtimestamp(int(reg_info.get("unixtime", registered)))
        except (ValueError, TypeError, OverflowError, OSError):
            reg_date = None  # Unknown format: skip the account-age badges.

    if reg_date is not None:
        years = (datetime.utcnow() - reg_date).days // 365
        if years >= 1:
            achievements.append(f" • 📅 Veteran — {years} year{'s' if years > 1 else ''} on Last.fm")
        if years >= 5:
            achievements.append(" • 🏅 Loyalist — 5+ years on Last.fm")
        if years >= 10:
            achievements.append(" • 🎖️ Decade Club — 10+ years on Last.fm")

    if user_info.get("subscriber", "0") == "1":
        achievements.append(" • ⭐ Subscriber — Supporting Last.fm")

    if not achievements:
        achievements.append(" • 🆕 Keep scrobbling to earn achievements!")

    summary = f"🏆 {display_name} — Achievements"
    header = f" • Total Scrobbles: **{playcount:,}** | Artists: **{artist_count:,}**<br>"
    body = header + "<br>".join(achievements)
    await bot.api.send_markdown_message(room.room_id, wrap_collapsible(summary, body))
# ---- !lastfm -----------------------------------------------------
async def cmd_lastfm_help(room, message, bot, args):
    """Handle ``!lastfm``: send the command reference for this plugin.

    The help is a static HTML ``<details>`` block sent as a single markdown
    message to the room; ``message`` and ``args`` are not used.
    """
    # The block's lines are kept flush-left on purpose: leading spaces are
    # part of the message text.
    help_text = """
<details><summary><strong>🎵 Last.fm Plugin Commands</strong></summary>
<p>
<strong>Registration & Now Playing</strong><br>
• <code>!register &lt;username&gt;</code> - Register your Last.fm username<br>
• <code>!np [user]</code> - Show currently playing track (no collapsible)<br>
<br>
<strong>Recent & Loved</strong><br>
• <code>!recent [user] [limit]</code> - Recent tracks (default 10, max 50)<br>
• <code>!loved [user]</code> - Recently loved tracks<br>
<br>
<strong>Top Lists (period: overall/7day/1month/3month/6month/12month)</strong><br>
• <code>!toptracks [user] [period]</code> - Top tracks<br>
• <code>!topartists [user] [period]</code> - Top artists<br>
• <code>!topalbums [user] [period]</code> - Top albums<br>
<br>
<strong>Profile & Stats</strong><br>
• <code>!profile [user]</code> - Detailed profile<br>
• <code>!playcount [user]</code> - Total scrobbles (short output)<br>
• <code>!scrobbles [user]</code> - Detailed scrobbling statistics<br>
<br>
<strong>Social & Comparison</strong><br>
• <code>!compare &lt;user1&gt; &lt;user2&gt;</code> - Compare two users' musical tastes<br>
• <code>!taste [user]</code> - Top artists with taste-o-meter<br>
• <code>!friends [user]</code> - Last.fm friends<br>
<br>
<strong>Discovery</strong><br>
• <code>!recommend [user]</code> - Artist recommendations<br>
• <code>!similar &lt;artist&gt;</code> - Find similar artists<br>
• <code>!tag &lt;tag&gt;</code> - Top artists for a tag/genre<br>
• <code>!radio &lt;artist&gt;</code> - Generate a playlist from similar artists<br>
• <code>!mashup &lt;artist1&gt; &lt;artist2&gt;</code> - Find musical connections<br>
<br>
<strong>Charts & Tags</strong><br>
• <code>!charts</code> - Global top tracks<br>
• <code>!tagcloud [user]</code> - Top genre tags<br>
• <code>!genres [user]</code> - Top genres/tags<br>
<br>
<strong>Time-based Analysis</strong><br>
• <code>!decades [user]</code> - Favorite decades<br>
• <code>!era &lt;year&gt;</code> - Popular tracks from a year<br>
• <code>!weekly [user]</code> - Weekly listening report<br>
• <code>!monthly [user]</code> - Monthly listening report<br>
• <code>!yearly [user] [year]</code> - Yearly listening report<br>
<br>
<strong>Specialized</strong><br>
• <code>!first &lt;artist&gt; [user]</code> - First scrobble of an artist<br>
• <code>!concerts [user]</code> - Upcoming concerts<br>
• <code>!collage [user] [size]</code> - Album art collage (image)<br>
• <code>!listening [user]</code> - Now listening with album art<br>
• <code>!awards [user]</code> - Milestone achievements<br>
<br>
<strong>Room-wide</strong><br>
• <code>!now</code> - Show what registered users are playing<br>
</p>
</details>
"""
    await bot.api.send_markdown_message(room.room_id, help_text)
# ===================================================================
# MAIN DISPATCH
# ===================================================================
async def handle_command(room, message, bot, prefix, config):
    """Main command dispatcher for the Last.fm plugin.

    Ignores messages sent by the bot itself or lacking the command prefix,
    then routes the command word to its handler. Any exception raised by a
    handler is logged with its traceback and reported back to the room.

    Args:
        room: Room the message was received in (``room.room_id`` is used).
        message: The incoming message event.
        bot: Bot instance; replies are sent through ``bot.api``.
        prefix: Command prefix passed to ``botlib.MessageMatch``.
        config: Plugin configuration; not used by this dispatcher.
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix()):
        return

    command = match.command()
    args = match.args()

    # Command routing table
    command_map = {
        "register": cmd_register,
        "np": cmd_np,
        "recent": cmd_recent,
        "toptracks": cmd_toptracks,
        "topartists": cmd_topartists,
        "topalbums": cmd_topalbums,
        "loved": cmd_loved,
        "profile": cmd_profile,
        "playcount": cmd_playcount,
        "scrobbles": cmd_scrobbles,
        "compare": cmd_compare,
        "taste": cmd_taste,
        "friends": cmd_friends,
        "recommend": cmd_recommend,
        "similar": cmd_similar,
        "tag": cmd_tag,
        "charts": cmd_charts,
        "tagcloud": cmd_tagcloud,
        "now": cmd_now,
        "decades": cmd_decades,
        "genres": cmd_genres,
        "era": cmd_era,
        "weekly": cmd_weekly,
        "monthly": cmd_monthly,
        "yearly": cmd_yearly,
        "first": cmd_first,
        "concerts": cmd_concerts,
        "radio": cmd_radio,
        "mashup": cmd_mashup,
        "collage": cmd_collage,
        "listening": cmd_listening,
        "awards": cmd_awards,
        "lastfm": cmd_lastfm_help,
    }

    handler = command_map.get(command)
    if handler is None:
        return

    # Initialise the database only once a Last.fm command is actually being
    # served, instead of on every message the bot sees in every room.
    await init_db()

    try:
        await handler(room, message, bot, args)
    except Exception as e:
        # Top-level boundary: keep the traceback in the log and tell the room.
        logging.exception(f"Error in Last.fm command '{command}': {e}")
        await bot.api.send_text_message(
            room.room_id, f"❌ Error processing !{command}: {str(e)}"
        )