""" This plugin provides comprehensive SSL/TLS security scanning and analysis. """ import logging import socket import ssl import OpenSSL import datetime import re import simplematrixbotlib as botlib from urllib.parse import urlparse # SSL/TLS configuration - handle missing protocols in modern Python TLS_VERSIONS = { 'TLSv1.2': ssl.PROTOCOL_TLSv1_2, 'TLSv1.3': ssl.PROTOCOL_TLS } # Try to add older protocols if available (they're removed in modern Python) try: TLS_VERSIONS['TLSv1.1'] = ssl.PROTOCOL_TLSv1_1 except AttributeError: pass try: TLS_VERSIONS['TLSv1'] = ssl.PROTOCOL_TLSv1 except AttributeError: pass # Cipher suites by strength and category CIPHER_CATEGORIES = { 'STRONG': [ 'TLS_AES_256_GCM_SHA384', 'TLS_CHACHA20_POLY1305_SHA256', 'TLS_AES_128_GCM_SHA256', 'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-AES256-GCM-SHA384', 'ECDHE-RSA-CHACHA20-POLY1305', 'ECDHE-ECDSA-CHACHA20-POLY1305', 'DHE-RSA-AES256-GCM-SHA384' ], 'WEAK': [ 'RC4', 'DES', '3DES', 'MD5', 'EXPORT', 'NULL', 'ANON', 'ADH', 'CBC' ], 'OBSOLETE': [ 'SSLv2', 'SSLv3' ] } async def handle_command(room, message, bot, prefix, config): """ Function to handle !sslscan command for comprehensive SSL/TLS analysis. Args: room (Room): The Matrix room where the command was invoked. message (RoomMessage): The message object containing the command. bot (Bot): The bot object. prefix (str): The command prefix. config (dict): Configuration parameters. Returns: None """ match = botlib.MessageMatch(room, message, bot, prefix) if match.is_not_from_this_bot() and match.prefix() and match.command("sslscan"): logging.info("Received !sslscan command") args = match.args() if len(args) < 1: await show_usage(room, bot) return target = args[0].strip() port = 443 # Parse port if provided if ':' in target: parts = target.split(':') target = parts[0] try: port = int(parts[1]) except ValueError: await bot.api.send_text_message(room.room_id, "Invalid port number") return await perform_ssl_scan(room, bot, target, port) async def show_usage(room, bot): """Display sslscan command usage.""" usage = """ 🔐 SSL/TLS Security Scanner !sslscan <domain[:port]> - Comprehensive SSL/TLS security analysis Examples: â€ĸ !sslscan example.com â€ĸ !sslscan github.com:443 â€ĸ !sslscan localhost:8443 Tests Performed: â€ĸ SSL/TLS protocol support and versions â€ĸ Certificate chain validation and expiration â€ĸ Cipher suite strength and configuration â€ĸ Security headers and extensions â€ĸ Handshake simulation and vulnerabilities â€ĸ PCI DSS compliance checking â€ĸ SSL/TLS best practices assessment """ await bot.api.send_markdown_message(room.room_id, usage) async def perform_ssl_scan(room, bot, target, port): """Perform comprehensive SSL/TLS security scan.""" try: await bot.api.send_text_message(room.room_id, f"🔍 Starting comprehensive SSL/TLS scan for {target}:{port}...") scan_results = { 'target': target, 'port': port, 'certificate': {}, 'protocols': {}, 'ciphers': {}, 'vulnerabilities': [], 'recommendations': [], 'security_score': 0 } # Test basic connectivity if not await test_connectivity(target, port): await bot.api.send_text_message(room.room_id, f"❌ Cannot connect to {target}:{port}") return # Perform comprehensive tests await get_certificate_info(scan_results, target, port) await test_protocol_support(scan_results, target, port) await test_cipher_suites(scan_results, target, port) await check_vulnerabilities(scan_results) await calculate_security_score(scan_results) await generate_recommendations(scan_results) # Format and send results output = await format_ssl_scan_results(scan_results) await bot.api.send_markdown_message(room.room_id, output) logging.info(f"Completed SSL scan for {target}:{port}") except Exception as e: await bot.api.send_text_message(room.room_id, f"Error performing SSL scan: {str(e)}") logging.error(f"Error in perform_ssl_scan: {e}") async def test_connectivity(target, port): """Test basic connectivity to the target.""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(10) result = sock.connect_ex((target, port)) sock.close() return result == 0 except: return False async def get_certificate_info(scan_results, target, port): """Get comprehensive certificate information.""" try: context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE with socket.create_connection((target, port), timeout=10) as sock: with context.wrap_socket(sock, server_hostname=target) as ssock: cert_bin = ssock.getpeercert(binary_form=True) cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bin) # Basic certificate info subject = cert.get_subject() issuer = cert.get_issuer() scan_results['certificate'] = { 'subject': { 'common_name': subject.CN, 'organization': subject.O, 'organizational_unit': subject.OU, 'country': subject.C, 'state': subject.ST, 'locality': subject.L }, 'issuer': { 'common_name': issuer.CN, 'organization': issuer.O, 'organizational_unit': issuer.OU }, 'serial_number': cert.get_serial_number(), 'version': cert.get_version(), 'not_before': cert.get_notBefore().decode('utf-8'), 'not_after': cert.get_notAfter().decode('utf-8'), 'signature_algorithm': cert.get_signature_algorithm().decode('utf-8'), 'extensions': [] } # Parse extensions for i in range(cert.get_extension_count()): ext = cert.get_extension(i) scan_results['certificate']['extensions'].append({ 'name': ext.get_short_name().decode('utf-8'), 'value': str(ext) }) # Calculate days until expiration not_after = datetime.datetime.strptime(scan_results['certificate']['not_after'], '%Y%m%d%H%M%SZ') days_until_expiry = (not_after - datetime.datetime.utcnow()).days scan_results['certificate']['days_until_expiry'] = days_until_expiry except Exception as e: scan_results['certificate_error'] = str(e) async def test_protocol_support(scan_results, target, port): """Test support for various SSL/TLS protocols.""" protocols = { 'SSLv2': False, 'SSLv3': False, 'TLSv1': False, 'TLSv1.1': False, 'TLSv1.2': False, 'TLSv1.3': False } # Test available protocols for protocol_name in protocols.keys(): try: if protocol_name in TLS_VERSIONS: context = ssl.SSLContext(TLS_VERSIONS[protocol_name]) else: # For protocols not available in this Python version, assume False protocols[protocol_name] = False continue context.check_hostname = False context.verify_mode = ssl.CERT_NONE with socket.create_connection((target, port), timeout=5) as sock: with context.wrap_socket(sock, server_hostname=target) as ssock: protocols[protocol_name] = True # Get negotiated protocol if hasattr(ssock, 'version'): scan_results['negotiated_protocol'] = ssock.version() except: protocols[protocol_name] = False scan_results['protocols'] = protocols async def test_cipher_suites(scan_results, target, port): """Test supported cipher suites.""" try: context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE # Get default cipher suites context.set_ciphers('ALL:COMPLEMENTOFALL') with socket.create_connection((target, port), timeout=10) as sock: with context.wrap_socket(sock, server_hostname=target) as ssock: cipher = ssock.cipher() scan_results['ciphers'] = { 'negotiated_cipher': cipher[0] if cipher else 'Unknown', 'supported_ciphers': await get_supported_ciphers(target, port), 'weak_ciphers': [], 'strong_ciphers': [] } except Exception as e: scan_results['cipher_error'] = str(e) async def get_supported_ciphers(target, port): """Get list of supported cipher suites.""" supported_ciphers = [] # Test common cipher suites test_ciphers = [ 'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-AES256-GCM-SHA384', 'ECDHE-RSA-AES256-SHA384', 'ECDHE-ECDSA-AES256-SHA384', 'ECDHE-RSA-AES256-SHA', 'ECDHE-ECDSA-AES256-SHA', 'AES256-GCM-SHA384', 'AES256-SHA256', 'AES256-SHA', 'CAMELLIA256-SHA', 'PSK-AES256-CBC-SHA', 'ECDHE-RSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES128-GCM-SHA256', 'ECDHE-RSA-AES128-SHA256', 'ECDHE-ECDSA-AES128-SHA256', 'ECDHE-RSA-AES128-SHA', 'ECDHE-ECDSA-AES128-SHA', 'AES128-GCM-SHA256', 'AES128-SHA256', 'AES128-SHA', 'CAMELLIA128-SHA', 'PSK-AES128-CBC-SHA', 'DES-CBC3-SHA', 'RC4-SHA', 'RC4-MD5' ] for cipher in test_ciphers: try: context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE context.set_ciphers(cipher) with socket.create_connection((target, port), timeout=5) as sock: with context.wrap_socket(sock, server_hostname=target) as ssock: if ssock.cipher(): supported_ciphers.append(cipher) except: pass return supported_ciphers async def check_vulnerabilities(scan_results): """Check for common SSL/TLS vulnerabilities.""" vulnerabilities = [] # Check for weak protocols (these will be False in modern Python, which is good) if scan_results['protocols'].get('SSLv2', False): vulnerabilities.append({ 'name': 'SSLv2 Support', 'severity': 'CRITICAL', 'description': 'SSLv2 is obsolete and contains critical vulnerabilities', 'cve': 'Multiple CVEs' }) if scan_results['protocols'].get('SSLv3', False): vulnerabilities.append({ 'name': 'SSLv3 Support', 'severity': 'HIGH', 'description': 'SSLv3 is vulnerable to POODLE attack', 'cve': 'CVE-2014-3566' }) # Check certificate expiration cert = scan_results.get('certificate', {}) if cert.get('days_until_expiry', 0) < 30: vulnerabilities.append({ 'name': 'Certificate Expiring Soon', 'severity': 'MEDIUM', 'description': f"Certificate expires in {cert['days_until_expiry']} days", 'cve': 'N/A' }) # Check for weak ciphers supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', []) weak_ciphers_found = [] for cipher in supported_ciphers: if any(weak in cipher.upper() for weak in CIPHER_CATEGORIES['WEAK']): weak_ciphers_found.append(cipher) if weak_ciphers_found: vulnerabilities.append({ 'name': 'Weak Cipher Suites', 'severity': 'HIGH', 'description': f'Weak ciphers supported: {", ".join(weak_ciphers_found[:3])}', 'cve': 'Multiple CVEs' }) # Check for missing modern protocols if not scan_results['protocols'].get('TLSv1.2', False): vulnerabilities.append({ 'name': 'TLS 1.2 Not Supported', 'severity': 'HIGH', 'description': 'TLS 1.2 is required for modern security', 'cve': 'N/A' }) if not scan_results['protocols'].get('TLSv1.3', False): vulnerabilities.append({ 'name': 'TLS 1.3 Not Supported', 'severity': 'MEDIUM', 'description': 'TLS 1.3 provides improved security and performance', 'cve': 'N/A' }) scan_results['vulnerabilities'] = vulnerabilities async def calculate_security_score(scan_results): """Calculate overall security score.""" score = 100 # Protocol penalties (in modern Python, SSLv2/SSLv3 will be False, which is good) if scan_results['protocols'].get('SSLv2', False): score -= 30 if scan_results['protocols'].get('SSLv3', False): score -= 20 if not scan_results['protocols'].get('TLSv1.2', False): score -= 15 if not scan_results['protocols'].get('TLSv1.3', False): score -= 10 # Certificate penalties cert = scan_results.get('certificate', {}) if cert.get('days_until_expiry', 0) < 30: score -= 10 if cert.get('days_until_expiry', 0) < 7: score -= 20 # Cipher penalties supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', []) weak_cipher_count = sum(1 for cipher in supported_ciphers if any(weak in cipher.upper() for weak in CIPHER_CATEGORIES['WEAK'])) score -= min(weak_cipher_count * 5, 25) # Vulnerability penalties for vuln in scan_results.get('vulnerabilities', []): if vuln['severity'] == 'CRITICAL': score -= 20 elif vuln['severity'] == 'HIGH': score -= 15 elif vuln['severity'] == 'MEDIUM': score -= 10 elif vuln['severity'] == 'LOW': score -= 5 scan_results['security_score'] = max(0, score) async def generate_recommendations(scan_results): """Generate security recommendations.""" recommendations = [] # Protocol recommendations if scan_results['protocols'].get('SSLv2', False): recommendations.append("🔴 IMMEDIATELY disable SSLv2 - critically vulnerable") if scan_results['protocols'].get('SSLv3', False): recommendations.append("🔴 Disable SSLv3 - vulnerable to POODLE attack") if not scan_results['protocols'].get('TLSv1.3', False): recommendations.append("🟡 Enable TLSv1.3 for best security and performance") # Certificate recommendations cert = scan_results.get('certificate', {}) if cert.get('days_until_expiry', 0) < 30: recommendations.append("🟡 Renew SSL certificate - expiring soon") # Cipher recommendations supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', []) weak_ciphers = [c for c in supported_ciphers if any(weak in c.upper() for weak in CIPHER_CATEGORIES['WEAK'])] if weak_ciphers: recommendations.append("🔴 Remove weak cipher suites (RC4, DES, 3DES, NULL)") # General recommendations if scan_results['security_score'] < 80: recommendations.append("đŸ›Ąī¸ Implement modern TLS configuration following Mozilla guidelines") if not any('ECDHE' in c for c in supported_ciphers): recommendations.append("🟡 Enable Forward Secrecy with ECDHE cipher suites") # Add note about Python version limitations recommendations.append("â„šī¸ Note: SSLv2/SSLv3 testing limited by Python security features") scan_results['recommendations'] = recommendations async def format_ssl_scan_results(scan_results): """Format comprehensive SSL scan results.""" output = f"🔐 SSL/TLS Security Scan: {scan_results['target']}:{scan_results['port']}

