#!/usr/bin/env python3
"""
Wikipedia plugin for FunguyBot – uses MediaWiki APIs exclusively.
No HTML scraping, no BeautifulSoup, no regex on article HTML.

Commands:
    !wp <search term>  - Fetch summary + main image
    !wp help           - Show usage

Title resolution (strategies in order):
  1. Action API `opensearch` – fast, returns the most likely title directly.
  2. Action API `list=search` – full‑text search, good fallback.
  3. Direct hyphenated title (spaces → hyphens) – works for many drugs/chemicals.

All errors are logged. No API key required.
"""
import html
import logging
import os
import re
import tempfile
from urllib.parse import quote

import aiohttp
import simplematrixbotlib as botlib
# ----------------------------------------------------------------------
|
||
# Plugin lifecycle
|
||
# ----------------------------------------------------------------------
|
||
def setup(bot):
|
||
logging.info("Wikipedia plugin (API‑only) loaded. !wp command ready.")
|
||
|
||
# ----------------------------------------------------------------------
# Constants
# ----------------------------------------------------------------------
# Descriptive User-Agent — Wikimedia asks API clients to identify themselves.
HEADERS = {'User-Agent': 'FunguyBot/2.0 (Matrix bot; educational; contact: your-email@example.com)'}
# English Wikipedia origin (per-request URLs are built from the `lang` argument).
BASE = 'https://en.wikipedia.org'
# ----------------------------------------------------------------------
# REST API: summary (the cleanest way to get extract + image)
# ----------------------------------------------------------------------
async def _fetch_summary(session: aiohttp.ClientSession, title: str, lang: str = 'en') -> dict | None:
    """Fetch ``GET /api/rest_v1/page/summary/<title>`` for *title*.

    The REST endpoint follows redirects itself (e.g. 4-aco-dmt -> 4-AcO-DMT),
    so callers may pass near-miss titles.

    Returns the parsed JSON dict, or None on a non-200 status or any error.
    """
    # Title slugs use underscores for spaces; percent-encode the rest.
    slug = quote(title.replace(' ', '_'))
    endpoint = f'https://{lang}.wikipedia.org/api/rest_v1/page/summary/{slug}'
    try:
        async with session.get(endpoint, timeout=aiohttp.ClientTimeout(total=12)) as resp:
            logging.info(f'Summary [{resp.status}] for {title!r}')
            if resp.status != 200:
                return None
            # content_type=None: accept any Content-Type the endpoint reports.
            return await resp.json(content_type=None)
    except Exception as e:
        logging.warning(f'Summary fetch error for {title!r}: {e}')
        return None
# ----------------------------------------------------------------------
# Strategy 1: OpenSearch – Wikipedia's "suggest" API, returns exact title
# ----------------------------------------------------------------------
async def _strategy_opensearch(session: aiohttp.ClientSession, query: str, lang: str) -> str | None:
    """Resolve *query* to an article title via the opensearch suggest API.

    GET /w/api.php?action=opensearch&search=<query>&namespace=0&limit=1
    Response shape: [query, [title1,...], [desc1,...], [url1,...]];
    the first title is usually the best match.

    Returns the matched title, or None on failure / no results.
    """
    url = f'https://{lang}.wikipedia.org/w/api.php'
    params = {
        'action': 'opensearch',
        'search': query,
        'namespace': 0,
        'limit': 1,
        'format': 'json'
    }
    try:
        # Fix: use an explicit ClientTimeout — passing a bare int to
        # session.get(timeout=...) is deprecated in aiohttp 3.x and was
        # inconsistent with _fetch_summary's usage.
        async with session.get(url, params=params,
                               timeout=aiohttp.ClientTimeout(total=10)) as resp:
            logging.info(f'OpenSearch [{resp.status}] for {query!r}')
            if resp.status == 200:
                data = await resp.json()
                # data = [query, [title], [description], [url]]
                if len(data) >= 2 and data[1]:
                    title = data[1][0]
                    logging.info(f'OpenSearch found: {title!r}')
                    return title
    except Exception as e:
        logging.warning(f'OpenSearch error: {e}')
    return None
