343 lines
14 KiB
Python
343 lines
14 KiB
Python
"""
Comprehensive SSL/TLS security scanning and analysis.
All blocking socket calls run in a thread pool; user input is sanitised.
Output is a clean code block with aligned columns.
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import socket
|
||
import ssl
|
||
import OpenSSL
|
||
import datetime
|
||
import simplematrixbotlib as botlib
|
||
from plugins.common import is_public_destination, html_escape, collapsible_summary, code_block
|
||
|
||
# SSL/TLS configuration – handle missing protocols in modern Python.
# Maps the protocol label shown in reports to the ``ssl`` protocol constant
# used to build a probing context for it.
TLS_VERSIONS = {'TLSv1.2': ssl.PROTOCOL_TLSv1_2, 'TLSv1.3': ssl.PROTOCOL_TLS}

# Legacy protocol constants are registered only when this interpreter's
# ``ssl`` module still exposes them.
for _label, _attr in (('TLSv1.1', 'PROTOCOL_TLSv1_1'), ('TLSv1', 'PROTOCOL_TLSv1')):
    if hasattr(ssl, _attr):
        TLS_VERSIONS[_label] = getattr(ssl, _attr)
del _label, _attr
|
||
|
||
# Cipher classification tables.
#   STRONG: full cipher-suite names, matched as substrings of the upper-cased
#           negotiated cipher name.
#   WEAK:   marker substrings; a cipher whose upper-cased name contains any
#           of them is reported as weak.
CIPHER_CATEGORIES = dict(
    STRONG=[
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
        'ECDHE-RSA-AES256-GCM-SHA384',
        'ECDHE-ECDSA-AES256-GCM-SHA384',
        'ECDHE-RSA-CHACHA20-POLY1305',
        'ECDHE-ECDSA-CHACHA20-POLY1305',
        'DHE-RSA-AES256-GCM-SHA384',
    ],
    WEAK=[
        'RC4',
        'DES',
        '3DES',
        'MD5',
        'EXPORT',
        'NULL',
        'ANON',
        'ADH',
        'CBC',
    ],
)
|
||
|
||
async def handle_command(room, message, bot, prefix, config):
    """Handle the ``sslscan`` bot command.

    Parses ``<domain[:port]>`` from the first command argument, rejects
    malformed or out-of-range ports and non-public destinations, and then
    starts the scan.

    Args:
        room: Room the message arrived in; replies are sent to ``room.room_id``.
        message: Incoming message event to match against.
        bot: Bot instance whose ``api`` is used to send replies.
        prefix: Command prefix configured for the bot.
        config: Plugin configuration (not used by this command).
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("sslscan")):
        return

    args = match.args()
    if not args:
        await show_usage(room, bot)
        return

    # Split on the first colon only, so trailing junk ("host:443:x") is
    # rejected as a bad port instead of being silently ignored.
    target, has_port, port_text = args[0].strip().partition(':')
    port = 443
    if has_port:
        try:
            port = int(port_text)
        except ValueError:
            port = None
        # Ports outside 1-65535 can never be connected to; refuse them up
        # front rather than letting the socket layer fail later.
        if port is None or not 1 <= port <= 65535:
            await bot.api.send_text_message(room.room_id, "Invalid port number")
            return

    if not target:
        # Input such as ":443" carries no host to scan.
        await show_usage(room, bot)
        return

    if not is_public_destination(target):
        await bot.api.send_text_message(room.room_id, "❌ Scanning of private/internal addresses is not allowed.")
        return

    await perform_ssl_scan(room, bot, target, port)