" # Security Score score = scan_results['security_score'] if score >= 90: score_emoji, rating = "đŸŸĸ", "Excellent" elif score >= 80: score_emoji, rating = "🟡", "Good" elif score >= 60: score_emoji, rating = "🟠", "Fair" else: score_emoji, rating = "🔴", "Poor" output += f"{score_emoji} Security Score: {score}/100 ({rating})

" # Certificate Information cert = scan_results.get('certificate', {}) if cert: output += "📜 Certificate Information
" output += f" â€ĸ Subject: {cert.get('subject', {}).get('common_name', 'N/A')}
" output += f" â€ĸ Issuer: {cert.get('issuer', {}).get('common_name', 'N/A')}
" output += f" â€ĸ Valid From: {format_cert_date(cert.get('not_before', ''))}
" output += f" â€ĸ Valid Until: {format_cert_date(cert.get('not_after', ''))}
" output += f" â€ĸ Expires In: {cert.get('days_until_expiry', 'N/A')} days
" output += f" â€ĸ Signature Algorithm: {cert.get('signature_algorithm', 'N/A')}
" output += "
" # Protocol Support output += "🔌 Protocol Support
" protocols = scan_results.get('protocols', {}) for proto, supported in protocols.items(): # Handle protocols that can't be tested in this Python version if proto in ['SSLv2', 'SSLv3'] and proto not in TLS_VERSIONS: emoji = "âšĢ" status = "Cannot test (Python security)" else: emoji = "✅" if supported else "❌" # Highlight insecure protocols if proto in ['SSLv2', 'SSLv3'] and supported: emoji = "🔴" elif proto in ['TLSv1.3'] and supported: emoji = "✅" output += f" â€ĸ {emoji} {proto}: {status if 'status' in locals() else 'Supported' if supported else 'Not Supported'}
" output += "
" # Cipher Information ciphers = scan_results.get('ciphers', {}) if ciphers.get('supported_ciphers'): output += "🔐 Cipher Suites
" output += f" â€ĸ Negotiated: {ciphers.get('negotiated_cipher', 'Unknown')}
" output += f" â€ĸ Total Supported: {len(ciphers['supported_ciphers'])}
" # Show weak ciphers if any weak_ciphers = [c for c in ciphers['supported_ciphers'] if any(weak in c.upper() for weak in CIPHER_CATEGORIES['WEAK'])] if weak_ciphers: output += f" â€ĸ Weak Ciphers: {len(weak_ciphers)} found
" for cipher in weak_ciphers[:3]: output += f" └─ 🔴 {cipher}
" # Show strong ciphers if any strong_ciphers = [c for c in ciphers['supported_ciphers'] if any(strong in c.upper() for strong in CIPHER_CATEGORIES['STRONG'])] if strong_ciphers: output += f" â€ĸ Strong Ciphers: {len(strong_ciphers)} found
" output += "
" # Vulnerabilities vulnerabilities = scan_results.get('vulnerabilities', []) if vulnerabilities: output += "âš ī¸ Security Vulnerabilities
" for vuln in vulnerabilities[:5]: # Show top 5 severity_emoji = "🔴" if vuln['severity'] == 'CRITICAL' else "🟠" if vuln['severity'] == 'HIGH' else "🟡" output += f" â€ĸ {severity_emoji} {vuln['name']} ({vuln['severity']})
" output += f" └─ {vuln['description']}
" output += "
" # Recommendations recommendations = scan_results.get('recommendations', []) if recommendations: output += "💡 Security Recommendations
" for rec in recommendations[:8]: output += f" â€ĸ {rec}
" output += "
" # Quick Assessment output += "📊 Quick Assessment
" if score >= 90: output += " â€ĸ ✅ Excellent TLS configuration
" output += " â€ĸ ✅ Modern protocols and ciphers
" output += " â€ĸ ✅ Good certificate management
" elif score >= 70: output += " â€ĸ âš ī¸ Good configuration with minor issues
" output += " â€ĸ 🔧 Some improvements recommended
" else: output += " â€ĸ 🚨 Significant security issues found
" output += " â€ĸ 🔴 Immediate action required
" # Add note about testing limitations output += "
â„šī¸ Note: Some protocol tests limited by Python security features" # Always wrap in collapsible due to comprehensive output output = f"
🔐 SSL/TLS Scan: {scan_results['target']}:{scan_results['port']} (Score: {score}/100){output}
" return output def format_cert_date(date_str): """Format certificate date string for display.""" try: if date_str: dt = datetime.datetime.strptime(date_str, '%Y%m%d%H%M%SZ') return dt.strftime('%Y-%m-%d %H:%M:%S UTC') except: pass return date_str