#!/usr/bin/env python3 """ Fully Featured OMDb Plugin for Funguy Bot Provides complete OMDb API functionality: !imdb - Get movie/series details !imdb id <imdb_id> - Lookup by IMDb ID (e.g. tt0133093) !imdb search <query> - Search for titles (returns list) !imdb episode <series> -s N -e N - Get episode details !imdb help - Show this help Optional flags (works with title and id): -y <year> - Year of release -t <movie|series|episode> - Type filter --short-plot - Show short plot (default is full) Requires: OMDB_API_KEY in environment variables. """ import logging import os import aiohttp import asyncio import tempfile import argparse import shlex from urllib.parse import quote_plus # ---------------------------------------------------------------------- # Plugin setup (called by FunguyBot after bot instance is created) # ---------------------------------------------------------------------- def setup(bot): """Called by the bot after the bot instance is created.""" api_key = os.getenv("OMDB_API_KEY") if not api_key: logging.warning("OMDB_API_KEY not set in environment. !imdb will not work.") else: logging.info("OMDb plugin loaded. API key present.") # ---------------------------------------------------------------------- # Helper functions # ---------------------------------------------------------------------- async def fetch_omdb(session, params): """Perform async GET request to OMDb API, return JSON.""" api_key = os.getenv("OMDB_API_KEY") if not api_key: return {"Error": "OMDB_API_KEY missing", "Response": "False"} params["apikey"] = api_key url = "https://www.omdbapi.com/" try: async with session.get(url, params=params, timeout=15) as resp: if resp.status == 200: return await resp.json() else: return {"Error": f"HTTP {resp.status}", "Response": "False"} except asyncio.TimeoutError: return {"Error": "Request timed out", "Response": "False"} except Exception as e: return {"Error": str(e), "Response": "False"} async def download_poster(session, poster_url): """Download poster to a temp file, return path or None.""" if not poster_url or poster_url == "N/A": return None try: async with session.get(poster_url, timeout=10) as resp: if resp.status == 200: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: tmp.write(await resp.read()) return tmp.name else: logging.debug(f"Poster download failed: HTTP {resp.status}") return None except Exception as e: logging.debug(f"Poster download error: {e}") return None def format_movie_html(data, poster_path=None): """Format OMDb movie/series data into HTML for Matrix.""" if not data or data.get("Response") == "False": error = data.get("Error", "No information found.") return f"<em>❌ {error}</em>" title = data.get("Title", "Unknown") year = data.get("Year", "N/A") rated = data.get("Rated", "N/A") runtime = data.get("Runtime", "N/A") genre = data.get("Genre", "N/A") director = data.get("Director", "N/A") writer = data.get("Writer", "N/A") actors = data.get("Actors", "N/A") plot = data.get("Plot", "N/A") language = data.get("Language", "N/A") country = data.get("Country", "N/A") awards = data.get("Awards", "N/A") poster_url = data.get("Poster", "") imdb_rating = data.get("imdbRating", "N/A") imdb_votes = data.get("imdbVotes", "N/A") imdb_id = data.get("imdbID", "") type_ = data.get("Type", "movie") box_office = data.get("BoxOffice", "N/A") production = data.get("Production", "N/A") website = data.get("Website", "N/A") # Rotten Tomatoes ratings ratings = data.get("Ratings", []) rt_rating = next((r["Value"] for r in ratings if r["Source"] == "Rotten Tomatoes"), "N/A") mc_rating = next((r["Value"] for r in ratings if r["Source"] == "Metacritic"), "N/A") # Build HTML imdb_link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id and imdb_id != "N/A" else "" title_line = f"<strong>đŸŽŦ {title}</strong> ({year})" if imdb_link: title_line += f" – <a href='{imdb_link}'>IMDb</a>" lines = [title_line] lines.append(f"⭐ <strong>IMDb:</strong> {imdb_rating}/10 ({imdb_votes} votes) | 🍅 <strong>Rotten:</strong> {rt_rating} | 📊 <strong>Metacritic:</strong> {mc_rating}") lines.append(f"📅 <strong>Rated:</strong> {rated} | ⏱ <strong>Runtime:</strong> {runtime} | 🎭 <strong>Genre:</strong> {genre} | đŸŽŦ <strong>Type:</strong> {type_.capitalize()}") if director != "N/A": lines.append(f"đŸŽŦ <strong>Director:</strong> {director}") if writer != "N/A": lines.append(f"âœī¸ <strong>Writer:</strong> {writer}") if actors != "N/A": lines.append(f"🎭 <strong>Actors:</strong> {actors}") if plot != "N/A": lines.append(f"📝 <strong>Plot:</strong> {plot}") extra = [] if language != "N/A": extra.append(f"đŸ—Ŗ {language}") if country != "N/A": extra.append(f"🌍 {country}") if awards != "N/A" and awards.lower() != "n/a": extra.append(f"🏆 {awards}") if box_office != "N/A": extra.append(f"💰 {box_office}") if production != "N/A": extra.append(f"đŸĸ {production}") if website != "N/A" and website.lower() != "n/a": extra.append(f"🌐 <a href='{website}'>Website</a>") if extra: lines.append("<strong>â„šī¸ More:</strong> " + " | ".join(extra)) if poster_path: lines.append("<em>📸 A poster image has been sent separately.</em>") elif poster_url and poster_url != "N/A": lines.append(f"📸 <a href='{poster_url}'>View Poster</a>") html = "<br>".join(lines) return f"<details><summary><strong>đŸŽŦ OMDb: {title} ({year})</strong></summary>{html}</details>" def format_search_results(results): """Format search results (list of movies) into HTML.""" if not results or results.get("Response") == "False": error = results.get("Error", "No results found.") return f"<em>❌ {error}</em>" search_list = results.get("Search", []) if not search_list: return "<em>❌ No matching titles found.</em>" lines = ["<strong>🔍 Search Results:</strong><br><ul>"] for item in search_list[:10]: # OMDb returns up to 10 per page title = item.get("Title", "Unknown") year = item.get("Year", "N/A") imdb_id = item.get("imdbID", "") type_ = item.get("Type", "movie") link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id else "" if link: lines.append(f"<li><strong>{title}</strong> ({year}) – {type_.capitalize()} – <a href='{link}'>IMDb</a></li>") else: lines.append(f"<li><strong>{title}</strong> ({year}) – {type_.capitalize()}</li>") lines.append("</ul>") lines.append("<em>Use `!imdb <title>` or `!imdb id <tt...>` to get full details.</em>") return "<br>".join(lines) def format_episode_html(data, poster_path=None): """Format episode data specially.""" if not data or data.get("Response") == "False": error = data.get("Error", "Episode not found.") return f"<em>❌ {error}</em>" title = data.get("Title", "Unknown Episode") series = data.get("SeriesTitle", data.get("Title", "")) # Some APIs return SeriesTitle season = data.get("Season", "?") episode = data.get("Episode", "?") year = data.get("Year", "N/A") plot = data.get("Plot", "N/A") imdb_rating = data.get("imdbRating", "N/A") imdb_id = data.get("imdbID", "") imdb_link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id else "" header = f"<strong>đŸ“ē {series} – S{season}E{episode}: {title}</strong> ({year})" if imdb_link: header += f" – <a href='{imdb_link}'>IMDb</a>" lines = [header] lines.append(f"⭐ <strong>IMDb Rating:</strong> {imdb_rating}/10") if plot != "N/A": lines.append(f"📝 <strong>Plot:</strong> {plot}") if poster_path: lines.append("<em>📸 Episode poster attached.</em>") html = "<br>".join(lines) return f"<details><summary><strong>đŸ“ē Episode Info</strong></summary>{html}</details>" # ---------------------------------------------------------------------- # Command parsing and dispatching # ---------------------------------------------------------------------- async def handle_command(room, message, bot, prefix, config): """Main entry point for !imdb commands.""" import simplematrixbotlib as botlib match = botlib.MessageMatch(room, message, bot, prefix) if not (match.is_not_from_this_bot() and match.prefix() and match.command("imdb")): return # Full raw command text (including arguments) raw_args = message.body.split()[1:] # skip "!imdb" if not raw_args: await send_help(room, bot) return # First argument determines subcommand subcmd = raw_args[0].lower() if subcmd == "help": await send_help(room, bot) return elif subcmd == "search": # !imdb search <query> if len(raw_args) < 2: await bot.api.send_text_message(room.room_id, "Usage: !imdb search <movie title>") return query = " ".join(raw_args[1:]).strip() await handle_search(room, bot, query) elif subcmd == "id": # !imdb id <tt1234567> if len(raw_args) < 2: await bot.api.send_text_message(room.room_id, "Usage: !imdb id <IMDb ID> (e.g. tt0133093)") return imdb_id = raw_args[1].strip() await handle_lookup_by_id(room, bot, imdb_id) elif subcmd == "episode": # !imdb episode <series> -s <season> -e <episode> [--short-plot] await handle_episode(room, bot, raw_args[1:]) else: # Assume it's a title lookup with optional flags # Reconstruct command line: "!imdb <title> -y 1999 -t movie --short-plot" # We'll parse using argparse on the whole raw_args (which is everything after !imdb) await handle_title_lookup(room, bot, raw_args) # ---------------------------------------------------------------------- # Subcommand implementations # ---------------------------------------------------------------------- async def send_help(room, bot): help_text = """ <details><summary><strong>đŸŽŦ OMDb Plugin – Complete Commands</strong></summary> <p> <strong>!imdb <title></strong><br> Get full details for a movie or series.<br> Flags: <code>-y YEAR</code> <code>-t movie|series|episode</code> <code>--short-plot</code><br> Example: <code>!imdb "The Matrix" -y 1999 -t movie</code><br><br> <strong>!imdb id <tt1234567></strong><br> Lookup by IMDb ID.<br> Example: <code>!imdb id tt0133093</code><br><br> <strong>!imdb search <query></strong><br> Returns a list of matching titles (up to 10).<br> Example: <code>!imdb search Lord of the Rings</code><br><br> <strong>!imdb episode <series> -s N -e N</strong><br> Get details for a specific episode.<br> Example: <code>!imdb episode "Breaking Bad" -s 1 -e 1</code><br><br> <strong>!imdb help</strong><br> Show this help.<br><br> <em>Requires OMDB_API_KEY environment variable.</em> </p> </details> """ await bot.api.send_markdown_message(room.room_id, help_text) async def handle_search(room, bot, query): """Perform a search (s=) and return formatted list.""" params = {"s": query} async with aiohttp.ClientSession() as session: data = await fetch_omdb(session, params) if data.get("Response") == "False": await bot.api.send_text_message(room.room_id, f"❌ {data.get('Error', 'Search failed')}") return html = format_search_results(data) await bot.api.send_markdown_message(room.room_id, html) async def handle_lookup_by_id(room, bot, imdb_id): """Lookup by IMDb ID (i=).""" params = {"i": imdb_id, "plot": "full"} async with aiohttp.ClientSession() as session: data = await fetch_omdb(session, params) if data.get("Response") == "False": await bot.api.send_text_message(room.room_id, f"❌ {data.get('Error', 'ID not found')}") return poster_path = await download_poster(session, data.get("Poster", "")) html = format_movie_html(data, poster_path) await bot.api.send_markdown_message(room.room_id, html) if poster_path: try: await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path) finally: os.unlink(poster_path) async def handle_title_lookup(room, bot, args): """Parse arguments for title lookup (supports -y, -t, --short-plot).""" parser = argparse.ArgumentParser(prog="!imdb", add_help=False) parser.add_argument("title", nargs="+", help="Movie or series title") parser.add_argument("-y", "--year", type=str, help="Year of release") parser.add_argument("-t", "--type", choices=["movie", "series", "episode"], help="Type filter") parser.add_argument("--short-plot", action="store_true", help="Use short plot (default is full)") try: # argparse expects a list of strings; we already have args as list parsed = parser.parse_args(args) title = " ".join(parsed.title) params = {"t": title, "plot": "short" if parsed.short_plot else "full"} if parsed.year: params["y"] = parsed.year if parsed.type: params["type"] = parsed.type except SystemExit: # argparse would exit on error; we catch and show usage await bot.api.send_markdown_message(room.room_id, "Usage: `!imdb <title> [-y YEAR] [-t movie|series|episode] [--short-plot]`\n" "Example: `!imdb 'The Matrix' -y 1999 -t movie`") return except Exception as e: await bot.api.send_text_message(room.room_id, f"Error parsing arguments: {e}") return async with aiohttp.ClientSession() as session: data = await fetch_omdb(session, params) if data.get("Response") == "False": await bot.api.send_text_message(room.room_id, f"❌ {data.get('Error', 'Not found')}") return poster_path = await download_poster(session, data.get("Poster", "")) html = format_movie_html(data, poster_path) await bot.api.send_markdown_message(room.room_id, html) if poster_path: try: await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path) finally: os.unlink(poster_path) async def handle_episode(room, bot, args): """Handle episode lookup: !imdb episode <series> -s N -e N""" parser = argparse.ArgumentParser(prog="!imdb episode", add_help=False) parser.add_argument("series", nargs="+", help="Series title") parser.add_argument("-s", "--season", type=int, required=True, help="Season number") parser.add_argument("-e", "--episode", type=int, required=True, help="Episode number") parser.add_argument("--short-plot", action="store_true", help="Short plot") try: parsed = parser.parse_args(args) series = " ".join(parsed.series) params = { "t": series, "season": parsed.season, "episode": parsed.episode, "plot": "short" if parsed.short_plot else "full" } except SystemExit: await bot.api.send_markdown_message(room.room_id, "Usage: `!imdb episode <series> -s <season> -e <episode>`\n" "Example: `!imdb episode 'Breaking Bad' -s 1 -e 1`") return except Exception as e: await bot.api.send_text_message(room.room_id, f"Error: {e}") return async with aiohttp.ClientSession() as session: data = await fetch_omdb(session, params) if data.get("Response") == "False": await bot.api.send_text_message(room.room_id, f"❌ Episode not found: {data.get('Error', '')}") return poster_path = await download_poster(session, data.get("Poster", "")) html = format_episode_html(data, poster_path) await bot.api.send_markdown_message(room.room_id, html) if poster_path: try: await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path) finally: os.unlink(poster_path)