Files
FunguyBot/plugins/sslscan.py
T

343 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Comprehensive SSL/TLS security scanning and analysis.
All blocking socket calls run in a thread pool; user input is sanitised.
Output is a clean code block with aligned columns.
"""
import asyncio
import logging
import socket
import ssl
import OpenSSL
import datetime
import simplematrixbotlib as botlib
from plugins.common import is_public_destination, html_escape, collapsible_summary, code_block
# Protocol label -> ``ssl`` protocol constant used when building probe contexts.
# TLSv1 and TLSv1.1 are registered only when this Python build still exposes
# the matching constant.
# NOTE: ``ssl.PROTOCOL_TLS`` auto-negotiates the highest version both peers
# support; it does not by itself pin a connection to TLS 1.3.
TLS_VERSIONS = {
    'TLSv1.2': ssl.PROTOCOL_TLSv1_2,
    'TLSv1.3': ssl.PROTOCOL_TLS,
}
TLS_VERSIONS.update({
    label: getattr(ssl, constant)
    for label, constant in (
        ('TLSv1.1', 'PROTOCOL_TLSv1_1'),
        ('TLSv1', 'PROTOCOL_TLSv1'),
    )
    if hasattr(ssl, constant)
})
# Markers used to bucket cipher-suite names.  Elsewhere in this module each
# entry is tested as a substring of the upper-cased cipher name.
CIPHER_CATEGORIES = dict(
    STRONG=[
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
        'ECDHE-RSA-AES256-GCM-SHA384',
        'ECDHE-ECDSA-AES256-GCM-SHA384',
        'ECDHE-RSA-CHACHA20-POLY1305',
        'ECDHE-ECDSA-CHACHA20-POLY1305',
        'DHE-RSA-AES256-GCM-SHA384',
    ],
    WEAK='RC4 DES 3DES MD5 EXPORT NULL ANON ADH CBC'.split(),
)
async def handle_command(room, message, bot, prefix, config):
    """Handle the ``sslscan`` bot command.

    The first argument is parsed as ``<host>[:port]`` (port defaults to 443).
    The scan is started only when the host is non-empty, the port is an
    integer in 1..65535 and ``is_public_destination`` accepts the host.

    Args:
        room: Room the message was received in (``room.room_id`` is used
            as the reply destination).
        message: Incoming message event, passed to ``botlib.MessageMatch``.
        bot: Bot instance; replies are sent through ``bot.api``.
        prefix: Command prefix, passed to ``botlib.MessageMatch``.
        config: Unused by this handler.
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("sslscan")):
        return

    args = match.args()
    if len(args) < 1:
        await show_usage(room, bot)
        return

    # partition() keeps everything after the first ':' as the port text, so
    # input such as "host:1:2" is rejected instead of silently using "1".
    target, separator, port_text = args[0].strip().partition(':')
    if not target:
        # e.g. ":443" -- there is no host to scan.
        await show_usage(room, bot)
        return

    port = 443
    if separator:
        try:
            port = int(port_text)
        except ValueError:
            port = None
        # int() accepts 0, negatives and values above 65535; none of those
        # is a usable TCP port.
        if port is None or not 1 <= port <= 65535:
            await bot.api.send_text_message(room.room_id, "Invalid port number")
            return

    if not is_public_destination(target):
        await bot.api.send_text_message(room.room_id, "❌ Scanning of private/internal addresses is not allowed.")
        return

    await perform_ssl_scan(room, bot, target, port)
async def show_usage(room, bot):
    """Send the ``!sslscan`` usage text to the room the command came from."""
    help_lines = (
        "<strong>🔐 SSL/TLS Security Scanner</strong>",
        "<strong>!sslscan &lt;domain[:port]&gt;</strong> - Comprehensive SSL/TLS security analysis",
        "<strong>Examples:</strong>",
        "• <code>!sslscan example.com</code>",
        "• <code>!sslscan github.com:443</code>",
        "<strong>Tests Performed:</strong>",
        "• SSL/TLS protocol support and versions",
        "• Certificate chain validation and expiration",
        "• Cipher suite strength and configuration",
        "• Security headers and extensions",
        "• Handshake simulation and vulnerabilities",
        "• PCI DSS compliance checking",
        "• SSL/TLS best practices assessment",
        "",  # keeps the trailing newline of the message
    )
    await bot.api.send_markdown_message(room.room_id, "\n".join(help_lines))
