""" This plugin provides Shodan.io integration for security research and reconnaissance. """ import logging import os import requests import simplematrixbotlib as botlib from dotenv import load_dotenv # Load environment variables from .env file plugin_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(plugin_dir) dotenv_path = os.path.join(parent_dir, '.env') load_dotenv(dotenv_path) SHODAN_API_KEY = os.getenv("SHODAN_KEY", "") SHODAN_API_BASE = "https://api.shodan.io" async def handle_command(room, message, bot, prefix, config): """ Function to handle Shodan commands. Args: room (Room): The Matrix room where the command was invoked. message (RoomMessage): The message object containing the command. bot (Bot): The bot object. prefix (str): The command prefix. config (dict): Configuration parameters. Returns: None """ match = botlib.MessageMatch(room, message, bot, prefix) if match.is_not_from_this_bot() and match.prefix() and match.command("shodan"): logging.info("Received !shodan command") # Check if API key is configured if not SHODAN_API_KEY: await bot.api.send_text_message( room.room_id, "Shodan API key not configured. Please set SHODAN_KEY environment variable." ) logging.error("Shodan API key not configured") return args = match.args() if len(args) < 1: await show_usage(room, bot) return subcommand = args[0].lower() if subcommand == "ip": if len(args) < 2: await bot.api.send_text_message(room.room_id, "Usage: !shodan ip ") return ip = args[1] await shodan_ip_lookup(room, bot, ip) elif subcommand == "search": if len(args) < 2: await bot.api.send_text_message(room.room_id, "Usage: !shodan search ") return query = ' '.join(args[1:]) await shodan_search(room, bot, query) elif subcommand == "host": if len(args) < 2: await bot.api.send_text_message(room.room_id, "Usage: !shodan host ") return host = args[1] await shodan_host(room, bot, host) elif subcommand == "count": if len(args) < 2: await bot.api.send_text_message(room.room_id, "Usage: !shodan count ") return query = ' '.join(args[1:]) await shodan_count(room, bot, query) else: await show_usage(room, bot) async def show_usage(room, bot): """Display Shodan command usage.""" usage = """ 🔍 Shodan Commands: !shodan ip <ip_address> - Get detailed information about an IP !shodan search <query> - Search Shodan database !shodan host <domain/ip> - Get host information !shodan count <query> - Count results for a search query Search Examples:!shodan search apache!shodan search "port:22"!shodan search "country:US product:nginx"!shodan search "net:192.168.1.0/24" """ await bot.api.send_markdown_message(room.room_id, usage) async def shodan_ip_lookup(room, bot, ip): """Look up information about a specific IP address.""" try: url = f"{SHODAN_API_BASE}/shodan/host/{ip}" params = {"key": SHODAN_API_KEY} logging.info(f"Fetching Shodan IP info for: {ip}") response = requests.get(url, params=params, timeout=15) if response.status_code == 404: await bot.api.send_text_message(room.room_id, f"No information found for IP: {ip}") return elif response.status_code == 401: await bot.api.send_text_message(room.room_id, "Invalid Shodan API key") return elif response.status_code != 200: await bot.api.send_text_message(room.room_id, f"Shodan API error: {response.status_code}") return data = response.json() # Format the response output = f"🔍 Shodan IP Lookup: {ip}

" if data.get('country_name'): output += f"📍 Location: {data.get('city', 'N/A')}, {data.get('country_name', 'N/A')}
" if data.get('org'): output += f"🏢 Organization: {data['org']}
" if data.get('os'): output += f"💻 Operating System: {data['os']}
" if data.get('ports'): output += f"🔌 Open Ports: {', '.join(map(str, data['ports']))}
" output += f"🕒 Last Update: {data.get('last_update', 'N/A')}

