""" Shodan.io integration for security research and reconnaissance. Output uses shared code_block for aligned columns. """ import logging import os import aiohttp import simplematrixbotlib as botlib from plugins.common import html_escape, code_block, collapsible_summary SHODAN_API_KEY = os.getenv("SHODAN_KEY", "") SHODAN_API_BASE = "https://api.shodan.io" async def handle_command(room, message, bot, prefix, config): match = botlib.MessageMatch(room, message, bot, prefix) if match.is_not_from_this_bot() and match.prefix() and match.command("shodan"): if not SHODAN_API_KEY: await bot.api.send_text_message(room.room_id, "Shodan API key not configured.") return args = match.args() if len(args) < 1: await show_usage(room, bot) return sub = args[0].lower() if sub == "ip" and len(args) >= 2: await shodan_ip_lookup(room, bot, args[1]) elif sub == "search" and len(args) >= 2: query = " ".join(args[1:]) await shodan_search(room, bot, query) elif sub == "host" and len(args) >= 2: await shodan_host(room, bot, args[1]) elif sub == "count" and len(args) >= 2: query = " ".join(args[1:]) await shodan_count(room, bot, query) else: await show_usage(room, bot) async def show_usage(room, bot): usage = """🔍 Shodan Commands: !shodan ip <ip_address> - Get detailed information about an IP !shodan search <query> - Search Shodan database !shodan host <domain/ip> - Get host information !shodan count <query> - Count results for a search query Search Examples:!shodan search apache!shodan search "port:22"!shodan search "country:US product:nginx"!shodan search "net:192.168.1.0/24" """ await bot.api.send_markdown_message(room.room_id, usage) async def shodan_ip_lookup(room, bot, ip): safe_ip = html_escape(ip) try: url = f"{SHODAN_API_BASE}/shodan/host/{ip}?key={SHODAN_API_KEY}" async with aiohttp.ClientSession() as session: async with session.get(url, timeout=15) as resp: if resp.status == 404: await bot.api.send_text_message(room.room_id, f"No information found for IP: {safe_ip}") return resp.raise_for_status() data = await resp.json() rows = [ ("🌐", "IP", safe_ip), ("📍", "Location", f"{data.get('city','N/A')}, {data.get('country_name','N/A')}"), ("🏢", "Organization", data.get('org', 'N/A')), ("💻", "OS", data.get('os', 'N/A')), ("🔌", "Open Ports", ', '.join(map(str, data.get('ports', []))) or 'None'), ] if data.get('data'): for svc in data['data'][:5]: rows.append(("📡", f"Port {svc.get('port')}", svc.get('product','Unknown'))) sections = [{"title": f"Shodan IP Lookup: {safe_ip}", "rows": rows}] block = code_block(f"🔍 Shodan IP Lookup: {safe_ip}", sections) output = collapsible_summary(f"🔍 Shodan: {safe_ip}", block) await bot.api.send_markdown_message(room.room_id, output) except aiohttp.ClientError as e: await bot.api.send_text_message(room.room_id, f"API error: {e}") async def shodan_search(room, bot, query): safe_query = html_escape(query) try: url = f"{SHODAN_API_BASE}/shodan/host/search?key={SHODAN_API_KEY}&query={query}&minify=true&limit=5" async with aiohttp.ClientSession() as session: async with session.get(url, timeout=15) as resp: resp.raise_for_status() data = await resp.json() if not data.get('matches'): await bot.api.send_text_message(room.room_id, f"No results for '{safe_query}'.") return rows = [] for match in data['matches'][:5]: ip = match.get('ip_str', 'N/A') port = match.get('port', '') org = match.get('org', 'Unknown') product = match.get('product', 'Unknown') rows.append(("🌐", f"{ip}:{port}", f"{product} – {org}")) sections = [{"title": f"Search: {safe_query}", "rows": rows}] block = code_block(f"🔍 Shodan Search: {safe_query}", sections) output = collapsible_summary(f"Shodan Search: {safe_query}", block) await bot.api.send_markdown_message(room.room_id, output) except aiohttp.ClientError as e: await bot.api.send_text_message(room.room_id, f"API error: {e}") async def shodan_host(room, bot, host): safe_host = html_escape(host) try: url = f"{SHODAN_API_BASE}/dns/domain/{host}?key={SHODAN_API_KEY}" async with aiohttp.ClientSession() as session: async with session.get(url, timeout=15) as resp: if resp.status == 404: await shodan_ip_lookup(room, bot, host) return resp.raise_for_status() data = await resp.json() rows = [("🌐", "Domain", safe_host)] if data.get('subdomains'): for sub in sorted(data['subdomains'])[:10]: rows.append(("", "Subdomain", f"{sub}.{safe_host}")) if len(data['subdomains']) > 10: rows.append(("", "", f"... and {len(data['subdomains']) - 10} more")) sections = [{"title": f"Host: {safe_host}", "rows": rows}] block = code_block(f"🔍 Shodan Host: {safe_host}", sections) output = collapsible_summary(f"Shodan Host: {safe_host}", block) await bot.api.send_markdown_message(room.room_id, output) except aiohttp.ClientError as e: await bot.api.send_text_message(room.room_id, f"API error: {e}") async def shodan_count(room, bot, query): safe_query = html_escape(query) try: url = f"{SHODAN_API_BASE}/shodan/host/count?key={SHODAN_API_KEY}&query={query}" async with aiohttp.ClientSession() as session: async with session.get(url, timeout=15) as resp: resp.raise_for_status() data = await resp.json() rows = [("🔢", "Total Results", f"{data.get('total', 0):,}")] if data.get('facets'): for facet_name, facet_data in data['facets'].items(): for item in facet_data[:5]: rows.append(("", facet_name.capitalize(), f"{item['value']}: {item['count']:,}")) sections = [{"title": f"Count: {safe_query}", "rows": rows}] block = code_block(f"🔍 Shodan Count: {safe_query}", sections) output = collapsible_summary(f"Shodan Count: {safe_query}", block) await bot.api.send_markdown_message(room.room_id, output) except aiohttp.ClientError as e: await bot.api.send_text_message(room.room_id, f"API error: {e}") # --------------------------------------------------------------------------- # Plugin Metadata # --------------------------------------------------------------------------- __version__ = "1.0.2" __author__ = "Funguy Bot" __description__ = "Shodan.io reconnaissance" __help__ = """
!shodan – Shodan search

Requires SHODAN_KEY env var.

"""