async def _run_blocking(func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` on the running loop's default executor.

    Returns whatever ``func`` returns; an exception raised by ``func`` is
    raised from the ``await``.
    """
    def _invoke():
        # run_in_executor() forwards positional arguments only, so the
        # keyword arguments are bound through this closure.
        return func(*args, **kwargs)

    return await asyncio.get_running_loop().run_in_executor(None, _invoke)
def _test_connectivity(target, port):
    """Return True if a TCP connection to ``(target, port)`` can be opened.

    Blocking call with a 10 second timeout.  Any connection, resolution or
    timeout failure yields False.
    """
    try:
        with socket.create_connection((target, port), timeout=10):
            return True
    except (OSError, OverflowError, ValueError):
        # OSError covers refused/unreachable/timeout/DNS failures.
        # OverflowError/ValueError cover port or host values the socket
        # layer rejects.  Unlike a bare ``except:``, KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return False
def _get_certificate_info(target, port):
    """Fetch the server's leaf certificate and summarise it as a dict.

    Certificate verification is deliberately disabled so that invalid or
    self-signed certificates can still be inspected.

    Returns:
        A dict with ``subject``, ``issuer``, ``serial_number``, ``version``,
        ``not_before``, ``not_after`` (ASN.1 time strings),
        ``signature_algorithm``, ``days_until_expiry`` and ``extensions``;
        or ``None`` when the certificate cannot be retrieved or parsed.
    """
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    # Stage 1: network I/O only.  Failures here mean "no certificate", not a
    # crash of the whole scan.
    try:
        with socket.create_connection((target, port), timeout=10) as sock:
            with context.wrap_socket(sock, server_hostname=target) as ssock:
                cert_bin = ssock.getpeercert(binary_form=True)
    except (OSError, ValueError) as exc:
        # ssl.SSLError and socket timeouts are OSError subclasses.
        logging.warning("Certificate retrieval failed for %s:%s: %s", target, port, exc)
        return None
    if not cert_bin:
        return None

    # Stage 2: parse the DER blob with pyOpenSSL.
    try:
        cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bin)
        subject = cert.get_subject()
        issuer = cert.get_issuer()
        not_before = cert.get_notBefore().decode('utf-8')
        not_after = cert.get_notAfter().decode('utf-8')
        sig_alg = cert.get_signature_algorithm().decode('utf-8')

        # Compare timezone-aware UTC values (utcnow() is naive and deprecated).
        utc = datetime.timezone.utc
        not_after_dt = datetime.datetime.strptime(not_after, '%Y%m%d%H%M%SZ').replace(tzinfo=utc)
        days_remaining = (not_after_dt - datetime.datetime.now(utc)).days

        extensions = []
        for index in range(cert.get_extension_count()):
            ext = cert.get_extension(index)
            try:
                rendered = str(ext)
            except OpenSSL.crypto.Error:
                # NOTE(review): pyOpenSSL appears to raise here for extension
                # types it has no printer for -- keep the name, drop the value.
                rendered = ''
            extensions.append({
                'name': ext.get_short_name().decode('utf-8'),
                'value': rendered,
            })
    except (OpenSSL.crypto.Error, ValueError) as exc:
        logging.warning("Certificate parsing failed for %s:%s: %s", target, port, exc)
        return None

    return {
        'subject': {
            'common_name': subject.CN,
            'organization': subject.O,
            'organizational_unit': subject.OU,
            'country': subject.C,
            'state': subject.ST,
            'locality': subject.L,
        },
        'issuer': {
            'common_name': issuer.CN,
            'organization': issuer.O,
            'organizational_unit': issuer.OU,
        },
        'serial_number': cert.get_serial_number(),
        'version': cert.get_version(),
        'not_before': not_before,
        'not_after': not_after,
        'signature_algorithm': sig_alg,
        'days_until_expiry': days_remaining,
        'extensions': extensions,
    }
def _test_protocols(target, port):
    """Probe which protocol versions the server will negotiate.

    Each testable version is probed with its own handshake whose context is
    pinned to exactly that version via ``minimum_version`` and
    ``maximum_version``.  (A context built from ``ssl.PROTOCOL_TLS`` alone
    negotiates the highest common version, so it cannot show that TLS 1.3
    specifically is supported.)

    SSLv2 and SSLv3 are not probed and are always reported as ``False``, as
    are versions this Python build has no ``ssl.TLSVersion`` member for.

    Returns:
        dict mapping ``'SSLv2'``, ``'SSLv3'``, ``'TLSv1'``, ``'TLSv1.1'``,
        ``'TLSv1.2'`` and ``'TLSv1.3'`` to a bool.
    """
    # Display label -> ssl.TLSVersion member name.
    version_members = {
        'TLSv1': 'TLSv1',
        'TLSv1.1': 'TLSv1_1',
        'TLSv1.2': 'TLSv1_2',
        'TLSv1.3': 'TLSv1_3',
    }
    protocols = {}
    for proto_name in ['SSLv2', 'SSLv3', 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3']:
        member = version_members.get(proto_name)
        version = getattr(ssl.TLSVersion, member, None) if member else None
        if version is None:
            protocols[proto_name] = False
            continue
        try:
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            # Pin both bounds so the handshake succeeds only at this version.
            ctx.minimum_version = version
            ctx.maximum_version = version
            with socket.create_connection((target, port), timeout=5) as sock:
                with ctx.wrap_socket(sock, server_hostname=target):
                    protocols[proto_name] = True
        except (OSError, ValueError):
            # OSError: connect/handshake failure (ssl.SSLError, timeouts).
            # ValueError: the local TLS library rejects this version.
            protocols[proto_name] = False
    return protocols
def _test_cipher_suites(target, port):
    """Return the cipher suites from a fixed probe list that the server accepts.

    Each candidate is offered alone in its own handshake.  The probe context
    is capped at TLS 1.2 because ``SSLContext.set_ciphers()`` does not
    restrict TLS 1.3 suites: without the cap, a TLS 1.3 server completes
    every handshake with a TLS 1.3 suite and every candidate would be
    reported as supported.

    Returns:
        list of cipher names (a subset of the probe list, in probe order).
    """
    test_ciphers = [
        'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-AES256-GCM-SHA384',
        'ECDHE-RSA-AES256-SHA384', 'ECDHE-ECDSA-AES256-SHA384',
        'ECDHE-RSA-AES256-SHA', 'ECDHE-ECDSA-AES256-SHA',
        'AES256-GCM-SHA384', 'AES256-SHA256', 'AES256-SHA',
        'CAMELLIA256-SHA', 'PSK-AES256-CBC-SHA',
        'ECDHE-RSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES128-GCM-SHA256',
        'ECDHE-RSA-AES128-SHA256', 'ECDHE-ECDSA-AES128-SHA256',
        'ECDHE-RSA-AES128-SHA', 'ECDHE-ECDSA-AES128-SHA',
        'AES128-GCM-SHA256', 'AES128-SHA256', 'AES128-SHA',
        'CAMELLIA128-SHA', 'PSK-AES128-CBC-SHA',
        'DES-CBC3-SHA', 'RC4-SHA', 'RC4-MD5',
    ]

    # One context is enough; only the cipher list changes per probe.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    ctx.maximum_version = ssl.TLSVersion.TLSv1_2

    supported = []
    for cipher in test_ciphers:
        try:
            # Raises ssl.SSLError when the local TLS library does not know
            # or no longer allows this cipher; such entries are skipped.
            ctx.set_ciphers(cipher)
            with socket.create_connection((target, port), timeout=5) as sock:
                with ctx.wrap_socket(sock, server_hostname=target):
                    supported.append(cipher)
        except (OSError, ValueError):
            # ssl.SSLError and socket timeouts are OSError subclasses.
            continue
    return supported
def _check_vulnerabilities(protocols, cert_info, supported_ciphers):
    """Derive a list of ``(finding, severity)`` tuples from the probe results.

    Args:
        protocols: dict mapping protocol label to bool (supported or not).
        cert_info: certificate summary dict, or a falsy value if unavailable;
            only ``days_until_expiry`` is read.
        supported_ciphers: iterable of cipher-suite names the server accepted.

    Returns:
        list of ``(name, severity)`` tuples, severity being ``'CRITICAL'``,
        ``'HIGH'`` or ``'MEDIUM'``.
    """
    vulns = []
    if protocols.get('SSLv2'):
        vulns.append(('SSLv2 Support', 'CRITICAL'))
    if protocols.get('SSLv3'):
        vulns.append(('SSLv3 Support', 'HIGH'))

    if cert_info:
        days_left = cert_info.get('days_until_expiry', 0)
        if days_left < 0:
            # Already past notAfter: this is not merely "expiring soon".
            vulns.append(('Certificate Expired', 'CRITICAL'))
        elif days_left < 30:
            vulns.append(('Certificate Expiring Soon', 'MEDIUM'))

    weak_ciphers = [
        c for c in supported_ciphers
        if any(w in c.upper() for w in CIPHER_CATEGORIES['WEAK'])
    ]
    if weak_ciphers:
        vulns.append(('Weak Cipher Suites', 'HIGH'))

    if not protocols.get('TLSv1.2', False):
        vulns.append(('TLS 1.2 Not Supported', 'HIGH'))
    if not protocols.get('TLSv1.3', False):
        vulns.append(('TLS 1.3 Not Supported', 'MEDIUM'))
    return vulns
def _calculate_score(protocols, cert_info, supported_ciphers, vulnerabilities):
    """Compute a 0-100 score by subtracting penalties from 100.

    Penalties come from four sources: protocol support, certificate expiry
    (the <30 day and <7 day penalties stack), weak cipher count (5 points
    each, capped at 25) and the severity of every listed vulnerability.
    The result is clamped so it never goes below 0.
    """
    deductions = []

    # (protocol, penalty, penalise when supported?)
    protocol_rules = (
        ('SSLv2', 30, True),
        ('SSLv3', 20, True),
        ('TLSv1.2', 15, False),
        ('TLSv1.3', 10, False),
    )
    for proto, penalty, bad_if_supported in protocol_rules:
        if bool(protocols.get(proto)) == bad_if_supported:
            deductions.append(penalty)

    if cert_info:
        days_left = cert_info.get('days_until_expiry', 0)
        deductions.extend(
            penalty for threshold, penalty in ((30, 10), (7, 20))
            if days_left < threshold
        )

    weak_total = len([
        name for name in supported_ciphers
        if any(marker in name.upper() for marker in CIPHER_CATEGORIES['WEAK'])
    ])
    deductions.append(min(weak_total * 5, 25))

    severity_penalty = {'CRITICAL': 20, 'HIGH': 15, 'MEDIUM': 10}
    for _label, severity in vulnerabilities:
        deductions.append(severity_penalty.get(severity, 0))

    return max(0, 100 - sum(deductions))
def _generate_recommendations(protocols, cert_info, supported_ciphers, score):
    """Build the list of human-readable recommendations for a scan result.

    Args:
        protocols: dict mapping protocol label to bool (supported or not).
        cert_info: certificate summary dict, or a falsy value if unavailable;
            only ``days_until_expiry`` is read.
        supported_ciphers: iterable of cipher-suite names the server accepted.
        score: numeric security score; values below 80 add a generic
            "improve configuration" entry.

    Returns:
        list of recommendation strings, possibly empty.
    """
    recs = []
    if protocols.get('SSLv2'):
        recs.append("🔴 Disable SSLv2")
    if protocols.get('SSLv3'):
        recs.append("🔴 Disable SSLv3")
    if not protocols.get('TLSv1.3'):
        recs.append("🟡 Enable TLSv1.3")
    if cert_info and cert_info.get('days_until_expiry', 0) < 30:
        recs.append("🟡 Renew certificate")

    weak_ciphers = [
        c for c in supported_ciphers
        if any(w in c.upper() for w in CIPHER_CATEGORIES['WEAK'])
    ]
    if weak_ciphers:
        recs.append("🔴 Remove weak ciphers")
    if score < 80:
        recs.append("🛡️ Improve TLS configuration")

    # Forward secrecy is available when the server speaks TLS 1.3 (whose
    # standard handshake always uses an ephemeral key exchange) or accepts
    # any (EC)DHE suite.  'DHE' matches both 'DHE-...' and 'ECDHE-...';
    # names starting with 'TLS_' are TLS 1.3 suites.
    forward_secrecy = bool(protocols.get('TLSv1.3')) or any(
        'DHE' in c or c.startswith('TLS_') for c in supported_ciphers
    )
    if not forward_secrecy:
        recs.append("🟡 Enable Forward Secrecy")
    return recs
async def perform_ssl_scan(room, bot, target, port):
    """Run the full SSL/TLS scan for ``target:port`` and post the report.

    Steps: announce the scan, check TCP connectivity, run the certificate,
    protocol and cipher probes concurrently on the executor, derive
    vulnerabilities / score / recommendations, and send one formatted
    message built with ``code_block`` and ``collapsible_summary``.

    A probe that raises does not abort the scan: its result is replaced by
    an empty value (``None`` / ``{}`` / ``[]``) and the failure is logged.

    NOTE(review): several emoji literals in the protocol section are empty
    strings in this copy of the file (possibly lost during extraction) --
    confirm against the repository before relying on them.
    """
    safe_target = html_escape(target)
    await bot.api.send_text_message(room.room_id, f"🔍 Starting SSL/TLS scan for {safe_target}:{port}...")
    if not await _run_blocking(_test_connectivity, target, port):
        await bot.api.send_text_message(room.room_id, f"❌ Cannot connect to {safe_target}:{port}")
        return

    # (label for logging, probe function, value used if the probe raises)
    probes = (
        ("certificate", _get_certificate_info, None),
        ("protocol", _test_protocols, {}),
        ("cipher", _test_cipher_suites, []),
    )
    outcomes = await asyncio.gather(
        *(_run_blocking(probe, target, port) for _label, probe, _fallback in probes),
        return_exceptions=True,
    )
    results = []
    for (label, _probe, fallback), outcome in zip(probes, outcomes):
        if isinstance(outcome, BaseException):
            if not isinstance(outcome, Exception):
                # Cancellation and interpreter-exit signals must propagate.
                raise outcome
            logging.warning("SSL scan %s probe failed for %s:%s: %s", label, target, port, outcome)
            outcome = fallback
        results.append(outcome)
    cert_info, protocols, supported_ciphers = results

    vulnerabilities = _check_vulnerabilities(protocols, cert_info, supported_ciphers)
    score = _calculate_score(protocols, cert_info, supported_ciphers, vulnerabilities)
    recommendations = _generate_recommendations(protocols, cert_info, supported_ciphers, score)
    sections = []

    # Score
    score_emoji = "🟢" if score >= 90 else "🟡" if score >= 80 else "🟠" if score >= 60 else "🔴"
    rating = "Excellent" if score >= 90 else "Good" if score >= 80 else "Fair" if score >= 60 else "Poor"
    sections.append({"title": f"{score_emoji} Security Score", "rows": [("", "Score", f"{score}/100 ({rating})")]})

    # Certificate
    if cert_info:
        # The common_name keys always exist but may hold None, so the
        # fallback has to be applied with ``or`` rather than a .get() default.
        cert_rows = [
            ("📜", "Subject", cert_info['subject'].get('common_name') or 'N/A'),
            ("🏢", "Issuer", cert_info['issuer'].get('common_name') or 'N/A'),
            ("📅", "Valid Until", cert_info['not_after']),
            ("", "Expires In", f"{cert_info['days_until_expiry']} days"),
        ]
        sections.append({"title": "📜 Certificate", "rows": cert_rows})

    # Protocols
    proto_rows = []
    for proto in ['SSLv2', 'SSLv3', 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3']:
        supported = protocols.get(proto, False)
        if proto in ['SSLv2', 'SSLv3'] and supported:
            emoji = "🔴"
        elif proto == 'TLSv1.3' and supported:
            emoji = ""
        else:
            emoji = "" if supported else ""
        status = "Supported" if supported else "Not Supported"
        if proto in ['SSLv2', 'SSLv3'] and proto not in TLS_VERSIONS:
            status = "Cannot test"
            emoji = ""
        proto_rows.append((emoji, proto, status))
    sections.append({"title": "🔌 Protocols", "rows": proto_rows})

    # Cipher Suites
    weak_ciphers = [c for c in supported_ciphers if any(w in c.upper() for w in CIPHER_CATEGORIES['WEAK'])]
    strong_ciphers = [c for c in supported_ciphers if any(s in c.upper() for s in CIPHER_CATEGORIES['STRONG'])]
    cipher_rows = [("🔢", "Total Supported", str(len(supported_ciphers)))]
    if weak_ciphers:
        cipher_rows.append(("🔴", "Weak Ciphers", str(len(weak_ciphers))))
        # Only the first three weak ciphers are listed to keep the table short.
        for c in weak_ciphers[:3]:
            cipher_rows.append(("", "", c))
    if strong_ciphers:
        cipher_rows.append(("🟢", "Strong Ciphers", str(len(strong_ciphers))))
    sections.append({"title": "🔐 Cipher Suites", "rows": cipher_rows})

    # Vulnerabilities
    if vulnerabilities:
        vuln_rows = []
        for name, sev in vulnerabilities:
            sev_emoji = "🔴" if sev == 'CRITICAL' else "🟠" if sev == 'HIGH' else "🟡"
            vuln_rows.append((sev_emoji, name, sev))
        sections.append({"title": "⚠️ Vulnerabilities", "rows": vuln_rows})

    # Recommendations
    if recommendations:
        rec_rows = [("💡", "Recommendation", rec) for rec in recommendations]
        sections.append({"title": "💡 Recommendations", "rows": rec_rows})

    # Quick Assessment
    if score >= 90:
        assessment_rows = [("", "Assessment", "✅ Excellent configuration")]
    elif score >= 70:
        assessment_rows = [("", "Assessment", "⚠️ Good, minor improvements possible")]
    else:
        assessment_rows = [("", "Assessment", "🚨 Significant issues found")]
    sections.append({"title": "📊 Quick Assessment", "rows": assessment_rows})

    block = code_block(f"🔐 SSL/TLS Scan: {safe_target}:{port}", sections)
    output = collapsible_summary(f"🔐 SSL/TLS: {safe_target} (Score: {score}/100)", block)
    await bot.api.send_markdown_message(room.room_id, output)
    logging.info("Completed SSL scan for %s:%s", target, port)
# ---------------------------------------------------------------------------
# Plugin metadata: module-level constants describing this plugin.
# ---------------------------------------------------------------------------
__version__ = "1.0.2"
__author__ = "Funguy Bot"
__description__ = "SSL/TLS security scanner"
# HTML help snippet; the empty first and last items keep the leading and
# trailing newline of the text.
__help__ = "\n".join((
    "",
    "<details>",
    "<summary><strong>!sslscan</strong> SSL/TLS analysis</summary>",
    "<p><code>!sslscan &lt;domain[:port]&gt;</code> Tests protocols, cipher suites, certificate validity, vulnerabilities.<br>",
    "Provides a security score (0-100) and actionable recommendations.</p>",
    "</details>",
    "",
))