Files
FunguyBot/plugins/imdb.py
T

412 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Fully Featured OMDb Plugin for Funguy Bot
Provides complete OMDb API functionality:
!imdb <title> - Get movie/series details
!imdb id <imdb_id> - Lookup by IMDb ID (e.g. tt0133093)
!imdb search <query> - Search for titles (returns list)
!imdb episode <series> -s N -e N - Get episode details
!imdb help - Show this help
Optional flags (works with title and id):
-y <year> - Year of release
-t <movie|series|episode> - Type filter
--short-plot - Show short plot (default is full)
Requires: OMDB_API_KEY in environment variables.
"""
import logging
import os
import aiohttp
import asyncio
import tempfile
import argparse
import shlex
from urllib.parse import quote_plus
# ----------------------------------------------------------------------
# Plugin setup (called by FunguyBot after bot instance is created)
# ----------------------------------------------------------------------
def setup(bot):
"""Called by the bot after the bot instance is created."""
api_key = os.getenv("OMDB_API_KEY")
if not api_key:
logging.warning("OMDB_API_KEY not set in environment. !imdb will not work.")
else:
logging.info("OMDb plugin loaded. API key present.")
# ----------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------
async def fetch_omdb(session, params):
"""Perform async GET request to OMDb API, return JSON."""
api_key = os.getenv("OMDB_API_KEY")
if not api_key:
return {"Error": "OMDB_API_KEY missing", "Response": "False"}
params["apikey"] = api_key
url = "https://www.omdbapi.com/"
try:
async with session.get(url, params=params, timeout=15) as resp:
if resp.status == 200:
return await resp.json()
else:
return {"Error": f"HTTP {resp.status}", "Response": "False"}
except asyncio.TimeoutError:
return {"Error": "Request timed out", "Response": "False"}
except Exception as e:
return {"Error": str(e), "Response": "False"}
async def download_poster(session, poster_url):
"""Download poster to a temp file, return path or None."""
if not poster_url or poster_url == "N/A":
return None
try:
async with session.get(poster_url, timeout=10) as resp:
if resp.status == 200:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
tmp.write(await resp.read())
return tmp.name
else:
logging.debug(f"Poster download failed: HTTP {resp.status}")
return None
except Exception as e:
logging.debug(f"Poster download error: {e}")
return None
def format_movie_html(data, poster_path=None):
"""Format OMDb movie/series data into HTML for Matrix."""
if not data or data.get("Response") == "False":
error = data.get("Error", "No information found.")
return f"<em>❌ {error}</em>"
title = data.get("Title", "Unknown")
year = data.get("Year", "N/A")
rated = data.get("Rated", "N/A")
runtime = data.get("Runtime", "N/A")
genre = data.get("Genre", "N/A")
director = data.get("Director", "N/A")
writer = data.get("Writer", "N/A")
actors = data.get("Actors", "N/A")
plot = data.get("Plot", "N/A")
language = data.get("Language", "N/A")
country = data.get("Country", "N/A")
awards = data.get("Awards", "N/A")
poster_url = data.get("Poster", "")
imdb_rating = data.get("imdbRating", "N/A")
imdb_votes = data.get("imdbVotes", "N/A")
imdb_id = data.get("imdbID", "")
type_ = data.get("Type", "movie")
box_office = data.get("BoxOffice", "N/A")
production = data.get("Production", "N/A")
website = data.get("Website", "N/A")
# Rotten Tomatoes ratings
ratings = data.get("Ratings", [])
rt_rating = next((r["Value"] for r in ratings if r["Source"] == "Rotten Tomatoes"), "N/A")
mc_rating = next((r["Value"] for r in ratings if r["Source"] == "Metacritic"), "N/A")
# Build HTML
imdb_link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id and imdb_id != "N/A" else ""
title_line = f"<strong>🎬 {title}</strong> ({year})"
if imdb_link:
title_line += f" <a href='{imdb_link}'>IMDb</a>"
lines = [title_line]
lines.append(f"⭐ <strong>IMDb:</strong> {imdb_rating}/10 ({imdb_votes} votes) | 🍅 <strong>Rotten:</strong> {rt_rating} | 📊 <strong>Metacritic:</strong> {mc_rating}")
lines.append(f"📅 <strong>Rated:</strong> {rated} | ⏱ <strong>Runtime:</strong> {runtime} | 🎭 <strong>Genre:</strong> {genre} | 🎬 <strong>Type:</strong> {type_.capitalize()}")
if director != "N/A":
lines.append(f"🎬 <strong>Director:</strong> {director}")
if writer != "N/A":
lines.append(f"✍️ <strong>Writer:</strong> {writer}")
if actors != "N/A":
lines.append(f"🎭 <strong>Actors:</strong> {actors}")
if plot != "N/A":
lines.append(f"📝 <strong>Plot:</strong> {plot}")
extra = []
if language != "N/A":
extra.append(f"🗣 {language}")
if country != "N/A":
extra.append(f"🌍 {country}")
if awards != "N/A" and awards.lower() != "n/a":
extra.append(f"🏆 {awards}")
if box_office != "N/A":
extra.append(f"💰 {box_office}")
if production != "N/A":
extra.append(f"🏢 {production}")
if website != "N/A" and website.lower() != "n/a":
extra.append(f"🌐 <a href='{website}'>Website</a>")
if extra:
lines.append("<strong>️ More:</strong> " + " | ".join(extra))
if poster_path:
lines.append("<em>📸 A poster image has been sent separately.</em>")
elif poster_url and poster_url != "N/A":
lines.append(f"📸 <a href='{poster_url}'>View Poster</a>")
html = "<br>".join(lines)
return f"<details><summary><strong>🎬 OMDb: {title} ({year})</strong></summary>{html}</details>"
def format_search_results(results):
"""Format search results (list of movies) into HTML."""
if not results or results.get("Response") == "False":
error = results.get("Error", "No results found.")
return f"<em>❌ {error}</em>"
search_list = results.get("Search", [])
if not search_list:
return "<em>❌ No matching titles found.</em>"
lines = ["<strong>🔍 Search Results:</strong><br><ul>"]
for item in search_list[:10]: # OMDb returns up to 10 per page
title = item.get("Title", "Unknown")
year = item.get("Year", "N/A")
imdb_id = item.get("imdbID", "")
type_ = item.get("Type", "movie")
link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id else ""
if link:
lines.append(f"<li><strong>{title}</strong> ({year}) {type_.capitalize()} <a href='{link}'>IMDb</a></li>")
else:
lines.append(f"<li><strong>{title}</strong> ({year}) {type_.capitalize()}</li>")
lines.append("</ul>")
lines.append("<em>Use `!imdb <title>` or `!imdb id <tt...>` to get full details.</em>")
return "<br>".join(lines)
def format_episode_html(data, poster_path=None):
"""Format episode data specially."""
if not data or data.get("Response") == "False":
error = data.get("Error", "Episode not found.")
return f"<em>❌ {error}</em>"
title = data.get("Title", "Unknown Episode")
series = data.get("SeriesTitle", data.get("Title", "")) # Some APIs return SeriesTitle
season = data.get("Season", "?")
episode = data.get("Episode", "?")
year = data.get("Year", "N/A")
plot = data.get("Plot", "N/A")
imdb_rating = data.get("imdbRating", "N/A")
imdb_id = data.get("imdbID", "")
imdb_link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id else ""
header = f"<strong>📺 {series} S{season}E{episode}: {title}</strong> ({year})"
if imdb_link:
header += f" <a href='{imdb_link}'>IMDb</a>"
lines = [header]
lines.append(f"⭐ <strong>IMDb Rating:</strong> {imdb_rating}/10")
if plot != "N/A":
lines.append(f"📝 <strong>Plot:</strong> {plot}")
if poster_path:
lines.append("<em>📸 Episode poster attached.</em>")
html = "<br>".join(lines)
return f"<details><summary><strong>📺 Episode Info</strong></summary>{html}</details>"
# ----------------------------------------------------------------------
# Command parsing and dispatching
# ----------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
"""Main entry point for !imdb commands."""
import simplematrixbotlib as botlib
match = botlib.MessageMatch(room, message, bot, prefix)
if not (match.is_not_from_this_bot() and match.prefix() and match.command("imdb")):
return
# Full raw command text (including arguments)
raw_args = message.body.split()[1:] # skip "!imdb"
if not raw_args:
await send_help(room, bot)
return
# First argument determines subcommand
subcmd = raw_args[0].lower()
if subcmd == "help":
await send_help(room, bot)
return
elif subcmd == "search":
# !imdb search <query>
if len(raw_args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !imdb search <movie title>")
return
query = " ".join(raw_args[1:]).strip()
await handle_search(room, bot, query)
elif subcmd == "id":
# !imdb id <tt1234567>
if len(raw_args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !imdb id <IMDb ID> (e.g. tt0133093)")
return
imdb_id = raw_args[1].strip()
await handle_lookup_by_id(room, bot, imdb_id)
elif subcmd == "episode":
# !imdb episode <series> -s <season> -e <episode> [--short-plot]
await handle_episode(room, bot, raw_args[1:])
else:
# Assume it's a title lookup with optional flags
# Reconstruct command line: "!imdb <title> -y 1999 -t movie --short-plot"
# We'll parse using argparse on the whole raw_args (which is everything after !imdb)
await handle_title_lookup(room, bot, raw_args)
# ----------------------------------------------------------------------
# Subcommand implementations
# ----------------------------------------------------------------------
async def send_help(room, bot):
help_text = """
<details><summary><strong>🎬 OMDb Plugin Complete Commands</strong></summary>
<p>
<strong>!imdb &lt;title&gt;</strong><br>
Get full details for a movie or series.<br>
Flags: <code>-y YEAR</code> <code>-t movie|series|episode</code> <code>--short-plot</code><br>
Example: <code>!imdb "The Matrix" -y 1999 -t movie</code><br><br>
<strong>!imdb id &lt;tt1234567&gt;</strong><br>
Lookup by IMDb ID.<br>
Example: <code>!imdb id tt0133093</code><br><br>
<strong>!imdb search &lt;query&gt;</strong><br>
Returns a list of matching titles (up to 10).<br>
Example: <code>!imdb search Lord of the Rings</code><br><br>
<strong>!imdb episode &lt;series&gt; -s N -e N</strong><br>
Get details for a specific episode.<br>
Example: <code>!imdb episode "Breaking Bad" -s 1 -e 1</code><br><br>
<strong>!imdb help</strong><br>
Show this help.<br><br>
<em>Requires OMDB_API_KEY environment variable.</em>
</p>
</details>
"""
await bot.api.send_markdown_message(room.room_id, help_text)
async def handle_search(room, bot, query):
"""Perform a search (s=) and return formatted list."""
params = {"s": query}
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"{data.get('Error', 'Search failed')}")
return
html = format_search_results(data)
await bot.api.send_markdown_message(room.room_id, html)
async def handle_lookup_by_id(room, bot, imdb_id):
"""Lookup by IMDb ID (i=)."""
params = {"i": imdb_id, "plot": "full"}
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"{data.get('Error', 'ID not found')}")
return
poster_path = await download_poster(session, data.get("Poster", ""))
html = format_movie_html(data, poster_path)
await bot.api.send_markdown_message(room.room_id, html)
if poster_path:
try:
await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
finally:
os.unlink(poster_path)
async def handle_title_lookup(room, bot, args):
"""Parse arguments for title lookup (supports -y, -t, --short-plot)."""
parser = argparse.ArgumentParser(prog="!imdb", add_help=False)
parser.add_argument("title", nargs="+", help="Movie or series title")
parser.add_argument("-y", "--year", type=str, help="Year of release")
parser.add_argument("-t", "--type", choices=["movie", "series", "episode"], help="Type filter")
parser.add_argument("--short-plot", action="store_true", help="Use short plot (default is full)")
try:
# argparse expects a list of strings; we already have args as list
parsed = parser.parse_args(args)
title = " ".join(parsed.title)
params = {"t": title, "plot": "short" if parsed.short_plot else "full"}
if parsed.year:
params["y"] = parsed.year
if parsed.type:
params["type"] = parsed.type
except SystemExit:
# argparse would exit on error; we catch and show usage
await bot.api.send_markdown_message(room.room_id,
"Usage: `!imdb <title> [-y YEAR] [-t movie|series|episode] [--short-plot]`\n"
"Example: `!imdb 'The Matrix' -y 1999 -t movie`")
return
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error parsing arguments: {e}")
return
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"{data.get('Error', 'Not found')}")
return
poster_path = await download_poster(session, data.get("Poster", ""))
html = format_movie_html(data, poster_path)
await bot.api.send_markdown_message(room.room_id, html)
if poster_path:
try:
await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
finally:
os.unlink(poster_path)
async def handle_episode(room, bot, args):
"""Handle episode lookup: !imdb episode <series> -s N -e N"""
parser = argparse.ArgumentParser(prog="!imdb episode", add_help=False)
parser.add_argument("series", nargs="+", help="Series title")
parser.add_argument("-s", "--season", type=int, required=True, help="Season number")
parser.add_argument("-e", "--episode", type=int, required=True, help="Episode number")
parser.add_argument("--short-plot", action="store_true", help="Short plot")
try:
parsed = parser.parse_args(args)
series = " ".join(parsed.series)
params = {
"t": series,
"season": parsed.season,
"episode": parsed.episode,
"plot": "short" if parsed.short_plot else "full"
}
except SystemExit:
await bot.api.send_markdown_message(room.room_id,
"Usage: `!imdb episode <series> -s <season> -e <episode>`\n"
"Example: `!imdb episode 'Breaking Bad' -s 1 -e 1`")
return
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error: {e}")
return
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"❌ Episode not found: {data.get('Error', '')}")
return
poster_path = await download_poster(session, data.get("Poster", ""))
html = format_episode_html(data, poster_path)
await bot.api.send_markdown_message(room.room_id, html)
if poster_path:
try:
await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
finally:
os.unlink(poster_path)
# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
__version__ = "1.0.0"
__author__ = "Funguy Bot"
__description__ = "IMDb lookup via OMDb API"
__help__ = """
<details>
<summary><strong>!imdb</strong> Movie/series details via OMDb</summary>
<ul>
<li><code>!imdb &lt;title&gt;</code> Full details (poster sent separately)</li>
<li><code>!imdb id &lt;tt1234567&gt;</code> Lookup by IMDb ID</li>
<li><code>!imdb search &lt;query&gt;</code> Search titles</li>
<li><code>!imdb episode &lt;series&gt; -s N -e N</code> Episode info</li>
</ul>
<p>Optional flags: <code>-y year</code>, <code>-t movie|series|episode</code>, <code>--short-plot</code><br>
Requires <strong>OMDB_API_KEY</strong> env var.</p>
</details>
"""