""" This plugin provides comprehensive HTTP security header analysis. """ import logging import requests import simplematrixbotlib as botlib from urllib.parse import urlparse import ssl import socket async def handle_command(room, message, bot, prefix, config): """ Function to handle !headers command for HTTP security header analysis. Args: room (Room): The Matrix room where the command was invoked. message (RoomMessage): The message object containing the command. bot (Bot): The bot object. prefix (str): The command prefix. config (dict): Configuration parameters. Returns: None """ match = botlib.MessageMatch(room, message, bot, prefix) if match.is_not_from_this_bot() and match.prefix() and match.command("headers"): logging.info("Received !headers command") args = match.args() if len(args) < 1: await show_usage(room, bot) return url = args[0].strip() # Add protocol if missing if not url.startswith(('http://', 'https://')): url = 'https://' + url await analyze_headers(room, bot, url) async def show_usage(room, bot): """Display headers command usage.""" usage = """ 🔒 HTTP Security Headers Analysis !headers <url> - Comprehensive HTTP security header analysis Examples:!headers example.com!headers https://github.com!headers http://localhost:8080 Analyzes: • Security headers presence and configuration • SSL/TLS certificate information • HTTP to HTTPS redirects • Security scoring and recommendations """ await bot.api.send_markdown_message(room.room_id, usage) async def analyze_headers(room, bot, url): """Perform comprehensive HTTP security header analysis.""" try: await bot.api.send_text_message(room.room_id, f"🔍 Analyzing security headers for: {url}") results = { 'url': url, 'http_headers': {}, 'https_headers': {}, 'redirect_chain': [], 'ssl_info': {}, 'security_score': 0, 'recommendations': [] } # Test HTTP first (if HTTPS was provided, we'll still check redirects) parsed = urlparse(url) http_url = f"http://{parsed.netloc or parsed.path}" https_url = f"https://{parsed.netloc or parsed.path}" # Analyze HTTP response and redirects await analyze_http_response(results, http_url if not url.startswith('https://') else https_url) # Analyze HTTPS response if url.startswith('https://') or results.get('redirects_to_https'): await analyze_https_response(results, https_url) # Analyze SSL certificate if HTTPS if url.startswith('https://') or results.get('redirects_to_https'): await analyze_ssl_certificate(results, parsed.netloc or parsed.path) # Calculate security score await calculate_security_score(results) # Generate recommendations await generate_recommendations(results) # Format and send results output = await format_header_analysis(results) await bot.api.send_markdown_message(room.room_id, output) logging.info(f"Completed header analysis for {url}") except Exception as e: await bot.api.send_text_message(room.room_id, f"Error analyzing headers: {str(e)}") logging.error(f"Error in analyze_headers: {e}") async def analyze_http_response(results, url): """Analyze HTTP response and redirect chain.""" try: session = requests.Session() session.max_redirects = 5 response = session.get(url, timeout=10, allow_redirects=True) results['final_url'] = response.url results['status_code'] = response.status_code results['http_headers'] = dict(response.headers) # Check if redirects to HTTPS results['redirects_to_https'] = response.url.startswith('https://') # Store redirect history results['redirect_chain'] = [{ 'url': resp.url, 'status_code': resp.status_code, 'headers': dict(resp.headers) } for resp in response.history] except requests.exceptions.SSLError: results['ssl_error'] = True except requests.exceptions.RequestException as e: results['http_error'] = str(e) async def analyze_https_response(results, url): """Analyze HTTPS response headers.""" try: response = requests.get(url, timeout=10, allow_redirects=False) results['https_headers'] = dict(response.headers) results['https_status'] = response.status_code except requests.exceptions.RequestException as e: results['https_error'] = str(e) async def analyze_ssl_certificate(results, domain): """Analyze SSL certificate information.""" try: context = ssl.create_default_context() with socket.create_connection((domain, 443), timeout=10) as sock: with context.wrap_socket(sock, server_hostname=domain) as ssock: cert = ssock.getpeercert() results['ssl_info'] = { 'subject': dict(x[0] for x in cert['subject']), 'issuer': dict(x[0] for x in cert['issuer']), 'not_before': cert['notBefore'], 'not_after': cert['notAfter'], 'san': cert.get('subjectAltName', []), 'version': cert.get('version'), 'serial_number': cert.get('serialNumber') } except Exception as e: results['ssl_error'] = str(e) async def calculate_security_score(results): """Calculate overall security score based on headers and configuration.""" score = 100 missing_headers = [] # Critical security headers critical_headers = [ 'Strict-Transport-Security', 'Content-Security-Policy', 'X-Content-Type-Options', 'X-Frame-Options', 'X-XSS-Protection' ] headers = results.get('https_headers') or results.get('http_headers', {}) for header in critical_headers: if header not in headers: score -= 15 missing_headers.append(header) # Check HSTS configuration hsts = headers.get('Strict-Transport-Security', '') if 'max-age=31536000' not in hsts: score -= 10 if 'includeSubDomains' not in hsts: score -= 5 if 'preload' not in hsts: score -= 5 # Check CSP configuration csp = headers.get('Content-Security-Policy', '') if not csp: score -= 10 elif "default-src 'none'" not in csp and "default-src 'self'" not in csp: score -= 5 # Check for insecure headers insecure_headers = ['Server', 'X-Powered-By', 'X-AspNet-Version'] for header in insecure_headers: if header in headers: score -= 5 # Bonus for good practices if headers.get('Referrer-Policy'): score += 5 if headers.get('Feature-Policy') or headers.get('Permissions-Policy'): score += 5 if headers.get('X-Content-Type-Options') == 'nosniff': score += 5 if headers.get('X-Frame-Options') in ['DENY', 'SAMEORIGIN']: score += 5 # HTTPS enforcement bonus if results.get('redirects_to_https'): score += 10 results['security_score'] = max(0, score) results['missing_headers'] = missing_headers async def generate_recommendations(results): """Generate security recommendations based on analysis.""" recommendations = [] headers = results.get('https_headers') or results.get('http_headers', {}) # HSTS recommendations if 'Strict-Transport-Security' not in headers: recommendations.append("🔒 Implement HSTS header with max-age=31536000, includeSubDomains, and preload") else: hsts = headers['Strict-Transport-Security'] if 'max-age=31536000' not in hsts: recommendations.append("🔒 Increase HSTS max-age to 31536000 (1 year)") if 'includeSubDomains' not in hsts: recommendations.append("🔒 Add includeSubDomains to HSTS header") if 'preload' not in hsts: recommendations.append("🔒 Consider adding preload directive to HSTS for browser preloading") # CSP recommendations if 'Content-Security-Policy' not in headers: recommendations.append("🛡️ Implement Content Security Policy to prevent XSS attacks") else: csp = headers['Content-Security-Policy'] if "default-src 'self'" not in csp and "default-src 'none'" not in csp: recommendations.append("🛡️ Restrict CSP default-src to 'self' or specific origins") # Frame options if 'X-Frame-Options' not in headers: recommendations.append("🚫 Add X-Frame-Options header to prevent clickjacking (DENY or SAMEORIGIN)") # Content type options if 'X-Content-Type-Options' not in headers: recommendations.append("📄 Add X-Content-Type-Options: nosniff to prevent MIME type sniffing") # Referrer policy if 'Referrer-Policy' not in headers: recommendations.append("🔗 Implement Referrer-Policy to control referrer information leakage") # Feature policy if 'Feature-Policy' not in headers and 'Permissions-Policy' not in headers: recommendations.append("⚙️ Implement Feature-Policy/Permissions-Policy to restrict browser features") # Remove server information if 'Server' in headers or 'X-Powered-By' in headers: recommendations.append("🕵️ Remove Server and X-Powered-By headers to avoid information disclosure") # HTTPS enforcement if not results.get('redirects_to_https') and not results['url'].startswith('https://'): recommendations.append("🔐 Implement HTTP to HTTPS redirects") results['recommendations'] = recommendations async def format_header_analysis(results): """Format the header analysis results for display.""" output = f"🔒 Security Headers Analysis: {results['url']}

