""" This plugin provides Last.fm integration for the bot. It allows users to register their Last.fm username and display their currently playing track. """ import logging import os import sqlite3 import simplematrixbotlib as botlib import aiosqlite import json import aiohttp # Database file path DB_PATH = "lastfm.db" async def init_db(): """Initialize the database with the required tables.""" async with aiosqlite.connect(DB_PATH) as db: await db.execute(""" CREATE TABLE IF NOT EXISTS user_lastfm ( matrix_user TEXT PRIMARY KEY, lastfm_user TEXT NOT NULL ) """) await db.commit() async def get_lastfm_username(matrix_user): """Get Last.fm username for a Matrix user.""" async with aiosqlite.connect(DB_PATH) as db: async with db.execute("SELECT lastfm_user FROM user_lastfm WHERE matrix_user = ?", (matrix_user,)) as cursor: row = await cursor.fetchone() return row[0] if row else None async def set_lastfm_username(matrix_user, lastfm_user): """Associate a Last.fm username with a Matrix user.""" async with aiosqlite.connect(DB_PATH) as db: # Check if user already exists async with db.execute("SELECT lastfm_user FROM user_lastfm WHERE matrix_user = ?", (matrix_user,)) as cursor: row = await cursor.fetchone() if row: # Update existing record await db.execute("UPDATE user_lastfm SET lastfm_user = ? WHERE matrix_user = ?", (lastfm_user, matrix_user)) else: # Insert new record await db.execute("INSERT INTO user_lastfm (matrix_user, lastfm_user) VALUES (?, ?)", (matrix_user, lastfm_user)) await db.commit() async def get_recent_track(lastfm_user): """ Get the recent tracks for a Last.fm user. Args: lastfm_user (str): Last.fm username Returns: dict: Track information or None if error """ api_key = os.getenv("LASTFM_API_KEY") if not api_key: logging.error("LASTFM_API_KEY not found in environment variables") return None url = "http://ws.audioscrobbler.com/2.0/" params = { "method": "user.getrecenttracks", "user": lastfm_user, "api_key": api_key, "format": "json", "limit": "1" } try: async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as response: if response.status == 200: data = await response.json() return data else: logging.error(f"Last.fm API returned status {response.status}") return None except Exception as e: logging.error(f"Error querying Last.fm API: {e}") return None async def get_youtube_link(artist, track_name): """ Search for a YouTube link for the given artist and track using YouTube Data API. Args: artist (str): Artist name track_name (str): Track name Returns: str: YouTube video URL or None if not found """ youtube_api_key = os.getenv("YOUTUBE_API_KEY") if not youtube_api_key: return None search_query = f"{artist} {track_name}" url = "https://www.googleapis.com/youtube/v3/search" params = { "part": "snippet", "q": search_query, "type": "video", "key": youtube_api_key, "maxResults": "1" } try: async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as response: if response.status == 200: data = await response.json() items = data.get("items", []) if items: video_id = items[0].get("id", {}).get("videoId") if video_id: return f"https://www.youtube.com/watch?v={video_id}" return None except Exception as e: logging.error(f"Error searching YouTube: {e}") return None async def format_recent_track(matrix_user, lastfm_user, data): """ Format recent track information into a readable message. Args: matrix_user (str): The Matrix user ID lastfm_user (str): The Last.fm username data (dict): Data from Last.fm API Returns: str: Formatted message """ if not data or "error" in data: return f"🔍 No recent track data found for {lastfm_user}." # Check if there's a currently playing track tracks = data.get("recenttracks", {}).get("track", []) if not tracks: return f"🔍 No recent tracks found for {lastfm_user}." # Get the most recent track (first in the list) track = tracks[0] if tracks else {} # Check if it's currently playing now_playing = track.get("@attr", {}).get("nowplaying", "false") == "true" # Extract track information artist = track.get("artist", {}).get("#text", "Unknown Artist") name = track.get("name", "Unknown Track") album = track.get("album", {}).get("#text", "") # Create one-line message with bold track name if now_playing: if album: message = f"🎵 {matrix_user} is currently playing: **{name}** by {artist} from the album {album}" else: message = f"🎵 {matrix_user} is currently playing: **{name}** by {artist}" elif tracks: if album: message = f"🎵 {matrix_user} last played: **{name}** by {artist} from the album {album}" else: message = f"🎵 {matrix_user} last played: **{name}** by {artist}" else: message = f"🔍 No recent tracks found for {lastfm_user}." # Try to get YouTube link youtube_link = await get_youtube_link(artist, name) if youtube_link: message += f" | [YouTube Link]({youtube_link})" return message async def handle_command(room, message, bot, prefix, config): """ Function to handle the !register and !np commands. Args: room (Room): The Matrix room where the command was invoked. message (RoomMessage): The message object containing the command. bot (Bot): The bot object. prefix (str): The command prefix. config (dict): Configuration parameters. Returns: None """ match = botlib.MessageMatch(room, message, bot, prefix) # Initialize database on first run await init_db() if match.is_not_from_this_bot() and match.prefix() and match.command("register"): args = match.args() if len(args) < 1: await bot.api.send_text_message( room.room_id, "Usage: !register " ) return lastfm_user = args[0].strip() matrix_user = str(message.sender) # Register the Last.fm username await set_lastfm_username(matrix_user, lastfm_user) # Confirm registration await bot.api.send_text_message( room.room_id, f"✅ Registered Last.fm user {lastfm_user} for {matrix_user}" ) logging.info(f"Registered Last.fm user {lastfm_user} for {matrix_user}") elif match.is_not_from_this_bot() and match.prefix() and match.command("np"): # Get currently playing track matrix_user = str(message.sender) lastfm_user = await get_lastfm_username(matrix_user) if not lastfm_user: await bot.api.send_text_message( room.room_id, f"Please register your Last.fm username first with !register \n" f"Example: !register your_lastfm_username" ) return # Get recent track data track_data = await get_recent_track(lastfm_user) # Format and send results result_message = await format_recent_track(matrix_user, lastfm_user, track_data) await bot.api.send_markdown_message(room.room_id, result_message) logging.info(f"Sent Last.fm now playing information for {lastfm_user}")