HashID and SSLScan plugins created
This commit is contained in:
		
							
								
								
									
										594
									
								
								plugins/sslscan.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										594
									
								
								plugins/sslscan.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,594 @@
 | 
			
		||||
"""
 | 
			
		||||
This plugin provides comprehensive SSL/TLS security scanning and analysis.
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
import socket
 | 
			
		||||
import ssl
 | 
			
		||||
import OpenSSL
 | 
			
		||||
import datetime
 | 
			
		||||
import re
 | 
			
		||||
import simplematrixbotlib as botlib
 | 
			
		||||
from urllib.parse import urlparse
 | 
			
		||||
 | 
			
		||||
# SSL/TLS configuration - handle missing protocols in modern Python
 | 
			
		||||
TLS_VERSIONS = {
 | 
			
		||||
    'TLSv1.2': ssl.PROTOCOL_TLSv1_2,
 | 
			
		||||
    'TLSv1.3': ssl.PROTOCOL_TLS
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
# Try to add older protocols if available (they're removed in modern Python)
 | 
			
		||||
try:
 | 
			
		||||
    TLS_VERSIONS['TLSv1.1'] = ssl.PROTOCOL_TLSv1_1
 | 
			
		||||
except AttributeError:
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    TLS_VERSIONS['TLSv1'] = ssl.PROTOCOL_TLSv1
 | 
			
		||||
except AttributeError:
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
# Cipher suites by strength and category
 | 
			
		||||
CIPHER_CATEGORIES = {
 | 
			
		||||
    'STRONG': [
 | 
			
		||||
        'TLS_AES_256_GCM_SHA384',
 | 
			
		||||
        'TLS_CHACHA20_POLY1305_SHA256',
 | 
			
		||||
        'TLS_AES_128_GCM_SHA256',
 | 
			
		||||
        'ECDHE-RSA-AES256-GCM-SHA384',
 | 
			
		||||
        'ECDHE-ECDSA-AES256-GCM-SHA384',
 | 
			
		||||
        'ECDHE-RSA-CHACHA20-POLY1305',
 | 
			
		||||
        'ECDHE-ECDSA-CHACHA20-POLY1305',
 | 
			
		||||
        'DHE-RSA-AES256-GCM-SHA384'
 | 
			
		||||
    ],
 | 
			
		||||
    'WEAK': [
 | 
			
		||||
        'RC4',
 | 
			
		||||
        'DES',
 | 
			
		||||
        '3DES',
 | 
			
		||||
        'MD5',
 | 
			
		||||
        'EXPORT',
 | 
			
		||||
        'NULL',
 | 
			
		||||
        'ANON',
 | 
			
		||||
        'ADH',
 | 
			
		||||
        'CBC'
 | 
			
		||||
    ],
 | 
			
		||||
    'OBSOLETE': [
 | 
			
		||||
        'SSLv2',
 | 
			
		||||
        'SSLv3'
 | 
			
		||||
    ]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async def handle_command(room, message, bot, prefix, config):
 | 
			
		||||
    """
 | 
			
		||||
    Function to handle !sslscan command for comprehensive SSL/TLS analysis.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        room (Room): The Matrix room where the command was invoked.
 | 
			
		||||
        message (RoomMessage): The message object containing the command.
 | 
			
		||||
        bot (Bot): The bot object.
 | 
			
		||||
        prefix (str): The command prefix.
 | 
			
		||||
        config (dict): Configuration parameters.
 | 
			
		||||
 | 
			
		||||
    Returns:
 | 
			
		||||
        None
 | 
			
		||||
    """
 | 
			
		||||
    match = botlib.MessageMatch(room, message, bot, prefix)
 | 
			
		||||
    if match.is_not_from_this_bot() and match.prefix() and match.command("sslscan"):
 | 
			
		||||
        logging.info("Received !sslscan command")
 | 
			
		||||
 | 
			
		||||
        args = match.args()
 | 
			
		||||
 | 
			
		||||
        if len(args) < 1:
 | 
			
		||||
            await show_usage(room, bot)
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        target = args[0].strip()
 | 
			
		||||
        port = 443
 | 
			
		||||
 | 
			
		||||
        # Parse port if provided
 | 
			
		||||
        if ':' in target:
 | 
			
		||||
            parts = target.split(':')
 | 
			
		||||
            target = parts[0]
 | 
			
		||||
            try:
 | 
			
		||||
                port = int(parts[1])
 | 
			
		||||
            except ValueError:
 | 
			
		||||
                await bot.api.send_text_message(room.room_id, "Invalid port number")
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
        await perform_ssl_scan(room, bot, target, port)
 | 
			
		||||
 | 
			
		||||
async def show_usage(room, bot):
 | 
			
		||||
    """Display sslscan command usage."""
 | 
			
		||||
    usage = """
 | 
			
		||||