|
||
|
||
async def show_usage(room, bot):
    """Send the ``!sslscan`` usage/help text to the room.

    Args:
        room: Room to reply to (``room.room_id`` is used).
        bot: Bot instance whose ``api.send_markdown_message`` sends the text.
    """
    # The "<domain[:port]>" placeholder is entity-escaped: left raw it looks
    # like an (unknown) HTML tag and can be dropped from the rendered message.
    usage = """<strong>🔐 SSL/TLS Security Scanner</strong>
<strong>!sslscan &lt;domain[:port]&gt;</strong> - Comprehensive SSL/TLS security analysis

<strong>Examples:</strong>
• <code>!sslscan example.com</code>
• <code>!sslscan github.com:443</code>

<strong>Tests Performed:</strong>
• SSL/TLS protocol support and versions
• Certificate chain validation and expiration
• Cipher suite strength and configuration
• Security headers and extensions
• Handshake simulation and vulnerabilities
• PCI DSS compliance checking
• SSL/TLS best practices assessment
"""
    await bot.api.send_markdown_message(room.room_id, usage)
|
||
|
||
async def _run_blocking(func, *args, **kwargs):
|
||
loop = asyncio.get_running_loop()
|
||
return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
|
||
|
||
def _test_connectivity(target, port):
|
||
try:
|
||
with socket.create_connection((target, port), timeout=10):
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
def _get_certificate_info(target, port):
    """Fetch and summarise the leaf certificate presented by ``target:port``.

    Blocking; intended to be run in a worker thread. Certificate verification
    is disabled on purpose so that invalid, expired or self-signed
    certificates can still be inspected.

    Args:
        target: Host name to connect to (also sent as SNI).
        port: TCP port number.

    Returns:
        A dict with ``subject``, ``issuer``, ``serial_number``, ``version``,
        ``not_before``, ``not_after``, ``signature_algorithm``,
        ``days_until_expiry`` and ``extensions`` keys, or ``None`` when the
        certificate could not be retrieved or parsed.
    """
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    # Only the handshake needs the socket; parse after it is closed.
    try:
        with socket.create_connection((target, port), timeout=10) as sock:
            with context.wrap_socket(sock, server_hostname=target) as ssock:
                cert_bin = ssock.getpeercert(binary_form=True)
    except (OSError, ValueError, OverflowError) as exc:
        # The caller treats a missing certificate as "no certificate section";
        # a failed handshake must not abort the whole scan.
        logging.warning("Could not retrieve certificate for %s:%s: %s", target, port, exc)
        return None

    if not cert_bin:
        return None

    try:
        cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bin)

        subject = cert.get_subject()
        issuer = cert.get_issuer()
        not_before = cert.get_notBefore().decode('utf-8')
        not_after = cert.get_notAfter().decode('utf-8')
        sig_alg = cert.get_signature_algorithm().decode('utf-8')

        # The timestamp is parsed with a literal trailing "Z", i.e. as UTC;
        # compare it against an aware "now" instead of the deprecated naive
        # utcnow().
        not_after_dt = datetime.datetime.strptime(
            not_after, '%Y%m%d%H%M%SZ'
        ).replace(tzinfo=datetime.timezone.utc)
        days_remaining = (not_after_dt - datetime.datetime.now(datetime.timezone.utc)).days

        extensions = []
        for index in range(cert.get_extension_count()):
            ext = cert.get_extension(index)
            extensions.append({
                'name': ext.get_short_name().decode('utf-8'),
                'value': str(ext),
            })

        return {
            'subject': {
                'common_name': subject.CN,
                'organization': subject.O,
                'organizational_unit': subject.OU,
                'country': subject.C,
                'state': subject.ST,
                'locality': subject.L,
            },
            'issuer': {
                'common_name': issuer.CN,
                'organization': issuer.O,
                'organizational_unit': issuer.OU,
            },
            'serial_number': cert.get_serial_number(),
            'version': cert.get_version(),
            'not_before': not_before,
            'not_after': not_after,
            'signature_algorithm': sig_alg,
            'days_until_expiry': days_remaining,
            'extensions': extensions,
        }
    except (OpenSSL.crypto.Error, ValueError, AttributeError) as exc:
        # Malformed certificate, unexpected timestamp format, or a missing
        # validity field: report "no certificate info" instead of crashing.
        logging.warning("Could not parse certificate for %s:%s: %s", target, port, exc)
        return None