# ----------------------------------------------------------------------
# Strategy 2: Action API full‑text search (list=search)
# ----------------------------------------------------------------------
async def _strategy_action_search(session: aiohttp.ClientSession, query: str, lang: str) -> str | None:
    """Resolve *query* via the Action API's full-text search (list=search).

    Slower but more forgiving than opensearch; used as a fallback.
    Returns the top hit's title, or None on failure / no results.
    """
    url = f'https://{lang}.wikipedia.org/w/api.php'
    params = {
        'action': 'query',
        'list': 'search',
        'srsearch': query,
        'srlimit': 1,
        'srnamespace': 0,
        'format': 'json',
    }
    try:
        # Fix: explicit ClientTimeout instead of a bare int (deprecated in
        # aiohttp 3.x); keeps timeout handling consistent across the plugin.
        async with session.get(url, params=params,
                               timeout=aiohttp.ClientTimeout(total=10)) as resp:
            logging.info(f'Action search [{resp.status}] for {query!r}')
            if resp.status == 200:
                data = await resp.json(content_type=None)
                results = data.get('query', {}).get('search', [])
                if results:
                    title = results[0].get('title')
                    logging.info(f'Action search found: {title!r}')
                    return title
    except Exception as e:
        logging.warning(f'Action search error: {e}')
    return None
# ----------------------------------------------------------------------
# Strategy 3: Direct hyphenated title (spaces → hyphens)
# ----------------------------------------------------------------------
async def _strategy_direct_hyphenated(session: aiohttp.ClientSession, query: str, lang: str) -> dict | None:
    """Try the summary endpoint with spaces swapped for hyphens.

    Helps compound names typed with spaces ("4 aco dmt"): Wikipedia's own
    redirects then map "4-aco-dmt" -> "4-AcO-DMT" transparently.
    Returns the full summary dict on a genuine hit, else None.
    """
    if ' ' not in query:
        # Nothing to hyphenate — strategies 1/2 already tried this form.
        return None
    hyphenated = query.replace(' ', '-')
    data = await _fetch_summary(session, hyphenated, lang)
    if not data:
        return None
    # Only accept a real article with text, not disambiguation/empty pages.
    if data.get('type') != 'standard' or not data.get('extract'):
        return None
    logging.info(f'Direct hyphenated hit: {hyphenated!r} -> {data.get("title")!r}')
    return data
# ----------------------------------------------------------------------
# Main fetch (orchestration)
# ----------------------------------------------------------------------
async def fetch_wikipedia(query: str, lang: str = 'en') -> tuple:
    """Look up *query* on Wikipedia, trying each title-resolution strategy.

    Returns (title, extract, article_url, image_path | None, error_msg | None).
    Exactly one of the data fields / error_msg is meaningful: on failure the
    first four are None and error_msg explains why.
    """
    async with aiohttp.ClientSession(headers=HEADERS) as session:

        summary = None
        title = None

        # Strategy 1: OpenSearch (fastest, most accurate).
        title = await _strategy_opensearch(session, query, lang)
        if title:
            summary = await _fetch_summary(session, title, lang)

        # Strategy 2: full-text search fallback.
        if not summary:
            title = await _strategy_action_search(session, query, lang)
            if title:
                summary = await _fetch_summary(session, title, lang)

        # Strategy 3: hyphenated fallback (for compounds like "4-aco-dmt").
        if not summary:
            summary = await _strategy_direct_hyphenated(session, query, lang)

        if not summary:
            logging.error(f'All strategies exhausted for {query!r}')
            return None, None, None, None, f'No Wikipedia article found for "{query}".'

        page_type = summary.get('type', 'standard')

        # Disambiguation pages have no useful extract — point the user at the page.
        if page_type == 'disambiguation':
            url = summary.get('content_urls', {}).get('desktop', {}).get('page', '')
            return (
                None, None, None, None,
                f'"{query}" is a disambiguation page – try a more specific term.'
                + (f' See: {url}' if url else '')
            )

        if page_type == 'no-extract' or not summary.get('extract'):
            return None, None, None, None, f'No summary available for "{query}".'

        title = summary.get('title', query)
        extract = summary.get('extract', '').strip()
        url = summary.get('content_urls', {}).get('desktop', {}).get('page', '')

        # Trim long extracts (Matrix messages have ~4KB limit, but keep reasonable);
        # cut on a word boundary and append an ellipsis.
        if len(extract) > 1200:
            extract = extract[:1200].rsplit(' ', 1)[0] + '…'

        # Image: prefer the original, fall back to the thumbnail.
        image_url = (summary.get('originalimage', {}).get('source')
                     or summary.get('thumbnail', {}).get('source'))
        image_path = await _download_image(session, image_url) if image_url else None

        return title, extract, url, image_path, None
