diff --git a/cron_jobs.db b/cron_jobs.db deleted file mode 100644 index 7f9a8cd..0000000 Binary files a/cron_jobs.db and /dev/null differ diff --git a/plugins/dns.py b/plugins/dns.py index 5bf3c79..2d45369 100644 --- a/plugins/dns.py +++ b/plugins/dns.py @@ -8,214 +8,118 @@ import dns.reversename import simplematrixbotlib as botlib import re -# Common DNS record types to query +from plugins.utils import is_public_destination + RECORD_TYPES = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME', 'SOA', 'PTR', 'SRV'] - def is_valid_domain(domain): - """ - Validate if the provided string is a valid domain name. - - Args: - domain (str): The domain to validate. - - Returns: - bool: True if valid, False otherwise. - """ - # Basic domain validation regex pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$' return re.match(pattern, domain) is not None - def format_dns_record(record_type, records): - """ - Format DNS records for display. - - Args: - record_type (str): The type of DNS record. - records (list): List of DNS record values. - - Returns: - str: Formatted HTML string. - """ if not records: return "" - output = f"{record_type} Records:
" for record in records: output += f" โ€ข {record}
" return output - async def query_dns_records(domain): - """ - Query all common DNS record types for a domain. - - Args: - domain (str): The domain to query. - - Returns: - dict: Dictionary with record types as keys and lists of records as values. - """ results = {} resolver = dns.resolver.Resolver() resolver.timeout = 5 resolver.lifetime = 5 - for record_type in RECORD_TYPES: try: logging.info(f"Querying {record_type} records for {domain}") answers = resolver.resolve(domain, record_type) - records = [] for rdata in answers: if record_type == 'MX': - # MX records have preference and exchange records.append(f"{rdata.preference} {rdata.exchange}") elif record_type == 'SOA': - # SOA records have multiple fields records.append(f"{rdata.mname} {rdata.rname}") elif record_type == 'SRV': - # SRV records have priority, weight, port, and target records.append(f"{rdata.priority} {rdata.weight} {rdata.port} {rdata.target}") elif record_type == 'TXT': - # TXT records can have multiple strings txt_data = ' '.join([s.decode() if isinstance(s, bytes) else str(s) for s in rdata.strings]) records.append(txt_data) else: records.append(str(rdata)) - if records: results[record_type] = records logging.info(f"Found {len(records)} {record_type} record(s)") - except dns.resolver.NoAnswer: - logging.debug(f"No {record_type} records found for {domain}") continue except dns.resolver.NXDOMAIN: logging.warning(f"Domain {domain} does not exist") return None except dns.resolver.Timeout: - logging.warning(f"Timeout querying {record_type} for {domain}") continue except Exception as e: logging.error(f"Error querying {record_type} for {domain}: {e}") continue - return results - async def handle_command(room, message, bot, prefix, config): - """ - Function to handle the !dns command. - - Args: - room (Room): The Matrix room where the command was invoked. - message (RoomMessage): The message object containing the command. - bot (Bot): The bot object. - prefix (str): The command prefix. - config (dict): Configuration parameters. - - Returns: - None - """ match = botlib.MessageMatch(room, message, bot, prefix) if match.is_not_from_this_bot() and match.prefix() and match.command("dns"): logging.info("Received !dns command") - args = match.args() - if len(args) != 1: - await bot.api.send_text_message( - room.room_id, - "Usage: !dns \nExample: !dns example.com" - ) - logging.info("Sent usage message for !dns") + await bot.api.send_text_message(room.room_id, + "Usage: !dns \nExample: !dns example.com") return - domain = args[0].lower().strip() - - # Remove protocol if present - domain = domain.replace('http://', '').replace('https://', '') - # Remove trailing slash if present - domain = domain.rstrip('/') - # Remove www. prefix if present (optional - you can keep it if you want) - # domain = domain.replace('www.', '') - - # Validate domain + domain = domain.replace('http://', '').replace('https://', '').rstrip('/') if not is_valid_domain(domain): - await bot.api.send_text_message( - room.room_id, - f"Invalid domain name: {domain}" - ) - logging.warning(f"Invalid domain provided: {domain}") + await bot.api.send_text_message(room.room_id, f"Invalid domain name: {domain}") return - try: - logging.info(f"Starting DNS reconnaissance for {domain}") - - # Send "working on it" message for longer queries - await bot.api.send_text_message( - room.room_id, - f"๐Ÿ” Performing DNS reconnaissance on {domain}..." - ) - - # Query DNS records + await bot.api.send_text_message(room.room_id, + f"๐Ÿ” Performing DNS reconnaissance on {domain}...") results = await query_dns_records(domain) - if results is None: - await bot.api.send_text_message( - room.room_id, - f"Domain {domain} does not exist (NXDOMAIN)" - ) + await bot.api.send_text_message(room.room_id, + f"Domain {domain} does not exist (NXDOMAIN)") return - if not results: - await bot.api.send_text_message( - room.room_id, - f"No DNS records found for {domain}" - ) + await bot.api.send_text_message(room.room_id, + f"No DNS records found for {domain}") + return + # SSRF / privacy check: if all A/AAAA records are private, refuse. + a_records = results.get('A', []) + aaaa_records = results.get('AAAA', []) + all_ips = a_records + aaaa_records + if all_ips and not any(is_public_destination(ip) for ip in all_ips): + await bot.api.send_text_message(room.room_id, + "โŒ This domain resolves exclusively to private/internal IPs.") return - - # Format the output output = f"๐Ÿ” DNS Records for {domain}

" - - # Order the records in a logical way preferred_order = ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'TXT', 'SOA', 'SRV', 'PTR'] - for record_type in preferred_order: if record_type in results: output += format_dns_record(record_type, results[record_type]) output += "
" - - # Add any remaining record types not in preferred order for record_type in results: if record_type not in preferred_order: output += format_dns_record(record_type, results[record_type]) output += "
" - - # Wrap in collapsible details if output is large if output.count('
') > 15: output = f"
๐Ÿ” DNS Records for {domain}{output}
" - await bot.api.send_markdown_message(room.room_id, output) logging.info(f"Sent DNS records for {domain}") - except Exception as e: - await bot.api.send_text_message( - room.room_id, - f"An error occurred while performing DNS lookup: {str(e)}" - ) + await bot.api.send_text_message(room.room_id, + f"An error occurred while performing DNS lookup: {str(e)}") logging.error(f"Error in DNS plugin for {domain}: {e}", exc_info=True) - # --------------------------------------------------------------------------- # Plugin Metadata # --------------------------------------------------------------------------- - -__version__ = "1.0.0" +__version__ = "1.0.1" __author__ = "Funguy Bot" -__description__ = "DNS reconnaissance" +__description__ = "DNS reconnaissance (SSRFโ€‘safe)" __help__ = """
!dns โ€“ DNS reconnaissance diff --git a/plugins/geo.py b/plugins/geo.py index 210eb9b..afae3c5 100644 --- a/plugins/geo.py +++ b/plugins/geo.py @@ -9,70 +9,37 @@ import simplematrixbotlib as botlib import socket import re +from plugins.utils import is_public_destination + async def is_valid_ip(ip): - """ - Check if the provided string is a valid IP address. - - Args: - ip (str): The IP address to validate. - - Returns: - bool: True if valid IP, False otherwise. - """ + """Check if the provided string is a valid IP address.""" try: - # Check for IPv4 socket.inet_pton(socket.AF_INET, ip) return True except socket.error: try: - # Check for IPv6 socket.inet_pton(socket.AF_INET6, ip) return True except socket.error: return False def is_domain(domain): - """ - Check if the provided string is a domain name. - - Args: - domain (str): The string to check. - - Returns: - bool: True if it's a domain, False otherwise. - """ + """Check if the provided string is a domain name.""" domain_pattern = re.compile( r'^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$' ) return bool(domain_pattern.match(domain)) async def resolve_domain(domain): - """ - Resolve a domain name to an IP address. - - Args: - domain (str): The domain to resolve. - - Returns: - str: The resolved IP address or None. - """ + """Resolve a domain name to an IP address.""" try: return socket.gethostbyname(domain) except socket.gaierror: return None async def query_ip_api_com(ip): - """ - Query ip-api.com for geolocation information. - - Args: - ip (str): The IP address to geolocate. - - Returns: - dict: Geolocation data or None if error. - """ + """Query ip-api.com for geolocation information.""" url = f"http://ip-api.com/json/{ip}" - try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: @@ -87,17 +54,8 @@ async def query_ip_api_com(ip): return None async def query_ipapi_co(ip): - """ - Query ipapi.co for geolocation information (fallback). - - Args: - ip (str): The IP address to geolocate. - - Returns: - dict: Geolocation data or None if error. - """ + """Query ipapi.co for geolocation information (fallback).""" url = f"https://ipapi.co/{ip}/json/" - try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: @@ -112,45 +70,20 @@ async def query_ipapi_co(ip): return None async def query_geolocation(ip): - """ - Query geolocation information using primary and fallback APIs. - - Args: - ip (str): The IP address to geolocate. - - Returns: - dict: Geolocation data or None if error. - """ - # Try primary API first + """Query geolocation information using primary and fallback APIs.""" data = await query_ip_api_com(ip) - - # If primary API fails, try fallback API if not data or data.get('status') == 'fail': logging.info("Primary API failed, trying fallback API") data = await query_ipapi_co(ip) - return data async def format_geolocation_results(ip, data): - """ - Format geolocation results into a readable message. - - Args: - ip (str): The queried IP address - data (dict): Geolocation data - - Returns: - str: Formatted message - """ + """Format geolocation results into a readable message.""" if not data: return f"๐Ÿ” No geolocation data found for {ip}." - - # Check if data is from ip-api.com or ipapi.co and format accordingly if 'status' in data and data.get('status') == 'fail': return f"๐Ÿ” No geolocation data found for {ip}." - - # Extract relevant information based on API used - if 'country' in data: # ip-api.com format + if 'country' in data: country = data.get('country', 'N/A') country_code = data.get('countryCode', 'N/A') region = data.get('regionName', data.get('region', 'N/A')) @@ -162,7 +95,7 @@ async def format_geolocation_results(ip, data): isp = data.get('isp', 'N/A') org = data.get('org', 'N/A') asn = data.get('as', 'N/A') - else: # ipapi.co format + else: country = data.get('country_name', data.get('country', 'N/A')) country_code = data.get('country_code', data.get('countryCode', 'N/A')) region = data.get('region', 'N/A') @@ -174,8 +107,6 @@ async def format_geolocation_results(ip, data): isp = data.get('org', 'N/A') org = data.get('org', 'N/A') asn = data.get('asn', 'N/A') - - # Create collapsible content content = f"๐Ÿ” IP Geolocation Results for {ip}