|
||
|
||
def _test_protocols(target, port):
    """Probe which SSL/TLS protocol versions ``target:port`` accepts.

    Blocking; intended to be run in a worker thread. One handshake is
    attempted per protocol that has an entry in ``TLS_VERSIONS``; protocols
    without an entry cannot be tested locally and are reported as False.

    Args:
        target: Host name to connect to (also sent as SNI).
        port: TCP port number.

    Returns:
        Dict mapping each of 'SSLv2', 'SSLv3', 'TLSv1', 'TLSv1.1', 'TLSv1.2'
        and 'TLSv1.3' to True when a handshake at exactly that version
        succeeded, else False.
    """
    # Exact version to enforce when a context is built from the generic
    # PROTOCOL_TLS constant. That constant negotiates the highest mutually
    # supported version, so without pinning, the "TLSv1.3" probe succeeded
    # against any server that merely spoke TLS 1.2.
    pinned_versions = {
        'TLSv1': ssl.TLSVersion.TLSv1,
        'TLSv1.1': ssl.TLSVersion.TLSv1_1,
        'TLSv1.2': ssl.TLSVersion.TLSv1_2,
        'TLSv1.3': ssl.TLSVersion.TLSv1_3,
    }

    protocols = {}
    for proto_name in ('SSLv2', 'SSLv3', 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3'):
        if proto_name not in TLS_VERSIONS:
            protocols[proto_name] = False
            continue
        try:
            protocol_const = TLS_VERSIONS[proto_name]
            ctx = ssl.SSLContext(protocol_const)
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            # Only generic contexts allow changing min/max version; the
            # version-specific PROTOCOL_TLSv1_x contexts are already pinned.
            if protocol_const == ssl.PROTOCOL_TLS and proto_name in pinned_versions:
                ctx.minimum_version = pinned_versions[proto_name]
                ctx.maximum_version = pinned_versions[proto_name]
            with socket.create_connection((target, port), timeout=5) as sock:
                with ctx.wrap_socket(sock, server_hostname=target) as tls_sock:
                    # Belt and braces: confirm the negotiated version really
                    # is the one being probed.
                    protocols[proto_name] = tls_sock.version() == proto_name
        except (OSError, ValueError, OverflowError):
            # Handshake refused, timeout, DNS failure, or a version this
            # OpenSSL build cannot be configured for.
            protocols[proto_name] = False
    return protocols
|
||
|
||
def _test_cipher_suites(target, port):
    """Probe which of a fixed list of TLS <= 1.2 cipher suites the server accepts.

    Blocking; intended to be run in a worker thread. One handshake is
    attempted per candidate cipher, with the client offering only that cipher.

    Args:
        target: Host name to connect to (also sent as SNI).
        port: TCP port number.

    Returns:
        List of cipher names (OpenSSL naming) for which the handshake
        succeeded, in probe order.
    """
    test_ciphers = [
        'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-AES256-GCM-SHA384',
        'ECDHE-RSA-AES256-SHA384', 'ECDHE-ECDSA-AES256-SHA384',
        'ECDHE-RSA-AES256-SHA', 'ECDHE-ECDSA-AES256-SHA',
        'AES256-GCM-SHA384', 'AES256-SHA256', 'AES256-SHA',
        'CAMELLIA256-SHA', 'PSK-AES256-CBC-SHA',
        'ECDHE-RSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES128-GCM-SHA256',
        'ECDHE-RSA-AES128-SHA256', 'ECDHE-ECDSA-AES128-SHA256',
        'ECDHE-RSA-AES128-SHA', 'ECDHE-ECDSA-AES128-SHA',
        'AES128-GCM-SHA256', 'AES128-SHA256', 'AES128-SHA',
        'CAMELLIA128-SHA', 'PSK-AES128-CBC-SHA',
        'DES-CBC3-SHA', 'RC4-SHA', 'RC4-MD5',
    ]
    supported = []
    for cipher in test_ciphers:
        try:
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            # set_ciphers() does not restrict TLS 1.3 suites. Without this
            # cap, a TLS 1.3 server completes the handshake with a TLS 1.3
            # suite and every probed cipher is wrongly reported as supported.
            ctx.maximum_version = ssl.TLSVersion.TLSv1_2
            ctx.set_ciphers(cipher)
            with socket.create_connection((target, port), timeout=5) as sock:
                with ctx.wrap_socket(sock, server_hostname=target):
                    supported.append(cipher)
        except (OSError, ValueError, OverflowError):
            # Cipher unknown to the local OpenSSL, handshake refused, or a
            # network failure: treat the cipher as unsupported and move on.
            continue
    return supported
|
||
|
||
def _check_vulnerabilities(protocols, cert_info, supported_ciphers):
    """Derive a list of findings from the protocol, certificate and cipher probes.

    Args:
        protocols: Mapping of protocol name to a truthy/falsy "supported" flag.
        cert_info: Certificate summary dict (reads ``days_until_expiry``), or
            a falsy value when no certificate information is available.
        supported_ciphers: Iterable of accepted cipher names.

    Returns:
        List of ``(finding_name, severity)`` tuples in a fixed order.
    """
    weak_markers = CIPHER_CATEGORIES['WEAK']
    weak_found = [
        name for name in supported_ciphers
        if any(marker in name.upper() for marker in weak_markers)
    ]
    cert_expiring = cert_info and cert_info.get('days_until_expiry', 0) < 30

    # (condition, finding) pairs, listed in report order.
    checks = (
        (protocols.get('SSLv2'), ('SSLv2 Support', 'CRITICAL')),
        (protocols.get('SSLv3'), ('SSLv3 Support', 'HIGH')),
        (cert_expiring, ('Certificate Expiring Soon', 'MEDIUM')),
        (weak_found, ('Weak Cipher Suites', 'HIGH')),
        (not protocols.get('TLSv1.2', False), ('TLS 1.2 Not Supported', 'HIGH')),
        (not protocols.get('TLSv1.3', False), ('TLS 1.3 Not Supported', 'MEDIUM')),
    )
    return [finding for triggered, finding in checks if triggered]
|
||
|
||
def _calculate_score(protocols, cert_info, supported_ciphers, vulnerabilities):
    """Compute the 0-100 security score by subtracting penalties from 100.

    Args:
        protocols: Mapping of protocol name to a truthy/falsy "supported" flag.
        cert_info: Certificate summary dict (reads ``days_until_expiry``), or
            a falsy value when unavailable.
        supported_ciphers: Iterable of accepted cipher names.
        vulnerabilities: Iterable of ``(name, severity)`` tuples.

    Returns:
        The score as an int, floored at 0.
    """
    penalty = 0

    # Protocol support.
    if protocols.get('SSLv2'):
        penalty += 30
    if protocols.get('SSLv3'):
        penalty += 20
    if not protocols.get('TLSv1.2'):
        penalty += 15
    if not protocols.get('TLSv1.3'):
        penalty += 10

    # Certificate lifetime: the <7 day penalty stacks on the <30 day one.
    if cert_info:
        if cert_info.get('days_until_expiry', 0) < 30:
            penalty += 10
        if cert_info.get('days_until_expiry', 0) < 7:
            penalty += 20

    # Weak ciphers: 5 points each, capped at 25.
    weak_markers = CIPHER_CATEGORIES['WEAK']
    weak_total = len([
        name for name in supported_ciphers
        if any(marker in name.upper() for marker in weak_markers)
    ])
    penalty += min(weak_total * 5, 25)

    # Findings: per-severity deduction; other severities cost nothing.
    severity_cost = {'CRITICAL': 20, 'HIGH': 15, 'MEDIUM': 10}
    for _name, severity in vulnerabilities:
        penalty += severity_cost.get(severity, 0)

    return max(0, 100 - penalty)
|
||
|
||
def _generate_recommendations(protocols, cert_info, supported_ciphers, score):
    """Build the ordered list of remediation hints for the report.

    Args:
        protocols: Mapping of protocol name to a truthy/falsy "supported" flag.
        cert_info: Certificate summary dict (reads ``days_until_expiry``), or
            a falsy value when unavailable.
        supported_ciphers: Iterable of accepted cipher names.
        score: Overall security score; below 80 adds a generic hint.

    Returns:
        List of recommendation strings in a fixed order.
    """
    weak_markers = CIPHER_CATEGORIES['WEAK']
    weak_found = [
        name for name in supported_ciphers
        if any(marker in name.upper() for marker in weak_markers)
    ]
    cert_expiring = cert_info and cert_info.get('days_until_expiry', 0) < 30
    offers_ecdhe = any('ECDHE' in name for name in supported_ciphers)

    # (condition, advice) pairs, listed in report order.
    advice_table = (
        (protocols.get('SSLv2'), "🔴 Disable SSLv2"),
        (protocols.get('SSLv3'), "🔴 Disable SSLv3"),
        (not protocols.get('TLSv1.3'), "🟡 Enable TLSv1.3"),
        (cert_expiring, "🟡 Renew certificate"),
        (weak_found, "🔴 Remove weak ciphers"),
        (score < 80, "🛡️ Improve TLS configuration"),
        (not offers_ecdhe, "🟡 Enable Forward Secrecy"),
    )
    return [advice for applies, advice in advice_table if applies]
|
||
|
||
_REPORT_PROTOCOLS = ('SSLv2', 'SSLv3', 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3')


def _score_rating(score):
    """Return the ``(emoji, rating_label)`` pair for a 0-100 score."""
    if score >= 90:
        return "🟢", "Excellent"
    if score >= 80:
        return "🟡", "Good"
    if score >= 60:
        return "🟠", "Fair"
    return "🔴", "Poor"


def _certificate_rows(cert_info):
    """Build the certificate table rows from a certificate summary dict."""
    return [
        # The name fields are present but None when the certificate lacks
        # them, so a .get() default alone would render the text "None".
        ("📜", "Subject", cert_info['subject'].get('common_name') or 'N/A'),
        ("🏢", "Issuer", cert_info['issuer'].get('common_name') or 'N/A'),
        ("📅", "Valid Until", cert_info['not_after']),
        ("⏳", "Expires In", f"{cert_info['days_until_expiry']} days"),
    ]


def _protocol_rows(protocols):
    """Build one ``(emoji, protocol, status)`` row per reported protocol."""
    rows = []
    for proto in _REPORT_PROTOCOLS:
        if proto not in TLS_VERSIONS:
            # No local protocol constant means no probe was run, so
            # "Not Supported" would be a claim the scan cannot back up.
            rows.append(("⚫", proto, "Cannot test"))
            continue
        supported = protocols.get(proto, False)
        if supported and proto in ('SSLv2', 'SSLv3'):
            emoji = "🔴"
        else:
            emoji = "✅" if supported else "❌"
        rows.append((emoji, proto, "Supported" if supported else "Not Supported"))
    return rows


def _cipher_rows(supported_ciphers):
    """Build the cipher summary rows (totals plus up to three weak examples)."""
    weak_ciphers = [
        c for c in supported_ciphers
        if any(w in c.upper() for w in CIPHER_CATEGORIES['WEAK'])
    ]
    strong_ciphers = [
        c for c in supported_ciphers
        if any(s in c.upper() for s in CIPHER_CATEGORIES['STRONG'])
    ]
    rows = [("🔢", "Total Supported", str(len(supported_ciphers)))]
    if weak_ciphers:
        rows.append(("🔴", "Weak Ciphers", str(len(weak_ciphers))))
        rows.extend(("", "", c) for c in weak_ciphers[:3])
    if strong_ciphers:
        rows.append(("🟢", "Strong Ciphers", str(len(strong_ciphers))))
    return rows


def _assessment_text(score):
    """Return the one-line overall assessment for a score."""
    if score >= 90:
        return "✅ Excellent configuration"
    if score >= 70:
        return "⚠️ Good, minor improvements possible"
    return "🚨 Significant issues found"


async def perform_ssl_scan(room, bot, target, port):
    """Run the full SSL/TLS scan of ``target:port`` and post the report.

    Sends a progress message, checks TCP connectivity, runs the certificate,
    protocol and cipher probes concurrently in worker threads, derives
    findings, score and recommendations, and sends the formatted report.

    Args:
        room: Room to reply to (``room.room_id`` is used).
        bot: Bot instance whose ``api`` is used to send messages.
        target: Host name to scan.
        port: TCP port to scan.
    """
    safe_target = html_escape(target)
    await bot.api.send_text_message(room.room_id, f"🔍 Starting SSL/TLS scan for {safe_target}:{port}...")

    if not await _run_blocking(_test_connectivity, target, port):
        await bot.api.send_text_message(room.room_id, f"❌ Cannot connect to {safe_target}:{port}")
        return

    try:
        cert_info, protocols, supported_ciphers = await asyncio.gather(
            _run_blocking(_get_certificate_info, target, port),
            _run_blocking(_test_protocols, target, port),
            _run_blocking(_test_cipher_suites, target, port),
        )
    except Exception:
        # Top-level boundary of the command: a failing probe must produce a
        # reply and a logged traceback, not an unhandled error in the bot.
        logging.exception("SSL scan failed for %s:%s", target, port)
        await bot.api.send_text_message(room.room_id, f"❌ SSL scan failed for {safe_target}:{port}")
        return

    vulnerabilities = _check_vulnerabilities(protocols, cert_info, supported_ciphers)
    score = _calculate_score(protocols, cert_info, supported_ciphers, vulnerabilities)
    recommendations = _generate_recommendations(protocols, cert_info, supported_ciphers, score)

    sections = []

    # Score
    score_emoji, rating = _score_rating(score)
    sections.append({"title": f"{score_emoji} Security Score", "rows": [("", "Score", f"{score}/100 ({rating})")]})

    # Certificate
    if cert_info:
        sections.append({"title": "📜 Certificate", "rows": _certificate_rows(cert_info)})

    # Protocols
    sections.append({"title": "🔌 Protocols", "rows": _protocol_rows(protocols)})

    # Cipher Suites
    sections.append({"title": "🔐 Cipher Suites", "rows": _cipher_rows(supported_ciphers)})

    # Vulnerabilities
    if vulnerabilities:
        vuln_rows = []
        for name, sev in vulnerabilities:
            sev_emoji = "🔴" if sev == 'CRITICAL' else "🟠" if sev == 'HIGH' else "🟡"
            vuln_rows.append((sev_emoji, name, sev))
        sections.append({"title": "⚠️ Vulnerabilities", "rows": vuln_rows})

    # Recommendations
    if recommendations:
        rec_rows = [("💡", "Recommendation", rec) for rec in recommendations]
        sections.append({"title": "💡 Recommendations", "rows": rec_rows})

    # Quick Assessment
    sections.append({"title": "📊 Quick Assessment", "rows": [("", "Assessment", _assessment_text(score))]})

    block = code_block(f"🔐 SSL/TLS Scan: {safe_target}:{port}", sections)
    output = collapsible_summary(f"🔐 SSL/TLS: {safe_target} (Score: {score}/100)", block)
    await bot.api.send_markdown_message(room.room_id, output)
    logging.info(f"Completed SSL scan for {target}:{port}")
|
||
|
||
# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
__version__ = "1.0.2"
__author__ = "Funguy Bot"
__description__ = "SSL/TLS security scanner"
# The help text is HTML, so the "<domain[:port]>" usage placeholder is
# entity-escaped; left raw it parses as an unknown tag and is not displayed.
__help__ = """
<details>
<summary><strong>!sslscan</strong> – SSL/TLS analysis</summary>
<p><code>!sslscan &lt;domain[:port]&gt;</code> – Tests protocols, cipher suites, certificate validity, vulnerabilities.<br>
Provides a security score (0-100) and actionable recommendations.</p>
</details>
"""
|