#!/usr/bin/env python3
"""
Fully Featured OMDb Plugin for Funguy Bot
Provides complete OMDb API functionality:
!imdb
- Get movie/series details
!imdb id - Lookup by IMDb ID (e.g. tt0133093)
!imdb search - Search for titles (returns list)
!imdb episode -s N -e N - Get episode details
!imdb help - Show this help
Optional flags (works with title and id):
-y - Year of release
-t - Type filter
--short-plot - Show short plot (default is full)
Requires: OMDB_API_KEY in environment variables.
"""
import logging
import os
import aiohttp
import asyncio
import tempfile
import argparse
import shlex
from urllib.parse import quote_plus
# ----------------------------------------------------------------------
# Plugin setup (called by FunguyBot after bot instance is created)
# ----------------------------------------------------------------------
def setup(bot):
"""Called by the bot after the bot instance is created."""
api_key = os.getenv("OMDB_API_KEY")
if not api_key:
logging.warning("OMDB_API_KEY not set in environment. !imdb will not work.")
else:
logging.info("OMDb plugin loaded. API key present.")
# ----------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------
async def fetch_omdb(session, params):
"""Perform async GET request to OMDb API, return JSON."""
api_key = os.getenv("OMDB_API_KEY")
if not api_key:
return {"Error": "OMDB_API_KEY missing", "Response": "False"}
params["apikey"] = api_key
url = "https://www.omdbapi.com/"
try:
async with session.get(url, params=params, timeout=15) as resp:
if resp.status == 200:
return await resp.json()
else:
return {"Error": f"HTTP {resp.status}", "Response": "False"}
except asyncio.TimeoutError:
return {"Error": "Request timed out", "Response": "False"}
except Exception as e:
return {"Error": str(e), "Response": "False"}
async def download_poster(session, poster_url):
"""Download poster to a temp file, return path or None."""
if not poster_url or poster_url == "N/A":
return None
try:
async with session.get(poster_url, timeout=10) as resp:
if resp.status == 200:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
tmp.write(await resp.read())
return tmp.name
else:
logging.debug(f"Poster download failed: HTTP {resp.status}")
return None
except Exception as e:
logging.debug(f"Poster download error: {e}")
return None
def format_movie_html(data, poster_path=None):
"""Format OMDb movie/series data into HTML for Matrix."""
if not data or data.get("Response") == "False":
error = data.get("Error", "No information found.")
return f"â {error}"
title = data.get("Title", "Unknown")
year = data.get("Year", "N/A")
rated = data.get("Rated", "N/A")
runtime = data.get("Runtime", "N/A")
genre = data.get("Genre", "N/A")
director = data.get("Director", "N/A")
writer = data.get("Writer", "N/A")
actors = data.get("Actors", "N/A")
plot = data.get("Plot", "N/A")
language = data.get("Language", "N/A")
country = data.get("Country", "N/A")
awards = data.get("Awards", "N/A")
poster_url = data.get("Poster", "")
imdb_rating = data.get("imdbRating", "N/A")
imdb_votes = data.get("imdbVotes", "N/A")
imdb_id = data.get("imdbID", "")
type_ = data.get("Type", "movie")
box_office = data.get("BoxOffice", "N/A")
production = data.get("Production", "N/A")
website = data.get("Website", "N/A")
# Rotten Tomatoes ratings
ratings = data.get("Ratings", [])
rt_rating = next((r["Value"] for r in ratings if r["Source"] == "Rotten Tomatoes"), "N/A")
mc_rating = next((r["Value"] for r in ratings if r["Source"] == "Metacritic"), "N/A")
# Build HTML
imdb_link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id and imdb_id != "N/A" else ""
title_line = f"đŦ {title} ({year})"
if imdb_link:
title_line += f" â IMDb"
lines = [title_line]
lines.append(f"â IMDb: {imdb_rating}/10 ({imdb_votes} votes) | đ
Rotten: {rt_rating} | đ Metacritic: {mc_rating}")
lines.append(f"đ
Rated: {rated} | âą Runtime: {runtime} | đ Genre: {genre} | đŦ Type: {type_.capitalize()}")
if director != "N/A":
lines.append(f"đŦ Director: {director}")
if writer != "N/A":
lines.append(f"âī¸ Writer: {writer}")
if actors != "N/A":
lines.append(f"đ Actors: {actors}")
if plot != "N/A":
lines.append(f"đ Plot: {plot}")
extra = []
if language != "N/A":
extra.append(f"đŖ {language}")
if country != "N/A":
extra.append(f"đ {country}")
if awards != "N/A" and awards.lower() != "n/a":
extra.append(f"đ {awards}")
if box_office != "N/A":
extra.append(f"đ° {box_office}")
if production != "N/A":
extra.append(f"đĸ {production}")
if website != "N/A" and website.lower() != "n/a":
extra.append(f"đ Website")
if extra:
lines.append("âšī¸ More: " + " | ".join(extra))
if poster_path:
lines.append("đ¸ A poster image has been sent separately.")
elif poster_url and poster_url != "N/A":
lines.append(f"đ¸ View Poster")
html = "
".join(lines)
return f"đŦ OMDb: {title} ({year})
{html} "
def format_search_results(results):
"""Format search results (list of movies) into HTML."""
if not results or results.get("Response") == "False":
error = results.get("Error", "No results found.")
return f"â {error}"
search_list = results.get("Search", [])
if not search_list:
return "â No matching titles found."
lines = ["đ Search Results:
"]
for item in search_list[:10]: # OMDb returns up to 10 per page
title = item.get("Title", "Unknown")
year = item.get("Year", "N/A")
imdb_id = item.get("imdbID", "")
type_ = item.get("Type", "movie")
link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id else ""
if link:
lines.append(f"- {title} ({year}) â {type_.capitalize()} â IMDb
")
else:
lines.append(f"- {title} ({year}) â {type_.capitalize()}
")
lines.append("
")
lines.append("Use `!imdb ` or `!imdb id ` to get full details.")
return "
".join(lines)
def format_episode_html(data, poster_path=None):
"""Format episode data specially."""
if not data or data.get("Response") == "False":
error = data.get("Error", "Episode not found.")
return f"â {error}"
title = data.get("Title", "Unknown Episode")
series = data.get("SeriesTitle", data.get("Title", "")) # Some APIs return SeriesTitle
season = data.get("Season", "?")
episode = data.get("Episode", "?")
year = data.get("Year", "N/A")
plot = data.get("Plot", "N/A")
imdb_rating = data.get("imdbRating", "N/A")
imdb_id = data.get("imdbID", "")
imdb_link = f"https://www.imdb.com/title/{imdb_id}/" if imdb_id else ""
header = f"đē {series} â S{season}E{episode}: {title} ({year})"
if imdb_link:
header += f" â IMDb"
lines = [header]
lines.append(f"â IMDb Rating: {imdb_rating}/10")
if plot != "N/A":
lines.append(f"đ Plot: {plot}")
if poster_path:
lines.append("đ¸ Episode poster attached.")
html = "
".join(lines)
return f"đē Episode Info
{html} "
# ----------------------------------------------------------------------
# Command parsing and dispatching
# ----------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
"""Main entry point for !imdb commands."""
import simplematrixbotlib as botlib
match = botlib.MessageMatch(room, message, bot, prefix)
if not (match.is_not_from_this_bot() and match.prefix() and match.command("imdb")):
return
# Full raw command text (including arguments)
raw_args = message.body.split()[1:] # skip "!imdb"
if not raw_args:
await send_help(room, bot)
return
# First argument determines subcommand
subcmd = raw_args[0].lower()
if subcmd == "help":
await send_help(room, bot)
return
elif subcmd == "search":
# !imdb search
if len(raw_args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !imdb search ")
return
query = " ".join(raw_args[1:]).strip()
await handle_search(room, bot, query)
elif subcmd == "id":
# !imdb id
if len(raw_args) < 2:
await bot.api.send_text_message(room.room_id, "Usage: !imdb id (e.g. tt0133093)")
return
imdb_id = raw_args[1].strip()
await handle_lookup_by_id(room, bot, imdb_id)
elif subcmd == "episode":
# !imdb episode -s -e [--short-plot]
await handle_episode(room, bot, raw_args[1:])
else:
# Assume it's a title lookup with optional flags
# Reconstruct command line: "!imdb -y 1999 -t movie --short-plot"
# We'll parse using argparse on the whole raw_args (which is everything after !imdb)
await handle_title_lookup(room, bot, raw_args)
# ----------------------------------------------------------------------
# Subcommand implementations
# ----------------------------------------------------------------------
async def send_help(room, bot):
help_text = """
đŦ OMDb Plugin â Complete Commands
!imdb <title>
Get full details for a movie or series.
Flags: -y YEAR -t movie|series|episode --short-plot
Example: !imdb "The Matrix" -y 1999 -t movie
!imdb id <tt1234567>
Lookup by IMDb ID.
Example: !imdb id tt0133093
!imdb search <query>
Returns a list of matching titles (up to 10).
Example: !imdb search Lord of the Rings
!imdb episode <series> -s N -e N
Get details for a specific episode.
Example: !imdb episode "Breaking Bad" -s 1 -e 1
!imdb help
Show this help.
Requires OMDB_API_KEY environment variable.
"""
await bot.api.send_markdown_message(room.room_id, help_text)
async def handle_search(room, bot, query):
"""Perform a search (s=) and return formatted list."""
params = {"s": query}
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"â {data.get('Error', 'Search failed')}")
return
html = format_search_results(data)
await bot.api.send_markdown_message(room.room_id, html)
async def handle_lookup_by_id(room, bot, imdb_id):
"""Lookup by IMDb ID (i=)."""
params = {"i": imdb_id, "plot": "full"}
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"â {data.get('Error', 'ID not found')}")
return
poster_path = await download_poster(session, data.get("Poster", ""))
html = format_movie_html(data, poster_path)
await bot.api.send_markdown_message(room.room_id, html)
if poster_path:
try:
await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
finally:
os.unlink(poster_path)
async def handle_title_lookup(room, bot, args):
"""Parse arguments for title lookup (supports -y, -t, --short-plot)."""
parser = argparse.ArgumentParser(prog="!imdb", add_help=False)
parser.add_argument("title", nargs="+", help="Movie or series title")
parser.add_argument("-y", "--year", type=str, help="Year of release")
parser.add_argument("-t", "--type", choices=["movie", "series", "episode"], help="Type filter")
parser.add_argument("--short-plot", action="store_true", help="Use short plot (default is full)")
try:
# argparse expects a list of strings; we already have args as list
parsed = parser.parse_args(args)
title = " ".join(parsed.title)
params = {"t": title, "plot": "short" if parsed.short_plot else "full"}
if parsed.year:
params["y"] = parsed.year
if parsed.type:
params["type"] = parsed.type
except SystemExit:
# argparse would exit on error; we catch and show usage
await bot.api.send_markdown_message(room.room_id,
"Usage: `!imdb [-y YEAR] [-t movie|series|episode] [--short-plot]`\n"
"Example: `!imdb 'The Matrix' -y 1999 -t movie`")
return
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error parsing arguments: {e}")
return
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"â {data.get('Error', 'Not found')}")
return
poster_path = await download_poster(session, data.get("Poster", ""))
html = format_movie_html(data, poster_path)
await bot.api.send_markdown_message(room.room_id, html)
if poster_path:
try:
await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
finally:
os.unlink(poster_path)
async def handle_episode(room, bot, args):
"""Handle episode lookup: !imdb episode -s N -e N"""
parser = argparse.ArgumentParser(prog="!imdb episode", add_help=False)
parser.add_argument("series", nargs="+", help="Series title")
parser.add_argument("-s", "--season", type=int, required=True, help="Season number")
parser.add_argument("-e", "--episode", type=int, required=True, help="Episode number")
parser.add_argument("--short-plot", action="store_true", help="Short plot")
try:
parsed = parser.parse_args(args)
series = " ".join(parsed.series)
params = {
"t": series,
"season": parsed.season,
"episode": parsed.episode,
"plot": "short" if parsed.short_plot else "full"
}
except SystemExit:
await bot.api.send_markdown_message(room.room_id,
"Usage: `!imdb episode -s -e `\n"
"Example: `!imdb episode 'Breaking Bad' -s 1 -e 1`")
return
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error: {e}")
return
async with aiohttp.ClientSession() as session:
data = await fetch_omdb(session, params)
if data.get("Response") == "False":
await bot.api.send_text_message(room.room_id, f"â Episode not found: {data.get('Error', '')}")
return
poster_path = await download_poster(session, data.get("Poster", ""))
html = format_episode_html(data, poster_path)
await bot.api.send_markdown_message(room.room_id, html)
if poster_path:
try:
await bot.api.send_image_message(room_id=room.room_id, image_filepath=poster_path)
finally:
os.unlink(poster_path)
# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
__version__ = "1.0.0"
__author__ = "Funguy Bot"
__description__ = "IMDb lookup via OMDb API"
__help__ = """
!imdb â Movie/series details via OMDb
!imdb <title> â Full details (poster sent separately)
!imdb id <tt1234567> â Lookup by IMDb ID
!imdb search <query> â Search titles
!imdb episode <series> -s N -e N â Episode info
Optional flags: -y year, -t movie|series|episode, --short-plot
Requires OMDB_API_KEY env var.
"""