" # Security Score score = results['security_score'] score_emoji = "🟢" if score >= 80 else "🟡" if score >= 60 else "🔴" output += f"{score_emoji} Security Score: {score}/100

" # Basic Information output += "📊 Basic Information
" output += f" • Final URL: {results.get('final_url', 'N/A')}
" output += f" • Status Code: {results.get('status_code', 'N/A')}
" if results.get('redirects_to_https'): output += f" • HTTPS Redirect: ✅ Enforced
" else: output += f" • HTTPS Redirect: ❌ Not enforced
" output += f" • Redirect Chain: {len(results.get('redirect_chain', []))} hops
" output += "
" # Security Headers Analysis headers = results.get('https_headers') or results.get('http_headers', {}) output += "🛡️ Security Headers Analysis
" security_headers = { 'Strict-Transport-Security': ('🔒', 'HSTS - HTTP Strict Transport Security'), 'Content-Security-Policy': ('🛡️', 'CSP - Content Security Policy'), 'X-Frame-Options': ('🚫', 'Clickjacking Protection'), 'X-Content-Type-Options': ('📄', 'MIME Type Sniffing Protection'), 'X-XSS-Protection': ('❌', 'XSS Protection (Deprecated)'), 'Referrer-Policy': ('🔗', 'Referrer Policy'), 'Feature-Policy': ('⚙️', 'Feature Policy'), 'Permissions-Policy': ('🔧', 'Permissions Policy'), } for header, (emoji, description) in security_headers.items(): if header in headers: value = headers[header] if len(value) > 100: value = value[:100] + "..." output += f" • {emoji} {header}: ✅ {value}
" else: output += f" • {emoji} {header}: ❌ Missing
" output += "
" # Other Headers (Information Disclosure) output += "📋 Other Headers
" info_headers = ['Server', 'X-Powered-By', 'X-AspNet-Version'] for header in info_headers: if header in headers: output += f" • 🔍 {header}: {headers[header]}
" output += "
" # SSL Certificate Information (if available) if results.get('ssl_info'): output += "🔐 SSL Certificate
" ssl_info = results['ssl_info'] if ssl_info.get('subject'): output += f" • Subject: {ssl_info['subject'].get('commonName', 'N/A')}
" if ssl_info.get('issuer'): output += f" • Issuer: {ssl_info['issuer'].get('organizationName', 'N/A')}
" if ssl_info.get('not_after'): output += f" • Expires: {ssl_info['not_after']}
" if ssl_info.get('san'): san_count = len([san for san in ssl_info['san'] if san[0] == 'DNS']) output += f" • SAN Entries: {san_count}
" output += "
" # Recommendations if results.get('recommendations'): output += "💡 Security Recommendations
" for rec in results['recommendations'][:8]: # Show first 8 recommendations output += f" • {rec}
" if len(results['recommendations']) > 8: output += f" • ... and {len(results['recommendations']) - 8} more recommendations
" output += "
" # Missing Headers Summary if results.get('missing_headers'): output += "⚠️ Critical Headers Missing
" for header in results['missing_headers']: output += f" • ❌ {header}
" output += "
" # Security Rating score = results['security_score'] if score >= 80: rating = "🟢 Excellent" description = "Strong security headers configuration" elif score >= 60: rating = "🟡 Good" description = "Moderate security, room for improvement" elif score >= 40: rating = "🟠 Fair" description = "Basic security, significant improvements needed" else: rating = "🔴 Poor" description = "Weak security headers configuration" output += f"📈 Security Rating: {rating}
" output += f"📝 Assessment: {description}
" # Wrap in collapsible if content is large if len(output) > 1000: output = f"
🔒 Security Headers Analysis: {results['url']}{output}
" return output