<strong>🔐 SSL/TLS Security Scanner</strong>
 | 
			
		||||
 | 
			
		||||
<strong>!sslscan <domain[:port]></strong> - Comprehensive SSL/TLS security analysis
 | 
			
		||||
 | 
			
		||||
<strong>Examples:</strong>
 | 
			
		||||
• <code>!sslscan example.com</code>
 | 
			
		||||
• <code>!sslscan github.com:443</code>
 | 
			
		||||
• <code>!sslscan localhost:8443</code>
 | 
			
		||||
 | 
			
		||||
<strong>Tests Performed:</strong>
 | 
			
		||||
• SSL/TLS protocol support and versions
 | 
			
		||||
• Certificate chain validation and expiration
 | 
			
		||||
• Cipher suite strength and configuration
 | 
			
		||||
• Security headers and extensions
 | 
			
		||||
• Handshake simulation and vulnerabilities
 | 
			
		||||
• PCI DSS compliance checking
 | 
			
		||||
• SSL/TLS best practices assessment
 | 
			
		||||
"""
 | 
			
		||||
    await bot.api.send_markdown_message(room.room_id, usage)
 | 
			
		||||
 | 
			
		||||
async def perform_ssl_scan(room, bot, target, port):
 | 
			
		||||
    """Perform comprehensive SSL/TLS security scan."""
 | 
			
		||||
    try:
 | 
			
		||||
        await bot.api.send_text_message(room.room_id, f"🔍 Starting comprehensive SSL/TLS scan for {target}:{port}...")
 | 
			
		||||
 | 
			
		||||
        scan_results = {
 | 
			
		||||
            'target': target,
 | 
			
		||||
            'port': port,
 | 
			
		||||
            'certificate': {},
 | 
			
		||||
            'protocols': {},
 | 
			
		||||
            'ciphers': {},
 | 
			
		||||
            'vulnerabilities': [],
 | 
			
		||||
            'recommendations': [],
 | 
			
		||||
            'security_score': 0
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        # Test basic connectivity
 | 
			
		||||
        if not await test_connectivity(target, port):
 | 
			
		||||
            await bot.api.send_text_message(room.room_id, f"❌ Cannot connect to {target}:{port}")
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Perform comprehensive tests
 | 
			
		||||
        await get_certificate_info(scan_results, target, port)
 | 
			
		||||
        await test_protocol_support(scan_results, target, port)
 | 
			
		||||
        await test_cipher_suites(scan_results, target, port)
 | 
			
		||||
        await check_vulnerabilities(scan_results)
 | 
			
		||||
        await calculate_security_score(scan_results)
 | 
			
		||||
        await generate_recommendations(scan_results)
 | 
			
		||||
 | 
			
		||||
        # Format and send results
 | 
			
		||||
        output = await format_ssl_scan_results(scan_results)
 | 
			
		||||
        await bot.api.send_markdown_message(room.room_id, output)
 | 
			
		||||
 | 
			
		||||
        logging.info(f"Completed SSL scan for {target}:{port}")
 | 
			
		||||
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        await bot.api.send_text_message(room.room_id, f"Error performing SSL scan: {str(e)}")
 | 
			
		||||
        logging.error(f"Error in perform_ssl_scan: {e}")
 | 
			
		||||
 | 
			
		||||
async def test_connectivity(target, port):
 | 
			
		||||
    """Test basic connectivity to the target."""
 | 
			
		||||
    try:
 | 
			
		||||
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
			
		||||
        sock.settimeout(10)
 | 
			
		||||
        result = sock.connect_ex((target, port))
 | 
			
		||||
        sock.close()
 | 
			
		||||
        return result == 0
 | 
			
		||||
    except:
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
async def get_certificate_info(scan_results, target, port):
 | 
			
		||||
    """Get comprehensive certificate information."""
 | 
			
		||||
    try:
 | 
			
		||||
        context = ssl.create_default_context()
 | 
			
		||||
        context.check_hostname = False
 | 
			
		||||
        context.verify_mode = ssl.CERT_NONE
 | 
			
		||||
 | 
			
		||||
        with socket.create_connection((target, port), timeout=10) as sock:
 | 
			
		||||
            with context.wrap_socket(sock, server_hostname=target) as ssock:
 | 
			
		||||
                cert_bin = ssock.getpeercert(binary_form=True)
 | 
			
		||||
                cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bin)
 | 
			
		||||
 | 
			
		||||
                # Basic certificate info
 | 
			
		||||
                subject = cert.get_subject()
 | 
			
		||||
                issuer = cert.get_issuer()
 | 
			
		||||
 | 
			
		||||
                scan_results['certificate'] = {
 | 
			
		||||
                    'subject': {
 | 
			
		||||
                        'common_name': subject.CN,
 | 
			
		||||
                        'organization': subject.O,
 | 
			
		||||
                        'organizational_unit': subject.OU,
 | 
			
		||||
                        'country': subject.C,
 | 
			
		||||
                        'state': subject.ST,
 | 
			
		||||
                        'locality': subject.L
 | 
			
		||||
                    },
 | 
			
		||||
                    'issuer': {
 | 
			
		||||
                        'common_name': issuer.CN,
 | 
			
		||||
                        'organization': issuer.O,
 | 
			
		||||
                        'organizational_unit': issuer.OU
 | 
			
		||||
                    },
 | 
			
		||||
                    'serial_number': cert.get_serial_number(),
 | 
			
		||||
                    'version': cert.get_version(),
 | 
			
		||||
                    'not_before': cert.get_notBefore().decode('utf-8'),
 | 
			
		||||
                    'not_after': cert.get_notAfter().decode('utf-8'),
 | 
			
		||||
                    'signature_algorithm': cert.get_signature_algorithm().decode('utf-8'),
 | 
			
		||||
                    'extensions': []
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                # Parse extensions
 | 
			
		||||
                for i in range(cert.get_extension_count()):
 | 
			
		||||
                    ext = cert.get_extension(i)
 | 
			
		||||
                    scan_results['certificate']['extensions'].append({
 | 
			
		||||
                        'name': ext.get_short_name().decode('utf-8'),
 | 
			
		||||
                        'value': str(ext)
 | 
			
		||||
                    })
 | 
			
		||||
 | 
			
		||||
                # Calculate days until expiration
 | 
			
		||||
                not_after = datetime.datetime.strptime(scan_results['certificate']['not_after'], '%Y%m%d%H%M%SZ')
 | 
			
		||||
                days_until_expiry = (not_after - datetime.datetime.utcnow()).days
 | 
			
		||||
                scan_results['certificate']['days_until_expiry'] = days_until_expiry
 | 
			
		||||
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        scan_results['certificate_error'] = str(e)
 | 
			
		||||
 | 
			
		||||
async def test_protocol_support(scan_results, target, port):
 | 
			
		||||
    """Test support for various SSL/TLS protocols."""
 | 
			
		||||
    protocols = {
 | 
			
		||||
        'SSLv2': False,
 | 
			
		||||
        'SSLv3': False,
 | 
			
		||||
        'TLSv1': False,
 | 
			
		||||
        'TLSv1.1': False,
 | 
			
		||||
        'TLSv1.2': False,
 | 
			
		||||
        'TLSv1.3': False
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    # Test available protocols
 | 
			
		||||
    for protocol_name in protocols.keys():
 | 
			
		||||
        try:
 | 
			
		||||
            if protocol_name in TLS_VERSIONS:
 | 
			
		||||
                context = ssl.SSLContext(TLS_VERSIONS[protocol_name])
 | 
			
		||||
            else:
 | 
			
		||||
                # For protocols not available in this Python version, assume False
 | 
			
		||||
                protocols[protocol_name] = False
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            context.check_hostname = False
 | 
			
		||||
            context.verify_mode = ssl.CERT_NONE
 | 
			
		||||
 | 
			
		||||
            with socket.create_connection((target, port), timeout=5) as sock:
 | 
			
		||||
                with context.wrap_socket(sock, server_hostname=target) as ssock:
 | 
			
		||||
                    protocols[protocol_name] = True
 | 
			
		||||
                    # Get negotiated protocol
 | 
			
		||||
                    if hasattr(ssock, 'version'):
 | 
			
		||||
                        scan_results['negotiated_protocol'] = ssock.version()
 | 
			
		||||
        except:
 | 
			
		||||
            protocols[protocol_name] = False
 | 
			
		||||
 | 
			
		||||
    scan_results['protocols'] = protocols
 | 
			
		||||
 | 
			
		||||
async def test_cipher_suites(scan_results, target, port):
 | 
			
		||||
    """Test supported cipher suites."""
 | 
			
		||||
    try:
 | 
			
		||||
        context = ssl.create_default_context()
 | 
			
		||||
        context.check_hostname = False
 | 
			
		||||
        context.verify_mode = ssl.CERT_NONE
 | 
			
		||||
 | 
			
		||||
        # Get default cipher suites
 | 
			
		||||
        context.set_ciphers('ALL:COMPLEMENTOFALL')
 | 
			
		||||
 | 
			
		||||
        with socket.create_connection((target, port), timeout=10) as sock:
 | 
			
		||||
            with context.wrap_socket(sock, server_hostname=target) as ssock:
 | 
			
		||||
                cipher = ssock.cipher()
 | 
			
		||||
                scan_results['ciphers'] = {
 | 
			
		||||
                    'negotiated_cipher': cipher[0] if cipher else 'Unknown',
 | 
			
		||||
                    'supported_ciphers': await get_supported_ciphers(target, port),
 | 
			
		||||
                    'weak_ciphers': [],
 | 
			
		||||
                    'strong_ciphers': []
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        scan_results['cipher_error'] = str(e)
 | 
			
		||||
 | 
			
		||||
async def get_supported_ciphers(target, port):
 | 
			
		||||
    """Get list of supported cipher suites."""
 | 
			
		||||
    supported_ciphers = []
 | 
			
		||||
 | 
			
		||||
    # Test common cipher suites
 | 
			
		||||
    test_ciphers = [
 | 
			
		||||
        'ECDHE-RSA-AES256-GCM-SHA384',
 | 
			
		||||
        'ECDHE-ECDSA-AES256-GCM-SHA384',
 | 
			
		||||
        'ECDHE-RSA-AES256-SHA384',
 | 
			
		||||
        'ECDHE-ECDSA-AES256-SHA384',
 | 
			
		||||
        'ECDHE-RSA-AES256-SHA',
 | 
			
		||||
        'ECDHE-ECDSA-AES256-SHA',
 | 
			
		||||
        'AES256-GCM-SHA384',
 | 
			
		||||
        'AES256-SHA256',
 | 
			
		||||
        'AES256-SHA',
 | 
			
		||||
        'CAMELLIA256-SHA',
 | 
			
		||||
        'PSK-AES256-CBC-SHA',
 | 
			
		||||
        'ECDHE-RSA-AES128-GCM-SHA256',
 | 
			
		||||
        'ECDHE-ECDSA-AES128-GCM-SHA256',
 | 
			
		||||
        'ECDHE-RSA-AES128-SHA256',
 | 
			
		||||
        'ECDHE-ECDSA-AES128-SHA256',
 | 
			
		||||
        'ECDHE-RSA-AES128-SHA',
 | 
			
		||||
        'ECDHE-ECDSA-AES128-SHA',
 | 
			
		||||
        'AES128-GCM-SHA256',
 | 
			
		||||
        'AES128-SHA256',
 | 
			
		||||
        'AES128-SHA',
 | 
			
		||||
        'CAMELLIA128-SHA',
 | 
			
		||||
        'PSK-AES128-CBC-SHA',
 | 
			
		||||
        'DES-CBC3-SHA',
 | 
			
		||||
        'RC4-SHA',
 | 
			
		||||
        'RC4-MD5'
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    for cipher in test_ciphers:
 | 
			
		||||
        try:
 | 
			
		||||
            context = ssl.create_default_context()
 | 
			
		||||
            context.check_hostname = False
 | 
			
		||||
            context.verify_mode = ssl.CERT_NONE
 | 
			
		||||
            context.set_ciphers(cipher)
 | 
			
		||||
 | 
			
		||||
            with socket.create_connection((target, port), timeout=5) as sock:
 | 
			
		||||
                with context.wrap_socket(sock, server_hostname=target) as ssock:
 | 
			
		||||
                    if ssock.cipher():
 | 
			
		||||
                        supported_ciphers.append(cipher)
 | 
			
		||||
        except:
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
    return supported_ciphers
 | 
			
		||||
 | 
			
		||||
async def check_vulnerabilities(scan_results):
 | 
			
		||||
    """Check for common SSL/TLS vulnerabilities."""
 | 
			
		||||
    vulnerabilities = []
 | 
			
		||||
 | 
			
		||||
    # Check for weak protocols (these will be False in modern Python, which is good)
 | 
			
		||||
    if scan_results['protocols'].get('SSLv2', False):
 | 
			
		||||
        vulnerabilities.append({
 | 
			
		||||
            'name': 'SSLv2 Support',
 | 
			
		||||
            'severity': 'CRITICAL',
 | 
			
		||||
            'description': 'SSLv2 is obsolete and contains critical vulnerabilities',
 | 
			
		||||
            'cve': 'Multiple CVEs'
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
    if scan_results['protocols'].get('SSLv3', False):
 | 
			
		||||
        vulnerabilities.append({
 | 
			
		||||
            'name': 'SSLv3 Support',
 | 
			
		||||
            'severity': 'HIGH',
 | 
			
		||||
            'description': 'SSLv3 is vulnerable to POODLE attack',
 | 
			
		||||
            'cve': 'CVE-2014-3566'
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
    # Check certificate expiration
 | 
			
		||||
    cert = scan_results.get('certificate', {})
 | 
			
		||||
    if cert.get('days_until_expiry', 0) < 30:
 | 
			
		||||
        vulnerabilities.append({
 | 
			
		||||
            'name': 'Certificate Expiring Soon',
 | 
			
		||||
            'severity': 'MEDIUM',
 | 
			
		||||
            'description': f"Certificate expires in {cert['days_until_expiry']} days",
 | 
			
		||||
            'cve': 'N/A'
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
    # Check for weak ciphers
 | 
			
		||||
    supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', [])
 | 
			
		||||
    weak_ciphers_found = []
 | 
			
		||||
 | 
			
		||||
    for cipher in supported_ciphers:
 | 
			
		||||
        if any(weak in cipher.upper() for weak in CIPHER_CATEGORIES['WEAK']):
 | 
			
		||||
            weak_ciphers_found.append(cipher)
 | 
			
		||||
 | 
			
		||||
    if weak_ciphers_found:
 | 
			
		||||
        vulnerabilities.append({
 | 
			
		||||
            'name': 'Weak Cipher Suites',
 | 
			
		||||
            'severity': 'HIGH',
 | 
			
		||||
            'description': f'Weak ciphers supported: {", ".join(weak_ciphers_found[:3])}',
 | 
			
		||||
            'cve': 'Multiple CVEs'
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
    # Check for missing modern protocols
 | 
			
		||||
    if not scan_results['protocols'].get('TLSv1.2', False):
 | 
			
		||||
        vulnerabilities.append({
 | 
			
		||||
            'name': 'TLS 1.2 Not Supported',
 | 
			
		||||
            'severity': 'HIGH',
 | 
			
		||||
            'description': 'TLS 1.2 is required for modern security',
 | 
			
		||||
            'cve': 'N/A'
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
    if not scan_results['protocols'].get('TLSv1.3', False):
 | 
			
		||||
        vulnerabilities.append({
 | 
			
		||||
            'name': 'TLS 1.3 Not Supported',
 | 
			
		||||
            'severity': 'MEDIUM',
 | 
			
		||||
            'description': 'TLS 1.3 provides improved security and performance',
 | 
			
		||||
            'cve': 'N/A'
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
    scan_results['vulnerabilities'] = vulnerabilities
 | 
			
		||||
 | 
			
		||||
async def calculate_security_score(scan_results):
 | 
			
		||||
    """Calculate overall security score."""
 | 
			
		||||
    score = 100
 | 
			
		||||
 | 
			
		||||
    # Protocol penalties (in modern Python, SSLv2/SSLv3 will be False, which is good)
 | 
			
		||||
    if scan_results['protocols'].get('SSLv2', False):
 | 
			
		||||
        score -= 30
 | 
			
		||||
    if scan_results['protocols'].get('SSLv3', False):
 | 
			
		||||
        score -= 20
 | 
			
		||||
    if not scan_results['protocols'].get('TLSv1.2', False):
 | 
			
		||||
        score -= 15
 | 
			
		||||
    if not scan_results['protocols'].get('TLSv1.3', False):
 | 
			
		||||
        score -= 10
 | 
			
		||||
 | 
			
		||||
    # Certificate penalties
 | 
			
		||||
    cert = scan_results.get('certificate', {})
 | 
			
		||||
    if cert.get('days_until_expiry', 0) < 30:
 | 
			
		||||
        score -= 10
 | 
			
		||||
    if cert.get('days_until_expiry', 0) < 7:
 | 
			
		||||
        score -= 20
 | 
			
		||||
 | 
			
		||||
    # Cipher penalties
 | 
			
		||||
    supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', [])
 | 
			
		||||
    weak_cipher_count = sum(1 for cipher in supported_ciphers
 | 
			
		||||
                           if any(weak in cipher.upper() for weak in CIPHER_CATEGORIES['WEAK']))
 | 
			
		||||
    score -= min(weak_cipher_count * 5, 25)
 | 
			
		||||
 | 
			
		||||
    # Vulnerability penalties
 | 
			
		||||
    for vuln in scan_results.get('vulnerabilities', []):
 | 
			
		||||
        if vuln['severity'] == 'CRITICAL':
 | 
			
		||||
            score -= 20
 | 
			
		||||
        elif vuln['severity'] == 'HIGH':
 | 
			
		||||
            score -= 15
 | 
			
		||||
        elif vuln['severity'] == 'MEDIUM':
 | 
			
		||||
            score -= 10
 | 
			
		||||
        elif vuln['severity'] == 'LOW':
 | 
			
		||||
            score -= 5
 | 
			
		||||
 | 
			
		||||
    scan_results['security_score'] = max(0, score)
 | 
			
		||||
 | 
			
		||||
async def generate_recommendations(scan_results):
 | 
			
		||||
    """Generate security recommendations."""
 | 
			
		||||
    recommendations = []
 | 
			
		||||
 | 
			
		||||
    # Protocol recommendations
 | 
			
		||||
    if scan_results['protocols'].get('SSLv2', False):
 | 
			
		||||
        recommendations.append("🔴 IMMEDIATELY disable SSLv2 - critically vulnerable")
 | 
			
		||||
    if scan_results['protocols'].get('SSLv3', False):
 | 
			
		||||
        recommendations.append("🔴 Disable SSLv3 - vulnerable to POODLE attack")
 | 
			
		||||
    if not scan_results['protocols'].get('TLSv1.3', False):
 | 
			
		||||
        recommendations.append("🟡 Enable TLSv1.3 for best security and performance")
 | 
			
		||||
 | 
			
		||||
    # Certificate recommendations
 | 
			
		||||
    cert = scan_results.get('certificate', {})
 | 
			
		||||
    if cert.get('days_until_expiry', 0) < 30:
 | 
			
		||||
        recommendations.append("🟡 Renew SSL certificate - expiring soon")
 | 
			
		||||
 | 
			
		||||
    # Cipher recommendations
 | 
			
		||||
    supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', [])
 | 
			
		||||
    weak_ciphers = [c for c in supported_ciphers
 | 
			
		||||
                   if any(weak in c.upper() for weak in CIPHER_CATEGORIES['WEAK'])]
 | 
			
		||||
 | 
			
		||||
    if weak_ciphers:
 | 
			
		||||
        recommendations.append("🔴 Remove weak cipher suites (RC4, DES, 3DES, NULL)")
 | 
			
		||||
 | 
			
		||||
    # General recommendations
 | 
			
		||||
    if scan_results['security_score'] < 80:
 | 
			
		||||
        recommendations.append("🛡️ Implement modern TLS configuration following Mozilla guidelines")
 | 
			
		||||
 | 
			
		||||
    if not any('ECDHE' in c for c in supported_ciphers):
 | 
			
		||||
        recommendations.append("🟡 Enable Forward Secrecy with ECDHE cipher suites")
 | 
			
		||||
 | 
			
		||||
    # Add note about Python version limitations
 | 
			
		||||
    recommendations.append("ℹ️ Note: SSLv2/SSLv3 testing limited by Python security features")
 | 
			
		||||
 | 
			
		||||
    scan_results['recommendations'] = recommendations
 | 
			
		||||
 | 
			
		||||
async def format_ssl_scan_results(scan_results):
 | 
			
		||||
    """Format comprehensive SSL scan results."""
 | 
			
		||||
    output = f"<strong>🔐 SSL/TLS Security Scan: {scan_results['target']}:{scan_results['port']}</strong><br><br>"
 | 
			
		||||
 | 
			
		||||
    # Security Score
 | 
			
		||||
    score = scan_results['security_score']
 | 
			
		||||
    if score >= 90:
 | 
			
		||||
        score_emoji, rating = "🟢", "Excellent"
 | 
			
		||||
    elif score >= 80:
 | 
			
		||||
        score_emoji, rating = "🟡", "Good"
 | 
			
		||||
    elif score >= 60:
 | 
			
		||||
        score_emoji, rating = "🟠", "Fair"
 | 
			
		||||
    else:
 | 
			
		||||
        score_emoji, rating = "🔴", "Poor"
 | 
			
		||||
 | 
			
		||||
    output += f"<strong>{score_emoji} Security Score: {score}/100 ({rating})</strong><br><br>"
 | 
			
		||||
 | 
			
		||||
    # Certificate Information
 | 
			
		||||
    cert = scan_results.get('certificate', {})
 | 
			
		||||
    if cert:
 | 
			
		||||
        output += "<strong>📜 Certificate Information</strong><br>"
 | 
			
		||||
        output += f"  • <strong>Subject:</strong> {cert.get('subject', {}).get('common_name', 'N/A')}<br>"
 | 
			
		||||
        output += f"  • <strong>Issuer:</strong> {cert.get('issuer', {}).get('common_name', 'N/A')}<br>"
 | 
			
		||||
        output += f"  • <strong>Valid From:</strong> {format_cert_date(cert.get('not_before', ''))}<br>"
 | 
			
		||||
        output += f"  • <strong>Valid Until:</strong> {format_cert_date(cert.get('not_after', ''))}<br>"
 | 
			
		||||
        output += f"  • <strong>Expires In:</strong> {cert.get('days_until_expiry', 'N/A')} days<br>"
 | 
			
		||||
        output += f"  • <strong>Signature Algorithm:</strong> {cert.get('signature_algorithm', 'N/A')}<br>"
 | 
			
		||||
        output += "<br>"
 | 
			
		||||
 | 
			
		||||
    # Protocol Support
 | 
			
		||||
    output += "<strong>🔌 Protocol Support</strong><br>"
 | 
			
		||||
    protocols = scan_results.get('protocols', {})
 | 
			
		||||
    for proto, supported in protocols.items():
 | 
			
		||||
        # Handle protocols that can't be tested in this Python version
 | 
			
		||||
        if proto in ['SSLv2', 'SSLv3'] and proto not in TLS_VERSIONS:
 | 
			
		||||
            emoji = "⚫"
 | 
			
		||||
            status = "Cannot test (Python security)"
 | 
			
		||||
        else:
 | 
			
		||||
            emoji = "✅" if supported else "❌"
 | 
			
		||||
 | 
			
		||||
            # Highlight insecure protocols
 | 
			
		||||
            if proto in ['SSLv2', 'SSLv3'] and supported:
 | 
			
		||||
                emoji = "🔴"
 | 
			
		||||
            elif proto in ['TLSv1.3'] and supported:
 | 
			
		||||
                emoji = "✅"
 | 
			
		||||
 | 
			
		||||
        output += f"  • {emoji} <strong>{proto}:</strong> {status if 'status' in locals() else 'Supported' if supported else 'Not Supported'}<br>"
 | 
			
		||||
    output += "<br>"
 | 
			
		||||
 | 
			
		||||
    # Cipher Information
 | 
			
		||||
    ciphers = scan_results.get('ciphers', {})
 | 
			
		||||
    if ciphers.get('supported_ciphers'):
 | 
			
		||||
        output += "<strong>🔐 Cipher Suites</strong><br>"
 | 
			
		||||
        output += f"  • <strong>Negotiated:</strong> {ciphers.get('negotiated_cipher', 'Unknown')}<br>"
 | 
			
		||||
        output += f"  • <strong>Total Supported:</strong> {len(ciphers['supported_ciphers'])}<br>"
 | 
			
		||||
 | 
			
		||||
        # Show weak ciphers if any
 | 
			
		||||
        weak_ciphers = [c for c in ciphers['supported_ciphers']
 | 
			
		||||
                       if any(weak in c.upper() for weak in CIPHER_CATEGORIES['WEAK'])]
 | 
			
		||||
        if weak_ciphers:
 | 
			
		||||
            output += f"  • <strong>Weak Ciphers:</strong> {len(weak_ciphers)} found<br>"
 | 
			
		||||
            for cipher in weak_ciphers[:3]:
 | 
			
		||||
                output += f"    └─ 🔴 {cipher}<br>"
 | 
			
		||||
 | 
			
		||||
        # Show strong ciphers if any
 | 
			
		||||
        strong_ciphers = [c for c in ciphers['supported_ciphers']
 | 
			
		||||
                         if any(strong in c.upper() for strong in CIPHER_CATEGORIES['STRONG'])]
 | 
			
		||||
        if strong_ciphers:
 | 
			
		||||
            output += f"  • <strong>Strong Ciphers:</strong> {len(strong_ciphers)} found<br>"
 | 
			
		||||
        output += "<br>"
 | 
			
		||||
 | 
			
		||||
    # Vulnerabilities
 | 
			
		||||
    vulnerabilities = scan_results.get('vulnerabilities', [])
 | 
			
		||||
    if vulnerabilities:
 | 
			
		||||
        output += "<strong>⚠️ Security Vulnerabilities</strong><br>"
 | 
			
		||||
        for vuln in vulnerabilities[:5]:  # Show top 5
 | 
			
		||||
            severity_emoji = "🔴" if vuln['severity'] == 'CRITICAL' else "🟠" if vuln['severity'] == 'HIGH' else "🟡"
 | 
			
		||||
            output += f"  • {severity_emoji} <strong>{vuln['name']}</strong> ({vuln['severity']})<br>"
 | 
			
		||||
            output += f"    └─ {vuln['description']}<br>"
 | 
			
		||||
        output += "<br>"
 | 
			
		||||
 | 
			
		||||
    # Recommendations
 | 
			
		||||
    recommendations = scan_results.get('recommendations', [])
 | 
			
		||||
    if recommendations:
 | 
			
		||||
        output += "<strong>💡 Security Recommendations</strong><br>"
 | 
			
		||||
        for rec in recommendations[:8]:
 | 
			
		||||
            output += f"  • {rec}<br>"
 | 
			
		||||
        output += "<br>"
 | 
			
		||||
 | 
			
		||||
    # Quick Assessment
 | 
			
		||||
    output += "<strong>📊 Quick Assessment</strong><br>"
 | 
			
		||||
    if score >= 90:
 | 
			
		||||
        output += "  • ✅ Excellent TLS configuration<br>"
 | 
			
		||||
        output += "  • ✅ Modern protocols and ciphers<br>"
 | 
			
		||||
        output += "  • ✅ Good certificate management<br>"
 | 
			
		||||
    elif score >= 70:
 | 
			
		||||
        output += "  • ⚠️ Good configuration with minor issues<br>"
 | 
			
		||||
        output += "  • 🔧 Some improvements recommended<br>"
 | 
			
		||||
    else:
 | 
			
		||||
        output += "  • 🚨 Significant security issues found<br>"
 | 
			
		||||
        output += "  • 🔴 Immediate action required<br>"
 | 
			
		||||
 | 
			
		||||
    # Add note about testing limitations
 | 
			
		||||
    output += "<br><em>ℹ️ Note: Some protocol tests limited by Python security features</em>"
 | 
			
		||||
 | 
			
		||||
    # Always wrap in collapsible due to comprehensive output
 | 
			
		||||
    output = f"<details><summary><strong>🔐 SSL/TLS Scan: {scan_results['target']}:{scan_results['port']} (Score: {score}/100)</strong></summary>{output}</details>"
 | 
			
		||||
 | 
			
		||||
    return output
 | 
			
		||||
 | 
			
		||||
def format_cert_date(date_str):
 | 
			
		||||
    """Format certificate date string for display."""
 | 
			
		||||
    try:
 | 
			
		||||
        if date_str:
 | 
			
		||||
            dt = datetime.datetime.strptime(date_str, '%Y%m%d%H%M%SZ')
 | 
			
		||||
            return dt.strftime('%Y-%m-%d %H:%M:%S UTC')
 | 
			
		||||
    except:
 | 
			
		||||
        pass
 | 
			
		||||
    return date_str
 | 
			
		||||
		Reference in New Issue
	
	Block a user