" # Show services if data.get('data'): output += "📡 Services:
" for service in data['data'][:5]: # Limit to first 5 services port = service.get('port', 'N/A') product = service.get('product', 'Unknown') version = service.get('version', '') banner = service.get('data', '')[:100] + "..." if len(service.get('data', '')) > 100 else service.get('data', '') output += f" • Port {port}: {product} {version}
" if banner: output += f" {banner}
" if len(data['data']) > 5: output += f" • ... and {len(data['data']) - 5} more services
" # Wrap in collapsible if output is large if len(output) > 500: output = f"
🔍 Shodan IP Lookup: {ip}{output}
" await bot.api.send_markdown_message(room.room_id, output) logging.info(f"Sent Shodan IP info for {ip}") except requests.exceptions.Timeout: await bot.api.send_text_message(room.room_id, "Shodan API request timed out") logging.error("Shodan API timeout") except Exception as e: await bot.api.send_text_message(room.room_id, f"Error fetching Shodan data: {str(e)}") logging.error(f"Error in shodan_ip_lookup: {e}") async def shodan_search(room, bot, query): """Search the Shodan database.""" try: url = f"{SHODAN_API_BASE}/shodan/host/search" params = { "key": SHODAN_API_KEY, "query": query, "minify": True, "limit": 5 # Limit results to avoid huge responses } logging.info(f"Searching Shodan for: {query}") response = requests.get(url, params=params, timeout=15) if response.status_code != 200: await handle_shodan_error(room, bot, response.status_code) return data = response.json() if not data.get('matches'): await bot.api.send_text_message(room.room_id, f"No results found for: {query}") return output = f"🔍 Shodan Search: '{query}'
" output += f"Total Results: {data.get('total', 0):,}

" for match in data['matches'][:5]: # Show first 5 results ip = match.get('ip_str', 'N/A') port = match.get('port', 'N/A') org = match.get('org', 'Unknown') product = match.get('product', 'Unknown') output += f"🌐 {ip}:{port}
" output += f" • Organization: {org}
" output += f" • Service: {product}
" if match.get('location'): loc = match['location'] if loc.get('city') and loc.get('country_name'): output += f" • Location: {loc['city']}, {loc['country_name']}
" output += "
" if data.get('total', 0) > 5: output += f"Showing 5 of {data['total']:,} results. Refine your search for more specific results." await bot.api.send_markdown_message(room.room_id, output) logging.info(f"Sent Shodan search results for: {query}") except requests.exceptions.Timeout: await bot.api.send_text_message(room.room_id, "Shodan API request timed out") logging.error("Shodan API timeout") except Exception as e: await bot.api.send_text_message(room.room_id, f"Error searching Shodan: {str(e)}") logging.error(f"Error in shodan_search: {e}") async def shodan_host(room, bot, host): """Get host information (domain or IP).""" try: url = f"{SHODAN_API_BASE}/dns/domain/{host}" params = {"key": SHODAN_API_KEY} logging.info(f"Fetching Shodan host info for: {host}") response = requests.get(url, params=params, timeout=15) if response.status_code == 404: # Try IP lookup instead await shodan_ip_lookup(room, bot, host) return elif response.status_code != 200: await handle_shodan_error(room, bot, response.status_code) return data = response.json() output = f"🔍 Shodan Host: {host}

" if data.get('subdomains'): output += f"🌐 Subdomains ({len(data['subdomains'])}):
" for subdomain in sorted(data['subdomains'])[:10]: # Show first 10 output += f" • {subdomain}.{host}
" if len(data['subdomains']) > 10: output += f" • ... and {len(data['subdomains']) - 10} more
" if data.get('tags'): output += f"
🏷️ Tags: {', '.join(data['tags'])}
" if data.get('data'): output += f"
📊 Records Found: {len(data['data'])}
" await bot.api.send_markdown_message(room.room_id, output) logging.info(f"Sent Shodan host info for: {host}") except requests.exceptions.Timeout: await bot.api.send_text_message(room.room_id, "Shodan API request timed out") logging.error("Shodan API timeout") except Exception as e: await bot.api.send_text_message(room.room_id, f"Error fetching host info: {str(e)}") logging.error(f"Error in shodan_host: {e}") async def shodan_count(room, bot, query): """Count results for a search query.""" try: url = f"{SHODAN_API_BASE}/shodan/host/count" params = { "key": SHODAN_API_KEY, "query": query } logging.info(f"Counting Shodan results for: {query}") response = requests.get(url, params=params, timeout=15) if response.status_code != 200: await handle_shodan_error(room, bot, response.status_code) return data = response.json() output = f"🔍 Shodan Count: '{query}'

" output += f"Total Results: {data.get('total', 0):,}
" # Show top countries if available if data.get('facets') and 'country' in data['facets']: output += "
🌍 Top Countries:
" for country in data['facets']['country'][:5]: output += f" • {country['value']}: {country['count']:,}
" # Show top organizations if available if data.get('facets') and 'org' in data['facets']: output += "
🏢 Top Organizations:
" for org in data['facets']['org'][:5]: output += f" • {org['value']}: {org['count']:,}
" await bot.api.send_markdown_message(room.room_id, output) logging.info(f"Sent Shodan count for: {query}") except requests.exceptions.Timeout: await bot.api.send_text_message(room.room_id, "Shodan API request timed out") logging.error("Shodan API timeout") except Exception as e: await bot.api.send_text_message(room.room_id, f"Error counting Shodan results: {str(e)}") logging.error(f"Error in shodan_count: {e}") async def handle_shodan_error(room, bot, status_code): """Handle Shodan API errors.""" error_messages = { 401: "Invalid Shodan API key", 403: "Access denied - check API key permissions", 404: "No results found", 429: "Rate limit exceeded - try again later", 500: "Shodan API server error", 503: "Shodan API temporarily unavailable" } message = error_messages.get(status_code, f"Shodan API error: {status_code}") await bot.api.send_text_message(room.room_id, message) logging.error(f"Shodan API error: {status_code}")