refactor: async I/O, input sanitisation, and shared utilities cleanup

This commit is contained in:
2026-05-08 22:59:31 -05:00
parent 52a9621d50
commit f822d6a450
21 changed files with 1351 additions and 2709 deletions
+256 -425
View File
@@ -1,84 +1,48 @@
"""
This plugin provides comprehensive SSL/TLS security scanning and analysis.
Comprehensive SSL/TLS security scanning and analysis.
All blocking socket calls run in a thread pool; user input is sanitised.
"""
import asyncio
import logging
import socket
import ssl
import OpenSSL
import datetime
import re
import simplematrixbotlib as botlib
from urllib.parse import urlparse
from plugins.common import is_public_destination, html_escape, collapsible_summary
from plugins.utils import is_public_destination
# SSL/TLS configuration - handle missing protocols in modern Python
# SSL/TLS configuration handle missing protocols in modern Python
TLS_VERSIONS = {
'TLSv1.2': ssl.PROTOCOL_TLSv1_2,
'TLSv1.3': ssl.PROTOCOL_TLS
}
# Try to add older protocols if available (they're removed in modern Python)
try:
TLS_VERSIONS['TLSv1.1'] = ssl.PROTOCOL_TLSv1_1
except AttributeError:
pass
try:
TLS_VERSIONS['TLSv1'] = ssl.PROTOCOL_TLSv1
except AttributeError:
pass
# Cipher suites by strength and category
CIPHER_CATEGORIES = {
'STRONG': [
'TLS_AES_256_GCM_SHA384',
'TLS_CHACHA20_POLY1305_SHA256',
'TLS_AES_128_GCM_SHA256',
'ECDHE-RSA-AES256-GCM-SHA384',
'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-RSA-CHACHA20-POLY1305',
'ECDHE-ECDSA-CHACHA20-POLY1305',
'DHE-RSA-AES256-GCM-SHA384'
'TLS_AES_256_GCM_SHA384', 'TLS_CHACHA20_POLY1305_SHA256',
'TLS_AES_128_GCM_SHA256', 'ECDHE-RSA-AES256-GCM-SHA384',
'ECDHE-ECDSA-AES256-GCM-SHA384', 'ECDHE-RSA-CHACHA20-POLY1305',
'ECDHE-ECDSA-CHACHA20-POLY1305', 'DHE-RSA-AES256-GCM-SHA384'
],
'WEAK': [
'RC4',
'DES',
'3DES',
'MD5',
'EXPORT',
'NULL',
'ANON',
'ADH',
'CBC'
],
'OBSOLETE': [
'SSLv2',
'SSLv3'
]
'WEAK': ['RC4', 'DES', '3DES', 'MD5', 'EXPORT', 'NULL', 'ANON', 'ADH', 'CBC'],
}
async def handle_command(room, message, bot, prefix, config):
"""
Function to handle !sslscan command for comprehensive SSL/TLS analysis.
Args:
room (Room): The Matrix room where the command was invoked.
message (RoomMessage): The message object containing the command.
bot (Bot): The bot object.
prefix (str): The command prefix.
config (dict): Configuration parameters.
Returns:
None
Handle !sslscan command for comprehensive SSL/TLS analysis.
"""
match = botlib.MessageMatch(room, message, bot, prefix)
if match.is_not_from_this_bot() and match.prefix() and match.command("sslscan"):
logging.info("Received !sslscan command")
args = match.args()
if len(args) < 1:
await show_usage(room, bot)
return
@@ -86,7 +50,6 @@ async def handle_command(room, message, bot, prefix, config):
target = args[0].strip()
port = 443
# Parse port if provided
if ':' in target:
parts = target.split(':')
target = parts[0]
@@ -96,16 +59,13 @@ async def handle_command(room, message, bot, prefix, config):
await bot.api.send_text_message(room.room_id, "Invalid port number")
return
# SSRF protection: refuse internal hosts
if not is_public_destination(target):
await bot.api.send_text_message(
room.room_id,
"❌ Scanning of private/internal addresses is not allowed."
)
await bot.api.send_text_message(room.room_id, "❌ Scanning of private/internal addresses is not allowed.")
return
await perform_ssl_scan(room, bot, target, port)
async def show_usage(room, bot):
"""Display sslscan command usage."""
usage = """
@@ -116,7 +76,6 @@ async def show_usage(room, bot):
<strong>Examples:</strong>
• <code>!sslscan example.com</code>
• <code>!sslscan github.com:443</code>
• <code>!sslscan localhost:8443</code>
<strong>Tests Performed:</strong>
• SSL/TLS protocol support and versions
@@ -129,488 +88,360 @@ async def show_usage(room, bot):
"""
await bot.api.send_markdown_message(room.room_id, usage)
async def perform_ssl_scan(room, bot, target, port):
"""Perform comprehensive SSL/TLS security scan."""
# ----- async wrappers for blocking socket calls -----
async def _run_blocking(func, *args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
def _test_connectivity(target, port):
"""Test basic connectivity."""
try:
await bot.api.send_text_message(room.room_id, f"🔍 Starting comprehensive SSL/TLS scan for {target}:{port}...")
scan_results = {
'target': target,
'port': port,
'certificate': {},
'protocols': {},
'ciphers': {},
'vulnerabilities': [],
'recommendations': [],
'security_score': 0
}
# Test basic connectivity
if not await test_connectivity(target, port):
await bot.api.send_text_message(room.room_id, f"❌ Cannot connect to {target}:{port}")
return
# Perform comprehensive tests
await get_certificate_info(scan_results, target, port)
await test_protocol_support(scan_results, target, port)
await test_cipher_suites(scan_results, target, port)
await check_vulnerabilities(scan_results)
await calculate_security_score(scan_results)
await generate_recommendations(scan_results)
# Format and send results
output = await format_ssl_scan_results(scan_results)
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Completed SSL scan for {target}:{port}")
except Exception as e:
await bot.api.send_text_message(room.room_id, f"Error performing SSL scan: {str(e)}")
logging.error(f"Error in perform_ssl_scan: {e}")
async def test_connectivity(target, port):
"""Test basic connectivity to the target."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
result = sock.connect_ex((target, port))
sock.close()
return result == 0
with socket.create_connection((target, port), timeout=10):
return True
except:
return False
async def get_certificate_info(scan_results, target, port):
"""Get comprehensive certificate information."""
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection((target, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=target) as ssock:
cert_bin = ssock.getpeercert(binary_form=True)
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bin)
def _get_certificate_info(target, port):
"""Retrieve detailed certificate info."""
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Basic certificate info
subject = cert.get_subject()
issuer = cert.get_issuer()
with socket.create_connection((target, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=target) as ssock:
cert_bin = ssock.getpeercert(binary_form=True)
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bin)
scan_results['certificate'] = {
'subject': {
'common_name': subject.CN,
'organization': subject.O,
'organizational_unit': subject.OU,
'country': subject.C,
'state': subject.ST,
'locality': subject.L
},
'issuer': {
'common_name': issuer.CN,
'organization': issuer.O,
'organizational_unit': issuer.OU
},
'serial_number': cert.get_serial_number(),
'version': cert.get_version(),
'not_before': cert.get_notBefore().decode('utf-8'),
'not_after': cert.get_notAfter().decode('utf-8'),
'signature_algorithm': cert.get_signature_algorithm().decode('utf-8'),
'extensions': []
}
subject = cert.get_subject()
issuer = cert.get_issuer()
# Parse extensions
for i in range(cert.get_extension_count()):
ext = cert.get_extension(i)
scan_results['certificate']['extensions'].append({
'name': ext.get_short_name().decode('utf-8'),
'value': str(ext)
})
not_before = cert.get_notBefore().decode('utf-8')
not_after = cert.get_notAfter().decode('utf-8')
sig_alg = cert.get_signature_algorithm().decode('utf-8')
# Calculate days until expiration
not_after = datetime.datetime.strptime(scan_results['certificate']['not_after'], '%Y%m%d%H%M%SZ')
days_until_expiry = (not_after - datetime.datetime.utcnow()).days
scan_results['certificate']['days_until_expiry'] = days_until_expiry
not_after_dt = datetime.datetime.strptime(not_after, '%Y%m%d%H%M%SZ')
days_remaining = (not_after_dt - datetime.datetime.utcnow()).days
except Exception as e:
scan_results['certificate_error'] = str(e)
# Extensions summary
extensions = []
for i in range(cert.get_extension_count()):
ext = cert.get_extension(i)
extensions.append({
'name': ext.get_short_name().decode('utf-8'),
'value': str(ext)
})
async def test_protocol_support(scan_results, target, port):
return {
'subject': {
'common_name': subject.CN,
'organization': subject.O,
'organizational_unit': subject.OU,
'country': subject.C,
'state': subject.ST,
'locality': subject.L
},
'issuer': {
'common_name': issuer.CN,
'organization': issuer.O,
'organizational_unit': issuer.OU
},
'serial_number': cert.get_serial_number(),
'version': cert.get_version(),
'not_before': not_before,
'not_after': not_after,
'signature_algorithm': sig_alg,
'days_until_expiry': days_remaining,
'extensions': extensions
}
return None
def _test_protocols(target, port):
"""Test support for various SSL/TLS protocols."""
protocols = {
'SSLv2': False,
'SSLv3': False,
'TLSv1': False,
'TLSv1.1': False,
'TLSv1.2': False,
'TLSv1.3': False
}
# Test available protocols
for protocol_name in protocols.keys():
protocols = {}
for proto_name in ['SSLv2', 'SSLv3', 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3']:
if proto_name not in TLS_VERSIONS:
protocols[proto_name] = False
continue
try:
if protocol_name in TLS_VERSIONS:
context = ssl.SSLContext(TLS_VERSIONS[protocol_name])
else:
# For protocols not available in this Python version, assume False
protocols[protocol_name] = False
continue
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
ctx = ssl.SSLContext(TLS_VERSIONS[proto_name])
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with socket.create_connection((target, port), timeout=5) as sock:
with context.wrap_socket(sock, server_hostname=target) as ssock:
protocols[protocol_name] = True
# Get negotiated protocol
if hasattr(ssock, 'version'):
scan_results['negotiated_protocol'] = ssock.version()
with ctx.wrap_socket(sock, server_hostname=target):
protocols[proto_name] = True
except:
protocols[protocol_name] = False
protocols[proto_name] = False
return protocols
scan_results['protocols'] = protocols
async def test_cipher_suites(scan_results, target, port):
"""Test supported cipher suites."""
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Get default cipher suites
context.set_ciphers('ALL:COMPLEMENTOFALL')
with socket.create_connection((target, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=target) as ssock:
cipher = ssock.cipher()
scan_results['ciphers'] = {
'negotiated_cipher': cipher[0] if cipher else 'Unknown',
'supported_ciphers': await get_supported_ciphers(target, port),
'weak_ciphers': [],
'strong_ciphers': []
}
except Exception as e:
scan_results['cipher_error'] = str(e)
async def get_supported_ciphers(target, port):
"""Get list of supported cipher suites."""
supported_ciphers = []
# Test common cipher suites
def _test_cipher_suites(target, port):
"""Return list of supported cipher suite names."""
test_ciphers = [
'ECDHE-RSA-AES256-GCM-SHA384',
'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES256-SHA384',
'ECDHE-ECDSA-AES256-SHA384',
'ECDHE-RSA-AES256-SHA',
'ECDHE-ECDSA-AES256-SHA',
'AES256-GCM-SHA384',
'AES256-SHA256',
'AES256-SHA',
'CAMELLIA256-SHA',
'PSK-AES256-CBC-SHA',
'ECDHE-RSA-AES128-GCM-SHA256',
'ECDHE-ECDSA-AES128-GCM-SHA256',
'ECDHE-RSA-AES128-SHA256',
'ECDHE-ECDSA-AES128-SHA256',
'ECDHE-RSA-AES128-SHA',
'ECDHE-ECDSA-AES128-SHA',
'AES128-GCM-SHA256',
'AES128-SHA256',
'AES128-SHA',
'CAMELLIA128-SHA',
'PSK-AES128-CBC-SHA',
'DES-CBC3-SHA',
'RC4-SHA',
'RC4-MD5'
'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES256-SHA384', 'ECDHE-ECDSA-AES256-SHA384',
'ECDHE-RSA-AES256-SHA', 'ECDHE-ECDSA-AES256-SHA',
'AES256-GCM-SHA384', 'AES256-SHA256', 'AES256-SHA',
'CAMELLIA256-SHA', 'PSK-AES256-CBC-SHA',
'ECDHE-RSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES128-GCM-SHA256',
'ECDHE-RSA-AES128-SHA256', 'ECDHE-ECDSA-AES128-SHA256',
'ECDHE-RSA-AES128-SHA', 'ECDHE-ECDSA-AES128-SHA',
'AES128-GCM-SHA256', 'AES128-SHA256', 'AES128-SHA',
'CAMELLIA128-SHA', 'PSK-AES128-CBC-SHA',
'DES-CBC3-SHA', 'RC4-SHA', 'RC4-MD5'
]
supported = []
for cipher in test_ciphers:
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.set_ciphers(cipher)
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_ciphers(cipher)
with socket.create_connection((target, port), timeout=5) as sock:
with context.wrap_socket(sock, server_hostname=target) as ssock:
if ssock.cipher():
supported_ciphers.append(cipher)
with ctx.wrap_socket(sock, server_hostname=target):
supported.append(cipher)
except:
pass
return supported
return supported_ciphers
async def check_vulnerabilities(scan_results):
"""Check for common SSL/TLS vulnerabilities."""
vulnerabilities = []
# ----- analysis helpers (same logic as original) -----
def _check_vulnerabilities(protocols, cert_info, supported_ciphers):
vulns = []
# Check for weak protocols
if scan_results['protocols'].get('SSLv2', False):
vulnerabilities.append({
if protocols.get('SSLv2'):
vulns.append({
'name': 'SSLv2 Support',
'severity': 'CRITICAL',
'description': 'SSLv2 is obsolete and contains critical vulnerabilities',
'cve': 'Multiple CVEs'
})
if scan_results['protocols'].get('SSLv3', False):
vulnerabilities.append({
if protocols.get('SSLv3'):
vulns.append({
'name': 'SSLv3 Support',
'severity': 'HIGH',
'description': 'SSLv3 is vulnerable to POODLE attack',
'cve': 'CVE-2014-3566'
})
# Check certificate expiration
cert = scan_results.get('certificate', {})
if cert.get('days_until_expiry', 0) < 30:
vulnerabilities.append({
if cert_info and cert_info.get('days_until_expiry', 0) < 30:
vulns.append({
'name': 'Certificate Expiring Soon',
'severity': 'MEDIUM',
'description': f"Certificate expires in {cert['days_until_expiry']} days",
'description': f"Certificate expires in {cert_info['days_until_expiry']} days",
'cve': 'N/A'
})
# Check for weak ciphers
supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', [])
weak_ciphers_found = []
for cipher in supported_ciphers:
if any(weak in cipher.upper() for weak in CIPHER_CATEGORIES['WEAK']):
weak_ciphers_found.append(cipher)
if weak_ciphers_found:
vulnerabilities.append({
weak_ciphers = [c for c in supported_ciphers
if any(weak in c.upper() for weak in CIPHER_CATEGORIES['WEAK'])]
if weak_ciphers:
vulns.append({
'name': 'Weak Cipher Suites',
'severity': 'HIGH',
'description': f'Weak ciphers supported: {", ".join(weak_ciphers_found[:3])}',
'description': f'Weak ciphers supported: {", ".join(weak_ciphers[:3])}',
'cve': 'Multiple CVEs'
})
# Check for missing modern protocols
if not scan_results['protocols'].get('TLSv1.2', False):
vulnerabilities.append({
if not protocols.get('TLSv1.2', False):
vulns.append({
'name': 'TLS 1.2 Not Supported',
'severity': 'HIGH',
'description': 'TLS 1.2 is required for modern security',
'cve': 'N/A'
})
if not scan_results['protocols'].get('TLSv1.3', False):
vulnerabilities.append({
if not protocols.get('TLSv1.3', False):
vulns.append({
'name': 'TLS 1.3 Not Supported',
'severity': 'MEDIUM',
'description': 'TLS 1.3 provides improved security and performance',
'cve': 'N/A'
})
scan_results['vulnerabilities'] = vulnerabilities
return vulns
async def calculate_security_score(scan_results):
"""Calculate overall security score."""
def _calculate_score(protocols, cert_info, supported_ciphers, vulnerabilities):
score = 100
# Protocol penalties
if scan_results['protocols'].get('SSLv2', False):
score -= 30
if scan_results['protocols'].get('SSLv3', False):
score -= 20
if not scan_results['protocols'].get('TLSv1.2', False):
score -= 15
if not scan_results['protocols'].get('TLSv1.3', False):
score -= 10
if protocols.get('SSLv2'): score -= 30
if protocols.get('SSLv3'): score -= 20
if not protocols.get('TLSv1.2'): score -= 15
if not protocols.get('TLSv1.3'): score -= 10
# Certificate penalties
cert = scan_results.get('certificate', {})
if cert.get('days_until_expiry', 0) < 30:
score -= 10
if cert.get('days_until_expiry', 0) < 7:
score -= 20
if cert_info and cert_info.get('days_until_expiry', 0) < 30: score -= 10
if cert_info and cert_info.get('days_until_expiry', 0) < 7: score -= 20
# Cipher penalties
supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', [])
weak_cipher_count = sum(1 for cipher in supported_ciphers
if any(weak in cipher.upper() for weak in CIPHER_CATEGORIES['WEAK']))
weak_cipher_count = sum(1 for c in supported_ciphers
if any(w in c.upper() for w in CIPHER_CATEGORIES['WEAK']))
score -= min(weak_cipher_count * 5, 25)
# Vulnerability penalties
for vuln in scan_results.get('vulnerabilities', []):
if vuln['severity'] == 'CRITICAL':
score -= 20
elif vuln['severity'] == 'HIGH':
score -= 15
elif vuln['severity'] == 'MEDIUM':
score -= 10
elif vuln['severity'] == 'LOW':
score -= 5
for vuln in vulnerabilities:
if vuln['severity'] == 'CRITICAL': score -= 20
elif vuln['severity'] == 'HIGH': score -= 15
elif vuln['severity'] == 'MEDIUM': score -= 10
elif vuln['severity'] == 'LOW': score -= 5
scan_results['security_score'] = max(0, score)
return max(0, score)
async def generate_recommendations(scan_results):
"""Generate security recommendations."""
recommendations = []
# Protocol recommendations
if scan_results['protocols'].get('SSLv2', False):
recommendations.append("🔴 IMMEDIATELY disable SSLv2 - critically vulnerable")
if scan_results['protocols'].get('SSLv3', False):
recommendations.append("🔴 Disable SSLv3 - vulnerable to POODLE attack")
if not scan_results['protocols'].get('TLSv1.3', False):
recommendations.append("🟡 Enable TLSv1.3 for best security and performance")
def _generate_recommendations(protocols, cert_info, supported_ciphers, score):
recs = []
if protocols.get('SSLv2'): recs.append("🔴 IMMEDIATELY disable SSLv2 - critically vulnerable")
if protocols.get('SSLv3'): recs.append("🔴 Disable SSLv3 - vulnerable to POODLE attack")
if not protocols.get('TLSv1.3'): recs.append("🟡 Enable TLSv1.3 for best security and performance")
# Certificate recommendations
cert = scan_results.get('certificate', {})
if cert.get('days_until_expiry', 0) < 30:
recommendations.append("🟡 Renew SSL certificate - expiring soon")
if cert_info and cert_info.get('days_until_expiry', 0) < 30:
recs.append("🟡 Renew SSL certificate - expiring soon")
# Cipher recommendations
supported_ciphers = scan_results.get('ciphers', {}).get('supported_ciphers', [])
weak_ciphers = [c for c in supported_ciphers
if any(weak in c.upper() for weak in CIPHER_CATEGORIES['WEAK'])]
if any(w in c.upper() for w in CIPHER_CATEGORIES['WEAK'])]
if weak_ciphers:
recommendations.append("🔴 Remove weak cipher suites (RC4, DES, 3DES, NULL)")
recs.append("🔴 Remove weak cipher suites (RC4, DES, 3DES, NULL)")
# General recommendations
if scan_results['security_score'] < 80:
recommendations.append("🛡️ Implement modern TLS configuration following Mozilla guidelines")
if score < 80:
recs.append("🛡️ Implement modern TLS configuration following Mozilla guidelines")
if not any('ECDHE' in c for c in supported_ciphers):
recommendations.append("🟡 Enable Forward Secrecy with ECDHE cipher suites")
recs.append("🟡 Enable Forward Secrecy with ECDHE cipher suites")
# Add note about Python version limitations
recommendations.append("️ Note: SSLv2/SSLv3 testing limited by Python security features")
recs.append("️ Note: SSLv2/SSLv3 testing limited by Python security features")
return recs
scan_results['recommendations'] = recommendations
async def format_ssl_scan_results(scan_results):
"""Format comprehensive SSL scan results."""
output = f"<strong>🔐 SSL/TLS Security Scan: {scan_results['target']}:{scan_results['port']}</strong><br><br>"
def _format_cert_date(date_str):
try:
dt = datetime.datetime.strptime(date_str, '%Y%m%d%H%M%SZ')
return dt.strftime('%Y-%m-%d %H:%M:%S UTC')
except:
return date_str
# Security Score
score = scan_results['security_score']
if score >= 90:
score_emoji, rating = "🟢", "Excellent"
elif score >= 80:
score_emoji, rating = "🟡", "Good"
elif score >= 60:
score_emoji, rating = "🟠", "Fair"
else:
score_emoji, rating = "🔴", "Poor"
output += f"<strong>{score_emoji} Security Score: {score}/100 ({rating})</strong><br><br>"
# ----- main scan orchestration -----
async def perform_ssl_scan(room, bot, target, port):
safe_target = html_escape(target)
await bot.api.send_text_message(room.room_id, f"🔍 Starting comprehensive SSL/TLS scan for {safe_target}:{port}...")
if not await _run_blocking(_test_connectivity, target, port):
await bot.api.send_text_message(room.room_id, f"❌ Cannot connect to {safe_target}:{port}")
return
# Run blocking checks in parallel
cert_task = _run_blocking(_get_certificate_info, target, port)
proto_task = _run_blocking(_test_protocols, target, port)
cipher_task = _run_blocking(_test_cipher_suites, target, port)
cert_info, protocols, supported_ciphers = await asyncio.gather(cert_task, proto_task, cipher_task)
vulnerabilities = _check_vulnerabilities(protocols, cert_info, supported_ciphers)
score = _calculate_score(protocols, cert_info, supported_ciphers, vulnerabilities)
recommendations = _generate_recommendations(protocols, cert_info, supported_ciphers, score)
# Build output (using safe domain/port)
output = await _format_results(target, port, cert_info, protocols, supported_ciphers,
vulnerabilities, score, recommendations)
await bot.api.send_markdown_message(room.room_id, output)
logging.info(f"Completed SSL scan for {target}:{port}")
async def _format_results(target, port, cert_info, protocols, supported_ciphers,
vulnerabilities, score, recommendations):
safe_target = html_escape(target)
score_emoji = "🟢" if score >= 90 else "🟡" if score >= 80 else "🟠" if score >= 60 else "🔴"
rating = "Excellent" if score >= 90 else "Good" if score >= 80 else "Fair" if score >= 60 else "Poor"
body = f"<strong>🔐 SSL/TLS Security Scan: {safe_target}:{port}</strong><br><br>"
body += f"<strong>{score_emoji} Security Score: {score}/100 ({rating})</strong><br><br>"
# Certificate Information
cert = scan_results.get('certificate', {})
if cert:
output += "<strong>📜 Certificate Information</strong><br>"
output += f" • <strong>Subject:</strong> {cert.get('subject', {}).get('common_name', 'N/A')}<br>"
output += f" • <strong>Issuer:</strong> {cert.get('issuer', {}).get('common_name', 'N/A')}<br>"
output += f" • <strong>Valid From:</strong> {format_cert_date(cert.get('not_before', ''))}<br>"
output += f" • <strong>Valid Until:</strong> {format_cert_date(cert.get('not_after', ''))}<br>"
output += f" • <strong>Expires In:</strong> {cert.get('days_until_expiry', 'N/A')} days<br>"
output += f" • <strong>Signature Algorithm:</strong> {cert.get('signature_algorithm', 'N/A')}<br>"
output += "<br>"
if cert_info:
body += "<strong>📜 Certificate Information</strong><br>"
body += f"<strong>Subject:</strong> {html_escape(cert_info['subject'].get('common_name', 'N/A'))}<br>"
body += f" • <strong>Issuer:</strong> {html_escape(cert_info['issuer'].get('common_name', 'N/A'))}<br>"
body += f" • <strong>Valid From:</strong> {_format_cert_date(cert_info['not_before'])}<br>"
body += f" • <strong>Valid Until:</strong> {_format_cert_date(cert_info['not_after'])}<br>"
days = cert_info.get('days_until_expiry', 'N/A')
body += f" • <strong>Expires In:</strong> {days} days<br>"
body += f" • <strong>Signature Algorithm:</strong> {html_escape(cert_info['signature_algorithm'])}<br>"
body += "<br>"
# Protocol Support
output += "<strong>🔌 Protocol Support</strong><br>"
protocols = scan_results.get('protocols', {})
for proto, supported in protocols.items():
# Handle protocols that can't be tested in this Python version
if proto in ['SSLv2', 'SSLv3'] and proto not in TLS_VERSIONS:
emoji = ""
status = "Cannot test (Python security)"
body += "<strong>🔌 Protocol Support</strong><br>"
for proto in ['SSLv2', 'SSLv3', 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3']:
supported = protocols.get(proto, False)
if proto in ['SSLv2', 'SSLv3'] and supported:
emoji = "🔴"
elif proto == 'TLSv1.3' and supported:
emoji = ""
else:
emoji = "" if supported else ""
status = "Supported" if supported else "Not Supported"
if proto in ['SSLv2', 'SSLv3'] and proto not in TLS_VERSIONS:
status = "Cannot test (Python security)"
emoji = ""
body += f"{emoji} <strong>{proto}:</strong> {status}<br>"
body += "<br>"
# Highlight insecure protocols
if proto in ['SSLv2', 'SSLv3'] and supported:
emoji = "🔴"
elif proto in ['TLSv1.3'] and supported:
emoji = ""
# Cipher Suites
body += "<strong>🔐 Cipher Suites</strong><br>"
body += f" • <strong>Total Supported:</strong> {len(supported_ciphers)}<br>"
output += f"{emoji} <strong>{proto}:</strong> {status if 'status' in locals() else 'Supported' if supported else 'Not Supported'}<br>"
output += "<br>"
# Cipher Information
ciphers = scan_results.get('ciphers', {})
if ciphers.get('supported_ciphers'):
output += "<strong>🔐 Cipher Suites</strong><br>"
output += f" • <strong>Negotiated:</strong> {ciphers.get('negotiated_cipher', 'Unknown')}<br>"
output += f" • <strong>Total Supported:</strong> {len(ciphers['supported_ciphers'])}<br>"
# Show weak ciphers if any
weak_ciphers = [c for c in ciphers['supported_ciphers']
if any(weak in c.upper() for weak in CIPHER_CATEGORIES['WEAK'])]
if weak_ciphers:
output += f" • <strong>Weak Ciphers:</strong> {len(weak_ciphers)} found<br>"
for cipher in weak_ciphers[:3]:
output += f" └─ 🔴 {cipher}<br>"
# Show strong ciphers if any
strong_ciphers = [c for c in ciphers['supported_ciphers']
if any(strong in c.upper() for strong in CIPHER_CATEGORIES['STRONG'])]
if strong_ciphers:
output += f" • <strong>Strong Ciphers:</strong> {len(strong_ciphers)} found<br>"
output += "<br>"
weak_ciphers = [c for c in supported_ciphers
if any(w in c.upper() for w in CIPHER_CATEGORIES['WEAK'])]
if weak_ciphers:
body += f" • <strong>Weak Ciphers:</strong> {len(weak_ciphers)} found<br>"
for cipher in weak_ciphers[:3]:
body += f" └─ 🔴 {html_escape(cipher)}<br>"
strong_ciphers = [c for c in supported_ciphers
if any(s in c.upper() for s in CIPHER_CATEGORIES['STRONG'])]
if strong_ciphers:
body += f" • <strong>Strong Ciphers:</strong> {len(strong_ciphers)} found<br>"
body += "<br>"
# Vulnerabilities
vulnerabilities = scan_results.get('vulnerabilities', [])
if vulnerabilities:
output += "<strong>⚠️ Security Vulnerabilities</strong><br>"
for vuln in vulnerabilities[:5]: # Show top 5
severity_emoji = "🔴" if vuln['severity'] == 'CRITICAL' else "🟠" if vuln['severity'] == 'HIGH' else "🟡"
output += f"{severity_emoji} <strong>{vuln['name']}</strong> ({vuln['severity']})<br>"
output += f" └─ {vuln['description']}<br>"
output += "<br>"
body += "<strong>⚠️ Security Vulnerabilities</strong><br>"
for vuln in vulnerabilities[:5]:
sev_emoji = "🔴" if vuln['severity'] == 'CRITICAL' else "🟠" if vuln['severity'] == 'HIGH' else "🟡"
body += f"{sev_emoji} <strong>{html_escape(vuln['name'])}</strong> ({vuln['severity']})<br>"
body += f" └─ {html_escape(vuln['description'])}<br>"
body += "<br>"
# Recommendations
recommendations = scan_results.get('recommendations', [])
if recommendations:
output += "<strong>💡 Security Recommendations</strong><br>"
body += "<strong>💡 Security Recommendations</strong><br>"
for rec in recommendations[:8]:
output += f"{rec}<br>"
output += "<br>"
body += f"{rec}<br>"
body += "<br>"
# Quick Assessment
output += "<strong>📊 Quick Assessment</strong><br>"
body += "<strong>📊 Quick Assessment</strong><br>"
if score >= 90:
output += " • ✅ Excellent TLS configuration<br>"
output += " • ✅ Modern protocols and ciphers<br>"
output += " • ✅ Good certificate management<br>"
body += " • ✅ Excellent TLS configuration<br>"
body += " • ✅ Modern protocols and ciphers<br>"
body += " • ✅ Good certificate management<br>"
elif score >= 70:
output += " • ⚠️ Good configuration with minor issues<br>"
output += " • 🔧 Some improvements recommended<br>"
body += " • ⚠️ Good configuration with minor issues<br>"
body += " • 🔧 Some improvements recommended<br>"
else:
output += " • 🚨 Significant security issues found<br>"
output += " • 🔴 Immediate action required<br>"
body += " • 🚨 Significant security issues found<br>"
body += " • 🔴 Immediate action required<br>"
# Add note about testing limitations
output += "<br><em>️ Note: Some protocol tests limited by Python security features</em>"
body += "<br><em>️ Note: Some protocol tests limited by Python security features</em>"
# Always wrap in collapsible due to comprehensive output
output = f"<details><summary><strong>🔐 SSL/TLS Scan: {scan_results['target']}:{scan_results['port']} (Score: {score}/100)</strong></summary>{output}</details>"
return output
def format_cert_date(date_str):
"""Format certificate date string for display."""
try:
if date_str:
dt = datetime.datetime.strptime(date_str, '%Y%m%d%H%M%SZ')
return dt.strftime('%Y-%m-%d %H:%M:%S UTC')
except:
pass
return date_str
return collapsible_summary(f"🔐 SSL/TLS Scan: {safe_target}:{port} (Score: {score}/100)", body)
# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
__version__ = "1.0.1"
__version__ = "1.0.2"
__author__ = "Funguy Bot"
__description__ = "SSL/TLS security scanner (SSRFsafe)"
__description__ = "SSL/TLS security scanner (SSRFsafe, async)"
__help__ = """
<details>
<summary><strong>!sslscan</strong> SSL/TLS analysis</summary>