# ----------------------------------------------------------------------
# Image download helper
# ----------------------------------------------------------------------
async def _download_image(session: aiohttp.ClientSession, image_url: str) -> str | None:
    """Download *image_url* into a temp file; return its path or None.

    The caller is responsible for deleting the file (delete=False).
    """
    if not image_url:
        return None
    # Normalize protocol-relative URLs ("//upload.wikimedia.org/...").
    if image_url.startswith('//'):
        image_url = 'https:' + image_url
    try:
        async with session.get(image_url, timeout=aiohttp.ClientTimeout(total=15)) as resp:
            if resp.status != 200:
                return None
            # Pick a file suffix from the reported content type; default to JPEG.
            content_type = resp.headers.get('Content-Type', '')
            if 'png' in content_type:
                suffix = '.png'
            elif 'gif' in content_type:
                suffix = '.gif'
            else:
                suffix = '.jpg'
            payload = await resp.read()
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                tmp.write(payload)
                return tmp.name
    except Exception as e:
        logging.warning(f'Image download failed: {e}')
        return None
# ----------------------------------------------------------------------
# Formatting (HTML for Matrix)
# ----------------------------------------------------------------------
def format_response(title: str, extract: str, url: str) -> str:
    """Render the article summary as collapsible Matrix HTML.

    Fix: *title* and *extract* arrive as plain text from the API, so they
    are HTML-escaped before interpolation — titles like "AT&T" or extracts
    containing '<' previously produced broken markup. The URL is escaped
    for attribute context ('&' -> '&amp;', quotes escaped).
    """
    safe_title = html.escape(title)
    # Escape first, then convert newlines to <br> line breaks.
    body = html.escape(extract).replace('\n', '<br>')
    safe_url = html.escape(url)
    return (
        f'<details>'
        f'<summary><strong>📖 Wikipedia: {safe_title}</strong></summary>'
        f'<p>{body}</p>'
        f'<p><a href="{safe_url}">🔗 Read full article</a></p>'
        f'</details>'
    )
# ----------------------------------------------------------------------
# Command handler (called by FunguyBot)
# ----------------------------------------------------------------------
async def handle_command(room, message, bot, prefix, config):
    """Handle !wp: parse the command, fetch the article, send text + image."""
    match = botlib.MessageMatch(room, message, bot, prefix)
    # Guard clauses — same predicates and evaluation order as the original
    # conjunction, so short-circuit behaviour is identical.
    if not match.is_not_from_this_bot():
        return
    if not match.prefix():
        return
    if not match.command('wp'):
        return

    args = match.args()
    if not args or args[0].lower() == 'help':
        help_text = """
<details>
<summary><strong>📖 Wikipedia Help</strong></summary>
<p>
<strong>!wp <search term></strong> – Get the lead section and main image from Wikipedia.<br>
<strong>!wp help</strong> – Show this help.<br><br>
<strong>Examples:</strong><br>
<code>!wp 4 aco dmt</code><br>
<code>!wp psilocybin</code><br>
<code>!wp Albert Einstein</code><br>
<code>!wp Python programming language</code>
</p>
</details>
"""
        await bot.api.send_markdown_message(room.room_id, help_text)
        return

    query = ' '.join(args).strip()
    await bot.api.send_text_message(room.room_id, f'🔍 Searching Wikipedia for: {query}…')

    try:
        title, extract, url, image_path, error = await fetch_wikipedia(query)
    except Exception as e:
        logging.exception('Unexpected error in wikipedia plugin')
        await bot.api.send_text_message(room.room_id, f'❌ Unexpected error: {e}')
        return

    if error:
        await bot.api.send_text_message(room.room_id, f'❌ {error}')
        return

    await bot.api.send_markdown_message(room.room_id, format_response(title, extract, url))

    if image_path and os.path.exists(image_path):
        try:
            await bot.api.send_image_message(room_id=room.room_id, image_filepath=image_path)
        except Exception as e:
            logging.warning(f'Image send failed: {e}')
        finally:
            # Always remove the temp file created by _download_image.
            try:
                os.unlink(image_path)
            except OSError:
                pass
# ---------------------------------------------------------------------------
|
||
# Plugin Metadata
|
||
# ---------------------------------------------------------------------------
|
||
|
||
__version__ = "1.0.0"
|
||
__author__ = "Funguy Bot"
|
||
__description__ = "Wikipedia article summary"
|
||
__help__ = """
|
||
<details>
|
||
<summary><strong>!wp</strong> – Wikipedia summary</summary>
|
||
<p><code>!wp <search term></code> – Returns the lead section and main image from Wikipedia.<br>
|
||
Uses MediaWiki APIs, no scraping.</p>
|
||
</details>
|
||
"""
|