391 lines
16 KiB
Python
391 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Fully Featured OMDb Plugin for Funguy Bot
|
||
|
||
Provides complete OMDb API functionality:
|
||
!imdb <title> - Get movie/series details
|
||
!imdb id <imdb_id> - Lookup by IMDb ID (e.g. tt0133093)
|
||
!imdb search <query> - Search for titles (returns list)
|
||
!imdb episode <series> -s N -e N - Get episode details
|
||
!imdb help - Show this help
|
||
|
||
Optional flags (title lookups; `!imdb id` currently ignores them, `--short-plot` also works with `episode`):
|
||
-y <year> - Year of release
|
||
-t <movie|series|episode> - Type filter
|
||
--short-plot - Show short plot (default is full)
|
||
|
||
Requires: OMDB_API_KEY in environment variables.
|
||
"""
|
||
|
||
import logging
|
||
import os
|
||
import aiohttp
|
||
import asyncio
|
||
import tempfile
|
||
import argparse
|
||
import shlex
|
||
from urllib.parse import quote_plus
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Plugin setup (called by FunguyBot after bot instance is created)
|
||
# ----------------------------------------------------------------------
|
||
def setup(bot):
|
||
"""Called by the bot after the bot instance is created."""
|
||
api_key = os.getenv("OMDB_API_KEY")
|
||
if not api_key:
|
||
logging.warning("OMDB_API_KEY not set in environment. !imdb will not work.")
|
||
else:
|
||
logging.info("OMDb plugin loaded. API key present.")
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Helper functions
|
||
# ----------------------------------------------------------------------
|
||
async def fetch_omdb(session, params):
    """Send a GET request to the OMDb API and return the decoded JSON reply.

    Args:
        session: An open HTTP client session exposing an aiohttp-style
            ``get(url, params=..., timeout=...)`` async context manager.
        params: Query parameters for the request. The mapping is copied
            before the API key is added, so the caller's dict is left
            untouched and never ends up holding the secret.

    Returns:
        The JSON document returned on HTTP 200. On any failure (missing
        key, non-200 status, timeout, other exception) a dict of the form
        ``{"Error": <message>, "Response": "False"}`` is returned instead,
        which is the shape the command handlers check for.
    """
    api_key = os.getenv("OMDB_API_KEY")
    if not api_key:
        return {"Error": "OMDB_API_KEY missing", "Response": "False"}

    # Build a fresh mapping rather than writing the key into the caller's dict.
    query = {**params, "apikey": api_key}
    url = "https://www.omdbapi.com/"
    try:
        async with session.get(url, params=query, timeout=15) as resp:
            if resp.status != 200:
                return {"Error": f"HTTP {resp.status}", "Response": "False"}
            return await resp.json()
    except asyncio.TimeoutError:
        return {"Error": "Request timed out", "Response": "False"}
    except Exception as e:
        # Callers post this text to the chat room. Exception messages may
        # embed the request URL (and with it the key), so redact the key.
        return {"Error": str(e).replace(api_key, "***"), "Response": "False"}
|
||
|
||
async def download_poster(session, poster_url):
    """Download a poster image into a temporary ``.jpg`` file.

    Args:
        session: An open HTTP client session exposing an aiohttp-style
            ``get(url, timeout=...)`` async context manager.
        poster_url: URL of the image. An empty value or the literal
            ``"N/A"`` means there is no poster.

    Returns:
        The path of the temporary file on success, otherwise ``None``
        (no poster, non-200 status, or any download/write error). The
        file is created with ``delete=False``; removing it is the
        caller's job.
    """
    if not poster_url or poster_url == "N/A":
        return None

    try:
        async with session.get(poster_url, timeout=10) as resp:
            if resp.status != 200:
                logging.debug(f"Poster download failed: HTTP {resp.status}")
                return None
            # Fetch the whole body BEFORE creating the file, so an aborted
            # download cannot leave an orphan temp file behind.
            image_bytes = await resp.read()
    except Exception as e:
        logging.debug(f"Poster download error: {e}")
        return None

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            tmp_path = tmp.name
            tmp.write(image_bytes)
    except OSError as e:
        logging.debug(f"Poster download error: {e}")
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup; nothing more can be done here.
                pass
        return None
    return tmp_path
|
||
|
||
def format_movie_html(data, poster_path=None):
    """Render an OMDb movie/series record as an HTML snippet.

    Args:
        data: Decoded OMDb response dict, or ``None``. A falsy value or a
            dict whose ``"Response"`` is ``"False"`` is treated as an error.
        poster_path: Path of a downloaded poster, if any. When set, a note
            that the image is sent separately is shown; otherwise a link
            to ``data["Poster"]`` is shown when one exists.

    Returns:
        An HTML string. For errors this is ``<em>❌ message</em>``; for a
        record it is a ``<details>`` element with one ``<br>``-separated
        line per group of fields. All values taken from ``data`` are
        HTML-escaped because they originate from a remote service.
    """
    # Function-scope import: the module does not import ``html`` at file level.
    from html import escape

    if not data or data.get("Response") == "False":
        # ``data`` may be None here, so fall back to an empty mapping.
        error = (data or {}).get("Error", "No information found.")
        return f"<em>❌ {escape(str(error))}</em>"

    def text(key, default="N/A"):
        """Return ``data[key]`` as escaped text, or ``default`` when absent."""
        value = data.get(key)
        return escape(str(default if value is None else value))

    def has_value(value):
        """Tell whether a field holds something other than the N/A marker."""
        return bool(value) and value.lower() != "n/a"

    def rating_from(source):
        """Return the rating value reported by ``source``, or "N/A"."""
        for entry in data.get("Ratings") or []:
            # Tolerate malformed entries instead of raising KeyError.
            if isinstance(entry, dict) and entry.get("Source") == source:
                return escape(str(entry.get("Value", "N/A")))
        return "N/A"

    title = text("Title", "Unknown")
    year = text("Year")
    rated = text("Rated")
    runtime = text("Runtime")
    genre = text("Genre")
    director = text("Director")
    writer = text("Writer")
    actors = text("Actors")
    plot = text("Plot")
    language = text("Language")
    country = text("Country")
    awards = text("Awards")
    poster_url = text("Poster", "")
    imdb_rating = text("imdbRating")
    imdb_votes = text("imdbVotes")
    imdb_id = text("imdbID", "")
    # Capitalize before escaping so entity names are never re-cased.
    kind = escape(str(data.get("Type") or "movie").capitalize())
    box_office = text("BoxOffice")
    production = text("Production")
    website = text("Website")

    rt_rating = rating_from("Rotten Tomatoes")
    mc_rating = rating_from("Metacritic")

    # Avoid the nonsensical "N/A/10 (N/A votes)" when no rating exists.
    if has_value(imdb_rating):
        imdb_score = f"{imdb_rating}/10 ({imdb_votes} votes)"
    else:
        imdb_score = "N/A"

    title_line = f"<strong>🎬 {title}</strong> ({year})"
    if has_value(imdb_id):
        imdb_link = f"https://www.imdb.com/title/{imdb_id}/"
        title_line += f" – <a href='{imdb_link}'>IMDb</a>"

    lines = [
        title_line,
        f"⭐ <strong>IMDb:</strong> {imdb_score} | 🍅 <strong>Rotten:</strong> {rt_rating} | 📊 <strong>Metacritic:</strong> {mc_rating}",
        f"📅 <strong>Rated:</strong> {rated} | ⏱ <strong>Runtime:</strong> {runtime} | 🎭 <strong>Genre:</strong> {genre} | 🎬 <strong>Type:</strong> {kind}",
    ]

    if has_value(director):
        lines.append(f"🎬 <strong>Director:</strong> {director}")
    if has_value(writer):
        lines.append(f"✍️ <strong>Writer:</strong> {writer}")
    if has_value(actors):
        lines.append(f"🎭 <strong>Actors:</strong> {actors}")
    if has_value(plot):
        lines.append(f"📝 <strong>Plot:</strong> {plot}")

    extra = []
    if has_value(language):
        extra.append(f"🗣 {language}")
    if has_value(country):
        extra.append(f"🌍 {country}")
    if has_value(awards):
        extra.append(f"🏆 {awards}")
    if has_value(box_office):
        extra.append(f"💰 {box_office}")
    if has_value(production):
        extra.append(f"🏢 {production}")
    if has_value(website):
        extra.append(f"🌐 <a href='{website}'>Website</a>")
    if extra:
        lines.append("<strong>ℹ️ More:</strong> " + " | ".join(extra))

    if poster_path:
        lines.append("<em>📸 A poster image has been sent separately.</em>")
    elif has_value(poster_url):
        lines.append(f"📸 <a href='{poster_url}'>View Poster</a>")

    body = "<br>".join(lines)
    return f"<details><summary><strong>🎬 OMDb: {title} ({year})</strong></summary>{body}</details>"
|
||
|
||
def format_search_results(results):
    """Render an OMDb search response as an HTML list of titles.

    Args:
        results: Decoded OMDb search response dict, or ``None``. A falsy
            value or a dict whose ``"Response"`` is ``"False"`` is treated
            as an error.

    Returns:
        An HTML string: an ``<em>`` error line when there is nothing to
        show, otherwise a heading, a ``<ul>`` with at most ten ``<li>``
        entries, and a usage hint. Values taken from ``results`` are
        HTML-escaped because they originate from a remote service.
    """
    # Function-scope import: the module does not import ``html`` at file level.
    from html import escape

    if not results or results.get("Response") == "False":
        # ``results`` may be None here, so fall back to an empty mapping.
        error = (results or {}).get("Error", "No results found.")
        return f"<em>❌ {escape(str(error))}</em>"

    search_list = results.get("Search") or []
    if not search_list:
        return "<em>❌ No matching titles found.</em>"

    items = []
    for item in search_list[:10]:  # show at most ten entries
        title = escape(str(item.get("Title", "Unknown")))
        year = escape(str(item.get("Year", "N/A")))
        imdb_id = escape(str(item.get("imdbID") or ""))
        kind = escape(str(item.get("Type") or "movie").capitalize())
        entry = f"<li><strong>{title}</strong> ({year}) – {kind}"
        # Same "has an ID" rule as the movie formatter: skip empty and N/A.
        if imdb_id and imdb_id != "N/A":
            entry += f" – <a href='https://www.imdb.com/title/{imdb_id}/'>IMDb</a>"
        items.append(entry + "</li>")

    # List items are concatenated directly: a <br> between <ul>/<li>
    # elements is not valid list content.
    return (
        "<strong>🔍 Search Results:</strong><br><ul>"
        + "".join(items)
        + "</ul>"
        + "<em>Use `!imdb <title>` or `!imdb id <tt...>` to get full details.</em>"
    )
|
||
|
||
def format_episode_html(data, poster_path=None):
    """Render an OMDb episode record as an HTML snippet.

    Args:
        data: Decoded OMDb response dict, or ``None``. A falsy value or a
            dict whose ``"Response"`` is ``"False"`` is treated as an error.
        poster_path: Path of a downloaded poster, if any. When set, a note
            that the poster is attached is appended.

    Returns:
        An HTML string: ``<em>❌ message</em>`` for errors, otherwise a
        ``<details>`` element holding a header line, the IMDb rating and,
        when present, the plot. Values taken from ``data`` are
        HTML-escaped because they originate from a remote service.
    """
    # Function-scope import: the module does not import ``html`` at file level.
    from html import escape

    if not data or data.get("Response") == "False":
        # ``data`` may be None here, so fall back to an empty mapping.
        error = (data or {}).get("Error", "Episode not found.")
        return f"<em>❌ {escape(str(error))}</em>"

    def text(key, default):
        """Return ``data[key]`` as escaped text, or ``default`` when absent."""
        value = data.get(key)
        return escape(str(default if value is None else value))

    title = text("Title", "Unknown Episode")
    series = text("SeriesTitle", "")
    season = text("Season", "?")
    episode = text("Episode", "?")
    year = text("Year", "N/A")
    plot = text("Plot", "N/A")
    imdb_rating = text("imdbRating", "N/A")
    imdb_id = text("imdbID", "")

    # Only prefix the series name when the record really carries one;
    # falling back to the episode title would print the title twice.
    label = f"S{season}E{episode}: {title}"
    if series and series != title:
        label = f"{series} – {label}"

    header = f"<strong>📺 {label}</strong> ({year})"
    if imdb_id and imdb_id != "N/A":
        header += f" – <a href='https://www.imdb.com/title/{imdb_id}/'>IMDb</a>"

    # Avoid the nonsensical "N/A/10" when no rating exists.
    rating_text = f"{imdb_rating}/10" if imdb_rating != "N/A" else "N/A"

    lines = [header, f"⭐ <strong>IMDb Rating:</strong> {rating_text}"]
    if plot != "N/A":
        lines.append(f"📝 <strong>Plot:</strong> {plot}")
    if poster_path:
        lines.append("<em>📸 Episode poster attached.</em>")

    body = "<br>".join(lines)
    return f"<details><summary><strong>📺 Episode Info</strong></summary>{body}</details>"
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Command parsing and dispatching
|
||
# ----------------------------------------------------------------------
|
||
def _tokenize_command(body):
    """Split a command line on whitespace, keeping quoted phrases together.

    A phrase wrapped in matching single or double quotes becomes one token
    with the quotes removed. A quote only groups when it starts a token and
    its partner is followed by whitespace or the end of the text, so
    apostrophes inside words (``Rosemary's``) are left alone.
    """
    import re

    pattern = r'"([^"]*)"(?=\s|$)|\'([^\']*)\'(?=\s|$)|(\S+)'
    tokens = []
    for found in re.finditer(pattern, body):
        token = next(group for group in found.groups() if group is not None)
        if token.strip():  # drop empty quoted strings such as ""
            tokens.append(token)
    return tokens


async def handle_command(room, message, bot, prefix, config):
    """Entry point for ``!imdb`` messages: pick the subcommand and dispatch.

    Args:
        room: Room the message was sent in; ``room.room_id`` is used for
            replies.
        message: Incoming message; ``message.body`` holds the command text.
        bot: Bot instance, passed to the match helper and used to reply.
        prefix: Command prefix handed to the match helper.
        config: Not used by this handler.

    Returns:
        None. Messages that are from this bot, lack the prefix, or are not
        the ``imdb`` command are ignored.
    """
    import simplematrixbotlib as botlib

    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("imdb")):
        return

    # Everything after the "!imdb" token. Quoted titles, as shown in the
    # help text (!imdb "The Matrix" -y 1999), arrive as a single argument
    # without the quote characters.
    raw_args = _tokenize_command(message.body)[1:]
    if not raw_args:
        await send_help(room, bot)
        return

    # The first argument selects the subcommand; anything unrecognised is
    # treated as the start of a title.
    subcmd = raw_args[0].lower()
    if subcmd == "help":
        await send_help(room, bot)
        return
    elif subcmd == "search":
        # !imdb search <query>
        if len(raw_args) < 2:
            await bot.api.send_text_message(room.room_id, "Usage: !imdb search <movie title>")
            return
        query = " ".join(raw_args[1:]).strip()
        await handle_search(room, bot, query)
    elif subcmd == "id":
        # !imdb id <tt1234567>
        if len(raw_args) < 2:
            await bot.api.send_text_message(room.room_id, "Usage: !imdb id <IMDb ID> (e.g. tt0133093)")
            return
        imdb_id = raw_args[1].strip()
        await handle_lookup_by_id(room, bot, imdb_id)
    elif subcmd == "episode":
        # !imdb episode <series> -s <season> -e <episode> [--short-plot]
        await handle_episode(room, bot, raw_args[1:])
    else:
        # Title lookup with optional flags; the handler parses them.
        await handle_title_lookup(room, bot, raw_args)
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Subcommand implementations
|
||
# ----------------------------------------------------------------------
|
||
async def send_help(room, bot):
    """Send the ``!imdb`` usage summary to the room.

    Args:
        room: Target room; ``room.room_id`` is used as the destination.
        bot: Bot instance; the text is sent with
            ``bot.api.send_markdown_message``.

    Returns:
        None.
    """
    # Placeholders are written as &lt;...&gt; entities. Inside this HTML
    # block a literal <title> or <query> would be parsed as a tag and the
    # placeholder would vanish from the rendered help.
    help_text = """
<details><summary><strong>🎬 OMDb Plugin – Complete Commands</strong></summary>
<p>
<strong>!imdb &lt;title&gt;</strong><br>
Get full details for a movie or series.<br>
Flags: <code>-y YEAR</code> <code>-t movie|series|episode</code> <code>--short-plot</code><br>
Example: <code>!imdb "The Matrix" -y 1999 -t movie</code><br><br>

<strong>!imdb id &lt;tt1234567&gt;</strong><br>
Lookup by IMDb ID.<br>
Example: <code>!imdb id tt0133093</code><br><br>

<strong>!imdb search &lt;query&gt;</strong><br>
Returns a list of matching titles (up to 10).<br>
Example: <code>!imdb search Lord of the Rings</code><br><br>

<strong>!imdb episode &lt;series&gt; -s N -e N</strong><br>
Get details for a specific episode.<br>
Example: <code>!imdb episode "Breaking Bad" -s 1 -e 1</code><br><br>

<strong>!imdb help</strong><br>
Show this help.<br><br>

<em>Requires OMDB_API_KEY environment variable.</em>
</p>
</details>
"""
    await bot.api.send_markdown_message(room.room_id, help_text)
|
||
|
||
async def handle_search(room, bot, query):
    """Run an OMDb title search (``s=``) and post the result list.

    Args:
        room: Target room; ``room.room_id`` is used as the destination.
        bot: Bot instance used to send the reply.
        query: Search text passed to OMDb as the ``s`` parameter.

    Returns:
        None. On an error response a plain-text message is sent; otherwise
        the formatted result list is sent as a markdown message.
    """
    async with aiohttp.ClientSession() as session:
        data = await fetch_omdb(session, {"s": query})
        succeeded = data.get("Response") != "False"
        if succeeded:
            await bot.api.send_markdown_message(room.room_id, format_search_results(data))
        else:
            failure = f"❌ {data.get('Error', 'Search failed')}"
            await bot.api.send_text_message(room.room_id, failure)
|
||
|
||
async def handle_lookup_by_id(room, bot, imdb_id):
    """Look a title up by IMDb ID (``i=``) and post its details.

    Args:
        room: Target room; ``room.room_id`` is used as the destination.
        bot: Bot instance used to send the replies.
        imdb_id: IMDb identifier passed to OMDb as the ``i`` parameter.

    Returns:
        None. On an error response a plain-text message is sent; otherwise
        the formatted details are sent, followed by the poster image when
        one could be downloaded.
    """
    params = {"i": imdb_id, "plot": "full"}
    async with aiohttp.ClientSession() as session:
        data = await fetch_omdb(session, params)
        if data.get("Response") == "False":
            await bot.api.send_text_message(room.room_id, f"❌ {data.get('Error', 'ID not found')}")
            return

        poster_path = await download_poster(session, data.get("Poster", ""))
        # From here on a temp file may exist, so every following step is
        # covered by the cleanup. Previously a failure while sending the
        # text message left the file behind.
        try:
            html = format_movie_html(data, poster_path)
            await bot.api.send_markdown_message(room.room_id, html)
            if poster_path:
                await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
        finally:
            if poster_path:
                try:
                    os.unlink(poster_path)
                except OSError as e:
                    # Cleanup is best-effort; do not mask a send error.
                    logging.debug(f"Poster cleanup failed: {e}")
|
||
|
||
async def handle_title_lookup(room, bot, args):
    """Look a title up by name (``t=``) and post its details.

    Args:
        room: Target room; ``room.room_id`` is used as the destination.
        bot: Bot instance used to send the replies.
        args: Command tokens after ``!imdb``: the title words plus the
            optional ``-y/--year``, ``-t/--type`` and ``--short-plot``
            flags.

    Returns:
        None. Invalid arguments produce a usage message; an error response
        from OMDb produces a plain-text error; otherwise the formatted
        details are sent, followed by the poster image when one could be
        downloaded.
    """
    parser = argparse.ArgumentParser(prog="!imdb", add_help=False)
    parser.add_argument("title", nargs="+", help="Movie or series title")
    parser.add_argument("-y", "--year", type=str, help="Year of release")
    parser.add_argument("-t", "--type", choices=["movie", "series", "episode"], help="Type filter")
    parser.add_argument("--short-plot", action="store_true", help="Use short plot (default is full)")

    try:
        parsed = parser.parse_args(args)
    except SystemExit:
        # argparse exits on invalid input; turn that into a usage reply.
        await bot.api.send_markdown_message(room.room_id,
            "Usage: `!imdb <title> [-y YEAR] [-t movie|series|episode] [--short-plot]`\n"
            "Example: `!imdb 'The Matrix' -y 1999 -t movie`")
        return
    except Exception as e:
        await bot.api.send_text_message(room.room_id, f"Error parsing arguments: {e}")
        return

    params = {"t": " ".join(parsed.title), "plot": "short" if parsed.short_plot else "full"}
    if parsed.year:
        params["y"] = parsed.year
    if parsed.type:
        params["type"] = parsed.type

    async with aiohttp.ClientSession() as session:
        data = await fetch_omdb(session, params)
        if data.get("Response") == "False":
            await bot.api.send_text_message(room.room_id, f"❌ {data.get('Error', 'Not found')}")
            return

        poster_path = await download_poster(session, data.get("Poster", ""))
        # From here on a temp file may exist, so every following step is
        # covered by the cleanup. Previously a failure while sending the
        # text message left the file behind.
        try:
            html = format_movie_html(data, poster_path)
            await bot.api.send_markdown_message(room.room_id, html)
            if poster_path:
                await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
        finally:
            if poster_path:
                try:
                    os.unlink(poster_path)
                except OSError as e:
                    # Cleanup is best-effort; do not mask a send error.
                    logging.debug(f"Poster cleanup failed: {e}")
|
||
|
||
async def handle_episode(room, bot, args):
    """Look up one episode: ``!imdb episode <series> -s N -e N``.

    Args:
        room: Target room; ``room.room_id`` is used as the destination.
        bot: Bot instance used to send the replies.
        args: Command tokens after ``episode``: the series title words,
            the required ``-s/--season`` and ``-e/--episode`` integers,
            and the optional ``--short-plot`` flag.

    Returns:
        None. Invalid arguments produce a usage message; an error response
        from OMDb produces a plain-text error; otherwise the formatted
        episode details are sent, followed by the poster image when one
        could be downloaded.
    """
    parser = argparse.ArgumentParser(prog="!imdb episode", add_help=False)
    parser.add_argument("series", nargs="+", help="Series title")
    parser.add_argument("-s", "--season", type=int, required=True, help="Season number")
    parser.add_argument("-e", "--episode", type=int, required=True, help="Episode number")
    parser.add_argument("--short-plot", action="store_true", help="Short plot")

    try:
        parsed = parser.parse_args(args)
    except SystemExit:
        # argparse exits on invalid input; turn that into a usage reply.
        await bot.api.send_markdown_message(room.room_id,
            "Usage: `!imdb episode <series> -s <season> -e <episode>`\n"
            "Example: `!imdb episode 'Breaking Bad' -s 1 -e 1`")
        return
    except Exception as e:
        await bot.api.send_text_message(room.room_id, f"Error: {e}")
        return

    params = {
        "t": " ".join(parsed.series),
        "season": parsed.season,
        "episode": parsed.episode,
        "plot": "short" if parsed.short_plot else "full",
    }

    async with aiohttp.ClientSession() as session:
        data = await fetch_omdb(session, params)
        if data.get("Response") == "False":
            await bot.api.send_text_message(room.room_id, f"❌ Episode not found: {data.get('Error', '')}")
            return

        poster_path = await download_poster(session, data.get("Poster", ""))
        # From here on a temp file may exist, so every following step is
        # covered by the cleanup. Previously a failure while sending the
        # text message left the file behind.
        try:
            html = format_episode_html(data, poster_path)
            await bot.api.send_markdown_message(room.room_id, html)
            if poster_path:
                await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
        finally:
            if poster_path:
                try:
                    os.unlink(poster_path)
                except OSError as e:
                    # Cleanup is best-effort; do not mask a send error.
                    logging.debug(f"Poster cleanup failed: {e}")
|