Files
FunguyBot/plugins/headers.py
T

209 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HTTP security header analysis plugin.
Outputs a structured code block with perfectly aligned columns.
"""
import logging
import aiohttp
import asyncio
import simplematrixbotlib as botlib
from urllib.parse import urlparse
import ssl
import socket
import datetime
from plugins.common import is_public_destination, html_escape, collapsible_summary, code_block
async def _run_in_thread(func, *args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
async def analyze_http_response(url):
    """Fetch ``url`` (following redirects) and summarise the final response.

    Returns a 4-tuple ``(final_url, status, headers, is_https)``:

    * ``final_url`` - the URL of the last response, as a string.
    * ``status``    - HTTP status code of that response.
    * ``headers``   - the response headers copied into a plain ``dict``
      (key spelling is whatever the server sent).
    * ``is_https``  - ``True`` when the final URL uses the ``https`` scheme.

    On any client error or timeout the failure is logged and
    ``(url, None, {}, False)`` is returned instead of raising.
    """
    # NOTE(review): redirects are followed automatically, so a public host can
    # bounce this request to an internal address after the caller's
    # public-destination check has passed. Consider validating each hop.
    request_timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, allow_redirects=True, timeout=request_timeout) as resp:
                return str(resp.url), resp.status, dict(resp.headers), resp.url.scheme == 'https'
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # An expired total timeout surfaces as asyncio.TimeoutError, which is
        # not an aiohttp.ClientError; without catching it a slow site would
        # abort the whole command. Its str() is empty, hence the fallback.
        detail = str(e) or type(e).__name__
        logging.warning(f"HTTP analysis error: {detail}")
        return url, None, {}, False
async def analyze_https_response(url):
    """Fetch ``url`` without following redirects and return its headers.

    Returns ``(status, headers)`` where ``headers`` is the response header
    mapping copied into a plain ``dict``. On any client error or timeout the
    failure is logged and ``(None, {})`` is returned instead of raising.
    """
    request_timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, allow_redirects=False, timeout=request_timeout) as resp:
                return resp.status, dict(resp.headers)
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # The total timeout raises asyncio.TimeoutError, which is not a
        # ClientError and has an empty message, so fall back to the type name.
        detail = str(e) or type(e).__name__
        logging.warning(f"HTTPS analysis error: {detail}")
        return None, {}
def _get_cert_info(domain):
    """Retrieve basic TLS certificate details for ``domain`` on port 443.

    This performs blocking socket I/O (10 second connect timeout) using the
    default SSL context. On success it returns a dict with the keys
    ``subject``, ``issuer``, ``not_before``, ``not_after`` and ``san``, built
    from the peer certificate reported by ``getpeercert()``. Any failure is
    logged as a warning and ``None`` is returned.
    """
    try:
        tls_context = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=10) as raw_sock, \
                tls_context.wrap_socket(raw_sock, server_hostname=domain) as tls_sock:
            peer_cert = tls_sock.getpeercert()
            # Each name component is a tuple of (key, value) pairs; only the
            # first pair of every component is kept.
            subject_fields = dict(rdn[0] for rdn in peer_cert['subject'])
            issuer_fields = dict(rdn[0] for rdn in peer_cert['issuer'])
            return {
                'subject': subject_fields,
                'issuer': issuer_fields,
                'not_before': peer_cert['notBefore'],
                'not_after': peer_cert['notAfter'],
                'san': peer_cert.get('subjectAltName', []),
            }
    except Exception as e:
        logging.warning(f"SSL cert error: {e}")
        return None
def calculate_score(headers, redirects_to_https, cert_info):
    """Compute a 0-100 security score from response headers and TLS details.

    Args:
        headers: Mapping of response header names to values. Names are
            matched case-insensitively.
        redirects_to_https: Truthy when the site ended up on HTTPS.
        cert_info: Dict with a ``not_after`` certificate expiry string
            (``'%b %d %H:%M:%S %Y %Z'`` format), or ``None``.

    Scoring starts at 100:
        * -15 for each missing header among HSTS, CSP, X-Content-Type-Options,
          X-Frame-Options and X-XSS-Protection;
        * -10 if HSTS ``max-age`` is below one year, -5 without
          ``includeSubDomains``, -5 without ``preload``;
        * +5 each for Referrer-Policy, Feature-/Permissions-Policy,
          ``X-Content-Type-Options: nosniff`` and
          ``X-Frame-Options: DENY|SAMEORIGIN``; +10 for HTTPS;
        * -10 if the certificate expires in fewer than 30 days.

    Returns:
        The score clamped to the inclusive range 0..100.
    """
    # HTTP field names are case-insensitive, and servers do not all use the
    # canonical capitalisation, so compare on lower-cased names.
    present = {str(name).lower(): value for name, value in headers.items()}

    def value_of(name):
        value = present.get(name.lower())
        return '' if value is None else str(value)

    score = 100
    required_headers = (
        'Strict-Transport-Security',
        'Content-Security-Policy',
        'X-Content-Type-Options',
        'X-Frame-Options',
        'X-XSS-Protection',
    )
    for required in required_headers:
        if required.lower() not in present:
            score -= 15

    # Parse HSTS into directives: directive names are case-insensitive and a
    # max-age longer than one year must not be penalised.
    directives = {}
    for part in value_of('Strict-Transport-Security').split(';'):
        key, _, raw = part.strip().partition('=')
        key = key.strip().lower()
        if key:
            directives[key] = raw.strip().strip('"')
    try:
        max_age = int(directives.get('max-age', ''))
    except ValueError:
        max_age = 0
    if max_age < 31536000:
        score -= 10
    if 'includesubdomains' not in directives:
        score -= 5
    if 'preload' not in directives:
        score -= 5

    if value_of('Referrer-Policy'):
        score += 5
    if value_of('Feature-Policy') or value_of('Permissions-Policy'):
        score += 5
    if value_of('X-Content-Type-Options').strip().lower() == 'nosniff':
        score += 5
    if value_of('X-Frame-Options').strip().upper() in ('DENY', 'SAMEORIGIN'):
        score += 5
    if redirects_to_https:
        score += 10

    not_after = cert_info.get('not_after') if cert_info else None
    if not_after:
        try:
            expires = datetime.datetime.strptime(
                not_after, '%b %d %H:%M:%S %Y %Z'
            ).replace(tzinfo=datetime.timezone.utc)
        except (TypeError, ValueError):
            # Best effort: an unparseable expiry date must not fail scoring.
            logging.warning("Could not parse certificate expiry: %r", not_after)
        else:
            remaining = expires - datetime.datetime.now(datetime.timezone.utc)
            if remaining.days < 30:
                score -= 10

    # Bonuses can push the raw total past 100; the result is shown as "N/100".
    return max(0, min(100, score))
def generate_recommendations(headers, redirects_to_https):
    """Build the list of hardening suggestions for a scanned site.

    Args:
        headers: Mapping of response header names to values. Names are
            matched case-insensitively.
        redirects_to_https: Truthy when the site ended up on HTTPS.

    Returns:
        A list of human-readable recommendation strings, in a fixed order:
        missing security headers first, then the HTTPS redirect, then
        information-disclosure headers. Empty when nothing is recommended.
    """
    # HTTP field names are case-insensitive; a server sending
    # "strict-transport-security" must not be reported as missing HSTS.
    present = {str(name).lower() for name in headers}

    missing_header_advice = (
        ('Strict-Transport-Security',
         "🔒 Implement HSTS with max-age=31536000, includeSubDomains, preload"),
        ('Content-Security-Policy', "🛡️ Add Content-Security-Policy"),
        ('X-Frame-Options', "🚫 Add X-Frame-Options (DENY or SAMEORIGIN)"),
        ('X-Content-Type-Options', "📄 Add X-Content-Type-Options: nosniff"),
    )
    recs = [advice for name, advice in missing_header_advice if name.lower() not in present]

    if not redirects_to_https:
        recs.append("🔐 Redirect HTTP to HTTPS")
    if 'server' in present or 'x-powered-by' in present:
        recs.append("🕵️ Remove info disclosure headers (Server, X-Powered-By)")
    return recs
# Canonical spellings of every header this plugin looks up, keyed by the
# lower-cased name so received headers can be matched case-insensitively.
_CANONICAL_HEADER_NAMES = {
    name.lower(): name
    for name in (
        'Strict-Transport-Security',
        'Content-Security-Policy',
        'X-Frame-Options',
        'X-Content-Type-Options',
        'X-XSS-Protection',
        'Referrer-Policy',
        'Permissions-Policy',
        'Feature-Policy',
        'Server',
        'X-Powered-By',
    )
}


def _canonicalize_headers(headers):
    """Return a copy of ``headers`` with known names in canonical spelling."""
    return {
        _CANONICAL_HEADER_NAMES.get(str(name).lower(), name): value
        for name, value in headers.items()
    }


async def handle_command(room, message, bot, prefix, config):
    """Handle the ``!headers <url>`` command.

    Ignores messages that are from this bot, lack the prefix, or are not the
    ``headers`` command. With no argument a usage hint is sent. Otherwise the
    target is normalised to a URL (``https://`` is assumed when no scheme is
    given), rejected if it has no hostname or is not a public destination,
    and then analysed: response headers are fetched, the TLS certificate is
    inspected for HTTPS targets, a score and recommendations are computed,
    and the result is sent to the room as a collapsible code block.

    Args:
        room: Room the message arrived in; ``room.room_id`` is the reply target.
        message: The incoming message event.
        bot: Bot instance whose ``api`` is used to send replies.
        prefix: Command prefix passed to the message matcher.
        config: Unused by this handler.
    """
    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("headers")):
        return

    args = match.args()
    if len(args) < 1:
        await bot.api.send_markdown_message(room.room_id,
            "<strong>🔒 HTTP Security Headers Analysis</strong>\n<code>!headers &lt;url&gt;</code>")
        return

    original_input = args[0].strip()
    url = original_input
    # Schemes are case-insensitive; "HTTPS://host" must not get a second scheme.
    if not url.lower().startswith(('http://', 'https://')):
        url = 'https://' + url

    # urlparse raises ValueError on malformed input such as an unbalanced
    # IPv6 bracket, and hostname is None for inputs like "https://".
    try:
        parsed = urlparse(url)
        host = parsed.hostname
    except ValueError:
        parsed, host = None, None
    if not host:
        await bot.api.send_text_message(room.room_id, "❌ Could not find a hostname in that URL.")
        return
    if not is_public_destination(host):
        await bot.api.send_text_message(room.room_id, "❌ Private/internal addresses are not allowed.")
        return

    is_https = parsed.scheme == 'https'
    safe_input = html_escape(original_input)
    safe_host = html_escape(host)
    await bot.api.send_text_message(room.room_id, f"🔍 Analyzing security headers for: {safe_input}...")

    final_url, status_code, http_headers, redirects_to_https = await analyze_http_response(url)
    https_headers = {}
    if is_https:
        _, https_headers = await analyze_https_response(url)
    # Prefer the non-redirected HTTPS response; normalise name spelling so the
    # lookups below (and in the scoring helpers) are case-insensitive.
    headers = _canonicalize_headers(https_headers or http_headers)

    cert_info = None
    if is_https:
        # The certificate probe uses blocking sockets, so run it off-loop.
        cert_info = await _run_in_thread(_get_cert_info, host)

    score = calculate_score(headers, redirects_to_https, cert_info)
    recommendations = generate_recommendations(headers, redirects_to_https)

    # NOTE(review): final_url, header values and certificate fields come from
    # the remote server and are passed to code_block() unescaped - confirm
    # that code_block() escapes row values before they reach the HTML message.
    sections = []

    # Score
    score_emoji = "🟢" if score >= 80 else "🟡" if score >= 60 else "🔴"
    sections.append({
        "title": f"{score_emoji} Security Score",
        "rows": [("", "Score", f"{score}/100")]
    })

    # Basic Information
    basic_rows = [
        ("🌐", "Final URL", final_url),
        ("📊", "Status Code", str(status_code) if status_code else "N/A"),
        ("🔐", "HTTPS Redirect", "✅ Yes" if redirects_to_https else "❌ No"),
    ]
    sections.append({"title": "📊 Basic Information", "rows": basic_rows})

    # Security Headers
    max_value_len = 100  # keep very long header values from flooding the room
    security_headers = {
        'Strict-Transport-Security': ('🔒', 'HSTS'),
        'Content-Security-Policy': ('🛡️', 'CSP'),
        'X-Frame-Options': ('🚫', 'Frame Options'),
        'X-Content-Type-Options': ('📄', 'Content Type'),
        'X-XSS-Protection': ('', 'XSS Protection'),
        'Referrer-Policy': ('🔗', 'Referrer Policy'),
        'Permissions-Policy': ('🔧', 'Permissions Policy'),
        'Feature-Policy': ('⚙️', 'Feature Policy'),
    }
    header_rows = []
    for hdr, (emoji, label) in security_headers.items():
        if hdr in headers:
            header_rows.append((emoji, label, headers[hdr][:max_value_len]))
        else:
            header_rows.append((emoji, label, "❌ Missing"))
    sections.append({"title": "🛡️ Security Headers", "rows": header_rows})

    # Other Headers
    other_rows = [
        ("🔍", hdr, headers[hdr][:max_value_len])
        for hdr in ('Server', 'X-Powered-By')
        if hdr in headers
    ]
    if other_rows:
        sections.append({"title": "📋 Other Headers", "rows": other_rows})

    # SSL Certificate
    if cert_info:
        ssl_rows = [
            ("📜", "Subject", cert_info['subject'].get('commonName', 'N/A')),
            ("🏢", "Issuer", cert_info['issuer'].get('organizationName', 'N/A')),
            ("📅", "Expires", cert_info.get('not_after', 'N/A')),
        ]
        # subjectAltName entries are (type, value) pairs; show DNS names only.
        dns_names = [value for kind, value in cert_info.get('san', []) if kind == 'DNS']
        if dns_names:
            ssl_rows.append(("🌐", "SANs", ", ".join(dns_names[:5])))
        sections.append({"title": "🔐 SSL Certificate", "rows": ssl_rows})

    # Recommendations
    if recommendations:
        rec_rows = [("💡", "Recommendation", rec) for rec in recommendations]
        sections.append({"title": "💡 Recommendations", "rows": rec_rows})

    block = code_block(f"🔒 Security Headers: {safe_host}", sections)
    output = collapsible_summary(f"🔒 Headers: {safe_host}", block)
    await bot.api.send_markdown_message(room.room_id, output)
# ---------------------------------------------------------------------------
# Plugin Metadata
# ---------------------------------------------------------------------------
# Module-level descriptive constants for this plugin.
# NOTE(review): presumably read by the bot's plugin loader / help command -
# nothing in this file consumes them; confirm against the loader.
__version__ = "1.1.2"
__author__ = "Funguy Bot"
__description__ = "HTTP security header analysis"
# Help text as an HTML <details> snippet; the command syntax is HTML-escaped
# (&lt;url&gt;) so it displays literally as <url>.
__help__ = """
<details>
<summary><strong>!headers</strong> HTTP security headers analysis</summary>
<p><code>!headers &lt;url&gt;</code> Analyzes security headers, SSL cert, gives score and recommendations in a clean, aligned table.</p>
</details>
"""