" content += f"Country: {country} ({country_code})
" content += f"Region: {region}
" @@ -185,95 +116,64 @@ async def format_geolocation_results(ip, data): content += f"Timezone: {timezone}
" content += f"ISP/Organization: {isp}
" content += f"ASN: {asn}
" - - # Wrap in details tag for Matrix compatibility message = f"
๐Ÿ” Geolocation: {ip}{content}
" - return message async def handle_command(room, message, bot, prefix, config): - """ - Function to handle the !geo command. - - Args: - room (Room): The Matrix room where the command was invoked. - message (RoomMessage): The message object containing the command. - bot (Bot): The bot object. - prefix (str): The command prefix. - config (dict): Configuration parameters. - - Returns: - None - """ + """Handle the !geo command.""" match = botlib.MessageMatch(room, message, bot, prefix) - if match.is_not_from_this_bot() and match.prefix() and match.command("geo"): args = match.args() - if len(args) < 1: await bot.api.send_text_message( room.room_id, "Usage: !geo \nExample: !geo 8.8.8.8\nExample: !geo example.com" ) return - query = args[0].strip() logging.info(f"Received !geo command for: {query}") - try: - # Determine if input is IP or domain ip = query if is_domain(query): - # Resolve domain to IP first await bot.api.send_text_message( room.room_id, f"๐Ÿ” Resolving domain {query} to IP address..." ) ip = await resolve_domain(query) if not ip: - await bot.api.send_text_message( - room.room_id, - f"Failed to resolve domain {query} to IP address." - ) + await bot.api.send_text_message(room.room_id, + f"Failed to resolve domain {query} to IP address.") return - await bot.api.send_text_message( - room.room_id, - f"Domain {query} resolved to IP {ip}" - ) + if not is_public_destination(ip): + await bot.api.send_text_message(room.room_id, + "โŒ That domain resolves to a private/internal IP, geo not allowed.") + return + await bot.api.send_text_message(room.room_id, + f"Domain {query} resolved to IP {ip}") elif not await is_valid_ip(query): - await bot.api.send_text_message( - room.room_id, - f"Invalid IP address or domain format: {query}" - ) + await bot.api.send_text_message(room.room_id, + f"Invalid IP address or domain format: {query}") return - - # Notify user that we're starting the lookup - await bot.api.send_text_message( - room.room_id, - f"๐Ÿ” Looking up geolocation for {ip}..." - ) - - # Query geolocation data with fallback + else: + if not is_public_destination(ip): + await bot.api.send_text_message(room.room_id, + "โŒ Geolocation of private IP addresses is not allowed.") + return + await bot.api.send_text_message(room.room_id, + f"๐Ÿ” Looking up geolocation for {ip}...") geo_data = await query_geolocation(ip) - - # Format and send results result_message = await format_geolocation_results(ip, geo_data) await bot.api.send_markdown_message(room.room_id, result_message) logging.info(f"Successfully sent geolocation results for {ip}") - except Exception as e: - await bot.api.send_text_message( - room.room_id, - f"An error occurred during geolocation lookup for {query}. Please try again later." - ) + await bot.api.send_text_message(room.room_id, + f"An error occurred during geolocation lookup for {query}. Please try again later.") logging.error(f"Error in geo plugin for {query}: {e}", exc_info=True) - # --------------------------------------------------------------------------- # Plugin Metadata # --------------------------------------------------------------------------- - -__version__ = "1.0.0" +__version__ = "1.0.1" __author__ = "Funguy Bot" __description__ = "IP geolocation lookup" __help__ = """ diff --git a/plugins/headers.py b/plugins/headers.py index 7536af0..2307ef0 100644 --- a/plugins/headers.py +++ b/plugins/headers.py @@ -9,6 +9,8 @@ from urllib.parse import urlparse import ssl import socket +from plugins.utils import is_public_destination + async def handle_command(room, message, bot, prefix, config): """ Function to handle !headers command for HTTP security header analysis. @@ -39,6 +41,14 @@ async def handle_command(room, message, bot, prefix, config): if not url.startswith(('http://', 'https://')): url = 'https://' + url + # SSRF protection: refuse internal hosts + parsed = urlparse(url) + host = parsed.hostname + if not is_public_destination(host): + await bot.api.send_text_message(room.room_id, + "โŒ Scanning of private/internal addresses is not allowed.") + return + await analyze_headers(room, bot, url) async def show_usage(room, bot): @@ -391,7 +401,7 @@ async def format_header_analysis(results): # Plugin Metadata # --------------------------------------------------------------------------- -__version__ = "1.0.0" +__version__ = "1.0.1" __author__ = "Funguy Bot" __description__ = "HTTP security header analysis" __help__ = """ diff --git a/plugins/isup.py b/plugins/isup.py index 18df6a3..23412ae 100644 --- a/plugins/isup.py +++ b/plugins/isup.py @@ -2,23 +2,15 @@ This plugin provides a command to check if a website or server is up. """ -# plugins/isup.py - import logging import aiohttp import socket import simplematrixbotlib as botlib +from plugins.utils import is_public_destination + async def check_http(domain): - """ - Check if HTTP service is up for the given domain. - - Args: - domain (str): The target domain. - - Returns: - bool: True if HTTP service is up, False otherwise. - """ + """Check if HTTP service is up for the given domain.""" try: async with aiohttp.ClientSession() as session: async with session.get(f"http://{domain}") as response: @@ -27,15 +19,7 @@ async def check_http(domain): return False async def check_https(domain): - """ - Check if HTTPS service is up for the given domain. - - Args: - domain (str): The target domain. - - Returns: - bool: True if HTTPS service is up, False otherwise. - """ + """Check if HTTPS service is up for the given domain.""" try: async with aiohttp.ClientSession() as session: async with session.get(f"https://{domain}") as response: @@ -44,67 +28,38 @@ async def check_https(domain): return False async def handle_command(room, message, bot, prefix, config): - """ - Function to handle the !isup command. - - Args: - room (Room): The Matrix room where the command was invoked. - message (RoomMessage): The message object containing the command. - bot (Bot): The bot instance. - prefix (str): The bot command prefix. - config (FunguyConfig): The bot configuration instance. - - Returns: - None - """ - # Check if the message matches the command pattern and is not from this bot + """Handle the !isup command.""" match = botlib.MessageMatch(room, message, bot, prefix) if match.is_not_from_this_bot() and match.prefix() and match.command("isup"): - # Log that the !isup command has been received logging.info("Received !isup command") args = match.args() - # Check if the command has exactly one argument if len(args) != 1: - # If the command does not have exactly one argument, send usage message await bot.api.send_markdown_message(room.room_id, "Usage: !isup ") - logging.info("Sent usage message to the room") return - target = args[0] - - # Perform DNS resolution try: ip_address = socket.gethostbyname(target) - # Log successful DNS resolution - logging.info(f"DNS resolution successful for {target}: {ip_address}") - # Send DNS resolution success message - await bot.api.send_markdown_message(room.room_id, f"โœ… DNS resolution successful for **{target}**: **{ip_address}** (A record)") except socket.gaierror: - # Log DNS resolution failure logging.info(f"DNS resolution failed for {target}") - # Send DNS resolution failure message await bot.api.send_markdown_message(room.room_id, f"โŒ DNS resolution failed for **{target}**") return - - # Check HTTP/HTTPS services + if not is_public_destination(ip_address): + await bot.api.send_text_message(room.room_id, + "โŒ Checking internal/private IPs is not allowed.") + return + await bot.api.send_markdown_message(room.room_id, + f"โœ… DNS resolution successful for **{target}**: **{ip_address}** (A record)") if await check_http(target): - # If HTTP service is up, send HTTP service up message await bot.api.send_markdown_message(room.room_id, f"๐Ÿ–ง **{target}** HTTP service is up") - logging.info(f"{target} HTTP service is up") elif await check_https(target): - # If HTTPS service is up, send HTTPS service up message await bot.api.send_markdown_message(room.room_id, f"๐Ÿ–ง **{target}** HTTPS service is up") - logging.info(f"{target} HTTPS service is up") else: - # If both HTTP and HTTPS services are down, send service down message await bot.api.send_markdown_message(room.room_id, f"๐Ÿ˜• **{target}** HTTP/HTTPS services are down") - logging.info(f"{target} HTTP/HTTPS services are down") # --------------------------------------------------------------------------- # Plugin Metadata # --------------------------------------------------------------------------- - -__version__ = "1.0.0" +__version__ = "1.0.1" __author__ = "Funguy Bot" __description__ = "Check if a site is up" __help__ = """ diff --git a/plugins/proxy.py b/plugins/proxy.py index d611813..bd2623b 100644 --- a/plugins/proxy.py +++ b/plugins/proxy.py @@ -2,8 +2,6 @@ This plugin provides a command to get random SOCKS5 proxies. """ -# plugins/proxy.py - import os import logging import random @@ -14,10 +12,11 @@ from datetime import datetime, timedelta import concurrent.futures import simplematrixbotlib as botlib import sqlite3 +import ipaddress + +from plugins.utils import is_public_destination -# Constants SOCKS5_LIST_URL = 'https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks5.txt' -# SOCKS5_LIST_URL = 'https://raw.githubusercontent.com/proxifly/free-proxy-list/main/proxies/protocols/socks5/data.txt' MAX_TRIES = 64 PROXY_LIST_FILENAME = 'socks5.txt' PROXY_LIST_EXPIRATION = timedelta(hours=8) @@ -25,19 +24,10 @@ MAX_THREADS = 128 PROXIES_DB_FILE = 'proxies.db' MAX_PROXIES_IN_DB = 10 -# Setup verbose logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def test_proxy(proxy): - """ - Test a SOCKS5 proxy and return the outcome. - - Args: - proxy (str): The SOCKS5 proxy address in the format 'ip:port'. - - Returns: - tuple: (bool: success, str: proxy, int: latency) - """ + """Test a SOCKS5 proxy and return the outcome.""" try: ip, port = proxy.split(':') logging.info(f"Testing SOCKS5 proxy: {ip}:{port}") @@ -53,14 +43,7 @@ def test_proxy(proxy): except Exception as e: return False, proxy, None - async def download_proxy_list(): - """ - Download the SOCKS5 proxy list file if it doesn't already exist or if it's expired. - - Returns: - bool: True if the proxy list is downloaded or up-to-date, False otherwise. - """ try: if not os.path.exists(PROXY_LIST_FILENAME) or \ datetime.now() - datetime.fromtimestamp(os.path.getctime(PROXY_LIST_FILENAME)) > PROXY_LIST_EXPIRATION: @@ -77,15 +60,7 @@ async def download_proxy_list(): logging.error(f"Error downloading proxy list: {e}") return False - def check_db_for_proxy(): - """ - Check the proxies database for a working proxy. - If found, test the proxy and remove it from the database if it doesn't work. - - Returns: - str or None: The working proxy if found, None otherwise. - """ try: with sqlite3.connect(PROXIES_DB_FILE) as conn: cursor = conn.cursor() @@ -115,15 +90,7 @@ def check_db_for_proxy(): logging.error(f"Error checking proxies database: {e}") return None, None - def save_proxy_to_db(proxy, latency): - """ - Save a working proxy to the proxies database. - - Args: - proxy (str): The working proxy to be saved. - latency (int): Latency of the proxy. - """ try: with sqlite3.connect(PROXIES_DB_FILE) as conn: cursor = conn.cursor() @@ -140,44 +107,25 @@ def save_proxy_to_db(proxy, latency): except Exception as e: logging.error(f"Error saving proxy to database: {e}") - async def handle_command(room, message, bot, prefix, config): - """ - Function to handle the !proxy command. - - Args: - room (Room): The Matrix room where the command was invoked. - message (RoomMessage): The message object containing the command. - - Returns: - None - """ match = botlib.MessageMatch(room, message, bot, prefix) if match.is_not_from_this_bot() and match.prefix() and match.command("proxy"): logging.info("Received !proxy command") - - # Check database for a working proxy working_proxy, latency = check_db_for_proxy() if working_proxy: await bot.api.send_markdown_message(room.room_id, f"โœ… Using cached working SOCKS5 Proxy: **{working_proxy}** - Latency: **{latency} ms**") - logging.info(f"Using cached working SOCKS5 proxy {working_proxy}") return - - # Download proxy list if needed else: if not await download_proxy_list(): await bot.api.send_markdown_message(room.room_id, "Error downloading proxy list") - logging.error("Error downloading proxy list") return - try: - # Read proxies from file with open(PROXY_LIST_FILENAME, 'r') as f: socks5_proxies = [line.replace("socks5://", "") for line in f.read().splitlines()] + # Filter out private/internal proxies before testing + socks5_proxies = [p for p in socks5_proxies if is_public_destination(p.split(':')[0])] random.shuffle(socks5_proxies) - - # Test proxies concurrently tested_proxies = 0 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: futures = [] @@ -188,36 +136,26 @@ async def handle_command(room, message, bot, prefix, config): if success: await bot.api.send_markdown_message(room.room_id, f"โœ… Anonymous SOCKS5 Proxy: **{proxy}** - Latency: **{latency} ms**") - logging.info(f"Sent SOCKS5 proxy {proxy} to the room") - save_proxy_to_db(proxy, latency) # Save working proxy to the database + save_proxy_to_db(proxy, latency) tested_proxies += 1 if tested_proxies >= MAX_PROXIES_IN_DB: - break # Stop testing proxies once MAX_PROXIES_IN_DB are saved to the database - - # Check database for a working proxy after testing + break working_proxy, latency = check_db_for_proxy() if working_proxy: await bot.api.send_markdown_message(room.room_id, f"โœ… Using cached working SOCKS5 Proxy: **{working_proxy}** - Latency: **{latency} ms**") - logging.info(f"Using cached working SOCKS5 proxy {working_proxy}") - else: - # If no working proxy found after testing await bot.api.send_markdown_message(room.room_id, "โŒ No working anonymous SOCKS5 proxy found") - logging.info("No working anonymous SOCKS5 proxy found") - except Exception as e: logging.error(f"Error handling !proxy command: {e}") await bot.api.send_markdown_message(room.room_id, "โŒ Error handling !proxy command") - # --------------------------------------------------------------------------- # Plugin Metadata # --------------------------------------------------------------------------- - -__version__ = "1.0.0" +__version__ = "1.0.1" __author__ = "Funguy Bot" -__description__ = "Working SOCKS5 proxy finder" +__description__ = "Working SOCKS5 proxy finder (SSRFโ€‘safe)" __help__ = """
!proxy โ€“ Random working SOCKS5 proxy diff --git a/plugins/sslscan.py b/plugins/sslscan.py index c0b1bc1..7312d75 100644 --- a/plugins/sslscan.py +++ b/plugins/sslscan.py @@ -11,6 +11,8 @@ import re import simplematrixbotlib as botlib from urllib.parse import urlparse +from plugins.utils import is_public_destination + # SSL/TLS configuration - handle missing protocols in modern Python TLS_VERSIONS = { 'TLSv1.2': ssl.PROTOCOL_TLSv1_2, @@ -94,6 +96,14 @@ async def handle_command(room, message, bot, prefix, config): await bot.api.send_text_message(room.room_id, "Invalid port number") return + # SSRF protection: refuse internal hosts + if not is_public_destination(target): + await bot.api.send_text_message( + room.room_id, + "โŒ Scanning of private/internal addresses is not allowed." + ) + return + await perform_ssl_scan(room, bot, target, port) async def show_usage(room, bot): @@ -334,7 +344,7 @@ async def check_vulnerabilities(scan_results): """Check for common SSL/TLS vulnerabilities.""" vulnerabilities = [] - # Check for weak protocols (these will be False in modern Python, which is good) + # Check for weak protocols if scan_results['protocols'].get('SSLv2', False): vulnerabilities.append({ 'name': 'SSLv2 Support', @@ -400,7 +410,7 @@ async def calculate_security_score(scan_results): """Calculate overall security score.""" score = 100 - # Protocol penalties (in modern Python, SSLv2/SSLv3 will be False, which is good) + # Protocol penalties if scan_results['protocols'].get('SSLv2', False): score -= 30 if scan_results['protocols'].get('SSLv3', False): @@ -598,9 +608,9 @@ def format_cert_date(date_str): # Plugin Metadata # --------------------------------------------------------------------------- -__version__ = "1.0.0" +__version__ = "1.0.1" __author__ = "Funguy Bot" -__description__ = "SSL/TLS security scanner" +__description__ = "SSL/TLS security scanner (SSRFโ€‘safe)" __help__ = """
!sslscan โ€“ SSL/TLS analysis diff --git a/plugins/utils.py b/plugins/utils.py new file mode 100644 index 0000000..efad10e --- /dev/null +++ b/plugins/utils.py @@ -0,0 +1,59 @@ +""" +Security utilities for Funguy Bot plugins. +""" + +import ipaddress +import socket +import logging + +logger = logging.getLogger("security_utils") + +# Networks considered unsafe for outbound connections +PRIVATE_RANGES = [ + ipaddress.ip_network('10.0.0.0/8'), + ipaddress.ip_network('172.16.0.0/12'), + ipaddress.ip_network('192.168.0.0/16'), + ipaddress.ip_network('127.0.0.0/8'), + ipaddress.ip_network('169.254.0.0/16'), # linkโ€‘local + ipaddress.ip_network('0.0.0.0/8'), # "this" network + ipaddress.ip_network('::1/128'), # IPv6 loopback + ipaddress.ip_network('fc00::/7'), # unique local + ipaddress.ip_network('fe80::/10'), # linkโ€‘local + ipaddress.ip_network('::/128'), # unspecified +] + +def is_public_destination(target: str) -> bool: + """ + Returns True if `target` (hostname or IP) does NOT resolve to any + private, loopback, or linkโ€‘local address. + """ + try: + # Try parsing as an IP address first + addr = ipaddress.ip_address(target) + if any(addr in net for net in PRIVATE_RANGES): + return False + return True + except ValueError: + pass + + # Resolve hostname to IPs + try: + addrinfo = socket.getaddrinfo(target, None) + for _, _, _, _, sockaddr in addrinfo: + ip = sockaddr[0] + addr = ipaddress.ip_address(ip) + if any(addr in net for net in PRIVATE_RANGES): + return False + return True + except Exception as e: + logger.warning(f"Cannot resolve {target}: {e}") + return False + + +# --------------------------------------------------------------------------- +# Noโ€‘op command handler โ€“ prevents bot crash because funguy.py calls +# handle_command() on every module in the plugins directory. +# --------------------------------------------------------------------------- +async def handle_command(room, message, bot, prefix, config): + """This module is not a command plugin; ignore all messages.""" + pass diff --git a/roomstats.db b/roomstats.db deleted file mode 100644 index 3fc87a6..0000000 Binary files a/roomstats.db and /dev/null differ