diff --git a/plugins/common.py b/plugins/common.py new file mode 100644 index 0000000..797ccc2 --- /dev/null +++ b/plugins/common.py @@ -0,0 +1,57 @@ +""" +Shared utilities for FunguyBot plugins. +""" +import html +import ipaddress +import socket +import logging + +logger = logging.getLogger(__name__) + +# Networks considered unsafe for outbound connections +_PRIVATE_RANGES = [ + ipaddress.ip_network('10.0.0.0/8'), + ipaddress.ip_network('172.16.0.0/12'), + ipaddress.ip_network('192.168.0.0/16'), + ipaddress.ip_network('127.0.0.0/8'), + ipaddress.ip_network('169.254.0.0/16'), + ipaddress.ip_network('0.0.0.0/8'), + ipaddress.ip_network('::1/128'), + ipaddress.ip_network('fc00::/7'), + ipaddress.ip_network('fe80::/10'), + ipaddress.ip_network('::/128'), +] + +def html_escape(text: str) -> str: + """Escape HTML special characters for safe embedding in messages.""" + return html.escape(str(text), quote=False) + +def collapsible_summary(title: str, body: str, expanded: bool = False) -> str: + """Wrap content in a collapsible HTML details block.""" + open_attr = ' open' if expanded else '' + return f"\n{title}\n{body}\n" + +def is_public_destination(target: str) -> bool: + """ + Returns True if `target` (hostname or IP) does NOT resolve to any + private, loopback, or link‑local address. + """ + try: + addr = ipaddress.ip_address(target) + if any(addr in net for net in _PRIVATE_RANGES): + return False + return True + except ValueError: + pass + + try: + addrinfo = socket.getaddrinfo(target, None) + for _, _, _, _, sockaddr in addrinfo: + ip = sockaddr[0] + addr = ipaddress.ip_address(ip) + if any(addr in net for net in _PRIVATE_RANGES): + return False + return True + except Exception as e: + logger.warning(f"Cannot resolve {target}: {e}") + return False diff --git a/plugins/encode.py b/plugins/encode.py new file mode 100644 index 0000000..d951dad --- /dev/null +++ b/plugins/encode.py @@ -0,0 +1,1203 @@ +#!/usr/bin/env python3 +""" +Funguy Bot - CyberChef-style plugin (encode.py) +================================================ +A production-ready, offline, modular text & data manipulation plugin. +Provides a wide range of encoding, cryptographic, compression, forensic +and networking operations – all controllable via the !encode command. + +Dependencies: + Standard library + Optional (if missing, related operations will gracefully report an error): + cryptography (AES, RSA, ChaCha20, PEM/DER, SHA3/HMAC variants) + bcrypt (bcrypt hashing) + argon2-cffi (Argon2 hashing) + yara-python (YARA scanning) + asn1crypto (ASN.1 parsing) + PyYAML (YAML formatting) + lxml (better XML formatting, fallback to xml.etree) + +Author: Funguy Bot +Version: 2.0.1 +""" + +import asyncio +import base64 +import binascii +import codecs +import collections +import csv +import gzip +import io +import ipaddress +import json +import logging +import lzma +import math +import os +import re +import struct +import time +import urllib.parse +import xml.etree.ElementTree as ET +import zlib +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +# ---------- Optional dependency detection ---------- +try: + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + from cryptography.hazmat.primitives import hashes, hmac, padding, serialization + from cryptography.hazmat.primitives.asymmetric import rsa, padding as asym_padding + from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + from cryptography.hazmat.primitives.kdf.scrypt import Scrypt + from cryptography.hazmat.primitives import constant_time + HAS_CRYPTOGRAPHY = True +except ImportError: + HAS_CRYPTOGRAPHY = False + +try: + import bcrypt + HAS_BCRYPT = True +except ImportError: + HAS_BCRYPT = False + +try: + import argon2 + HAS_ARGON2 = True +except ImportError: + HAS_ARGON2 = False + +try: + import yara + HAS_YARA = True +except ImportError: + HAS_YARA = False + +try: + import asn1crypto.core as asn1core + import asn1crypto.x509 as asn1x509 + HAS_ASN1 = True +except ImportError: + HAS_ASN1 = False + +try: + import yaml + HAS_YAML = True +except ImportError: + HAS_YAML = False + +try: + from lxml import etree as lxml_etree + HAS_LXML = True +except ImportError: + HAS_LXML = False + +# For HMAC with stdlib +try: + import hashlib + import hmac as stdlib_hmac +except ImportError: + pass + +# ---------- Constants ---------- +MAX_INPUT_SIZE = 1 * 1024 * 1024 # 1 MB +DEFAULT_TIMEOUT = 10 # seconds +RECIPE_MARKER = "recipe" + +# ---------- Operation registry ---------- +class Operation: + """Describes a single processing operation.""" + __slots__ = ("name", "description", "category", + "arg_names", "func", "timeout") + def __init__(self, name: str, description: str, category: str, + arg_names: List[str], func: Callable, timeout: int = DEFAULT_TIMEOUT): + self.name = name + self.description = description + self.category = category + self.arg_names = arg_names # positional arguments before the data + self.func = func + self.timeout = timeout + +OPERATIONS: Dict[str, Operation] = {} + +def register_op(name: str, description: str, category: str, + arg_names: Optional[List[str]] = None, timeout: int = DEFAULT_TIMEOUT): + """Decorator to register an operation.""" + def decorator(func: Callable): + if not asyncio.iscoroutinefunction(func): + # Wrap sync functions + async def wrapper(*args, **kwargs): + return await asyncio.to_thread(func, *args, **kwargs) + func_to_reg = wrapper + else: + func_to_reg = func + OPERATIONS[name] = Operation(name, description, category, + arg_names or [], func_to_reg, timeout) + return func_to_reg + return decorator + +# ---------- Helper utilities ---------- +def validate_input(data: str, max_size: int = MAX_INPUT_SIZE) -> bytes: + """Convert string to bytes with size limit. Returns raw bytes.""" + if len(data) > max_size: + raise ValueError(f"Input too long ({len(data)} bytes); max {max_size}") + return data.encode("utf-8", errors="replace") + +def bytes_to_safe_string(b: bytes) -> str: + """Return printable representation, using hex if necessary.""" + try: + text = b.decode("utf-8", errors="strict") + if all(32 <= ord(c) <= 126 or c in '\n\r\t' for c in text): + return text + except UnicodeDecodeError: + pass + return b.hex() + +def shannon_entropy(data: bytes) -> float: + """Calculate Shannon entropy of byte sequence.""" + if not data: + return 0.0 + freq = collections.Counter(data) + length = len(data) + entropy = 0.0 + for count in freq.values(): + prob = count / length + if prob > 0: + entropy -= prob * math.log2(prob) + return entropy + +# ---------- Base encoding/decoding operations ---------- +@register_op("base64", "Base64 encode / decode", "Encoding", arg_names=["subcmd"]) +async def op_base64(subcmd: str, text: str) -> str: + sub = subcmd.lower() + raw = validate_input(text) + if sub in ("encode", "enc", "e"): + return base64.b64encode(raw).decode() + elif sub in ("decode", "dec", "d"): + try: + return base64.b64decode(raw).decode("utf-8", errors="replace") + except Exception as e: + return f"Base64 decode error: {e}" + else: + raise ValueError("Use 'encode' or 'decode'") + +@register_op("base32", "Base32 encode / decode", "Encoding", arg_names=["subcmd"]) +async def op_base32(subcmd: str, text: str) -> str: + sub = subcmd.lower() + raw = validate_input(text) + if sub in ("encode", "enc", "e"): + return base64.b32encode(raw).decode() + elif sub in ("decode", "dec", "d"): + try: + # Add padding automatically if missing + processed = raw.rstrip(b"=") + processed += b"=" * ((8 - len(processed) % 8) % 8) + return base64.b32decode(processed).decode("utf-8", errors="replace") + except Exception as e: + return f"Base32 decode error: {e}" + else: + raise ValueError("Use 'encode' or 'decode'") + +@register_op("hex", "Hex encode / decode", "Encoding", arg_names=["subcmd"]) +async def op_hex(subcmd: str, text: str) -> str: + sub = subcmd.lower() + raw = validate_input(text) + if sub in ("encode", "enc", "e"): + return raw.hex() + elif sub in ("decode", "dec", "d"): + try: + clean = text.replace(" ", "").replace("\n", "") + return bytes.fromhex(clean).decode("utf-8", errors="replace") + except Exception as e: + return f"Hex decode error: {e}" + else: + raise ValueError("Use 'encode' or 'decode'") + +@register_op("url", "URL encode / decode", "Encoding", arg_names=["subcmd"]) +async def op_url(subcmd: str, text: str) -> str: + sub = subcmd.lower() + if sub in ("encode", "enc", "e"): + return urllib.parse.quote(text, safe="") + elif sub in ("decode", "dec", "d"): + return urllib.parse.unquote(text) + else: + raise ValueError("Use 'encode' or 'decode'") + +@register_op("html", "HTML entity encode / decode", "Encoding", arg_names=["subcmd"]) +async def op_html(subcmd: str, text: str) -> str: + sub = subcmd.lower() + if sub in ("encode", "enc", "e"): + return text.replace("&", "&").replace("<", "<").replace(">", ">").replace('"', """).replace("'", "'") + elif sub in ("decode", "dec", "d"): + import html + return html.unescape(text) + else: + raise ValueError("Use 'encode' or 'decode'") + +@register_op("unicode", "Unicode escape / unescape", "Encoding", arg_names=["subcmd"]) +async def op_unicode(subcmd: str, text: str) -> str: + sub = subcmd.lower() + if sub in ("encode", "enc", "e"): + return text.encode("unicode_escape").decode("utf-8") + elif sub in ("decode", "dec", "d"): + return text.encode().decode("unicode_escape") + else: + raise ValueError("Use 'encode' or 'decode'") + +@register_op("binary", "Text to binary string", "Encoding", arg_names=["subcmd"]) +async def op_binary(subcmd: str, text: str) -> str: + sub = subcmd.lower() + if sub in ("encode", "enc", "e"): + return ' '.join(format(ord(c), '08b') for c in text) + elif sub in ("decode", "dec", "d"): + try: + bin_strs = text.split() + chars = [chr(int(b, 2)) for b in bin_strs if set(b).issubset({"0", "1"})] + return ''.join(chars) + except Exception: + return "Invalid binary string" + else: + raise ValueError("Use 'encode' or 'decode'") + +@register_op("rot13", "Apply ROT13 (Caesar cipher)", "Encoding") +async def op_rot13(text: str) -> str: + return codecs.encode(text, 'rot13') + +@register_op("morse", "Morse code encode / decode", "Encoding", arg_names=["subcmd"]) +async def op_morse(subcmd: str, text: str) -> str: + MORSE_ENCODE = { + 'A': '.-', 'B': '-...', 'C': '-.-.', 'D': '-..', 'E': '.', 'F': '..-.', + 'G': '--.', 'H': '....', 'I': '..', 'J': '.---', 'K': '-.-', 'L': '.-..', + 'M': '--', 'N': '-.', 'O': '---', 'P': '.--.', 'Q': '--.-', 'R': '.-.', + 'S': '...', 'T': '-', 'U': '..-', 'V': '...-', 'W': '.--', 'X': '-..-', + 'Y': '-.--', 'Z': '--..', + '0': '-----', '1': '.----', '2': '..---', '3': '...--', '4': '....-', + '5': '.....', '6': '-....', '7': '--...', '8': '---..', '9': '----.', + ' ': '/' + } + MORSE_DECODE = {v: k for k, v in MORSE_ENCODE.items()} + sub = subcmd.lower() + if sub in ("encode", "enc", "e"): + return ' '.join(MORSE_ENCODE.get(c.upper(), '?') for c in text) + elif sub in ("decode", "dec", "d"): + try: + symbols = text.strip().split() + return ''.join(MORSE_DECODE.get(s, '?') for s in symbols) + except Exception: + return "Invalid Morse code" + else: + raise ValueError("Use 'encode' or 'decode'") + +# ---------- Cryptography ---------- +@register_op("xor", "XOR with a single-byte key (hex)", "Cryptography", arg_names=["key_hex"]) +async def op_xor(key_hex: str, text: str) -> str: + try: + key = int(key_hex, 16) & 0xFF + except ValueError: + raise ValueError("Key must be a hex byte (00-FF)") + raw = validate_input(text) + result = bytes(b ^ key for b in raw) + return bytes_to_safe_string(result) + +@register_op("aes", "AES encrypt/decrypt (CBC mode, PKCS7, key+IV from hex)", "Cryptography", + arg_names=["subcmd", "key_hex", "iv_hex"]) +async def op_aes(subcmd: str, key_hex: str, iv_hex: str, text: str) -> str: + if not HAS_CRYPTOGRAPHY: + return "Error: cryptography library not installed" + try: + key = bytes.fromhex(key_hex) + iv = bytes.fromhex(iv_hex) + if len(key) not in (16, 24, 32): + raise ValueError("AES key must be 16/24/32 bytes (32/48/64 hex chars)") + if len(iv) != 16: + raise ValueError("IV must be 16 bytes (32 hex chars)") + except Exception as e: + return f"Key/IV error: {e}" + algorithm = algorithms.AES(key) + cipher = Cipher(algorithm, modes.CBC(iv)) + sub = subcmd.lower() + raw = validate_input(text) if sub in ("encrypt", "enc") else bytes.fromhex(text) + if sub in ("encrypt", "enc"): + padder = padding.PKCS7(128).padder() + padded = padder.update(raw) + padder.finalize() + encryptor = cipher.encryptor() + ct = encryptor.update(padded) + encryptor.finalize() + return ct.hex() + elif sub in ("decrypt", "dec"): + try: + decryptor = cipher.decryptor() + padded = decryptor.update(raw) + decryptor.finalize() + unpadder = padding.PKCS7(128).unpadder() + data = unpadder.update(padded) + unpadder.finalize() + return data.decode("utf-8", errors="replace") + except Exception as e: + return f"AES decrypt error: {e}" + else: + raise ValueError("Use 'encrypt' or 'decrypt'") + +@register_op("chacha20", "ChaCha20 encrypt / decrypt (key+nonce from hex)", "Cryptography", + arg_names=["subcmd", "key_hex", "nonce_hex"]) +async def op_chacha20(subcmd: str, key_hex: str, nonce_hex: str, text: str) -> str: + if not HAS_CRYPTOGRAPHY: + return "Error: cryptography library not installed" + try: + key = bytes.fromhex(key_hex) + nonce = bytes.fromhex(nonce_hex) + if len(key) != 32: + raise ValueError("Key must be 32 bytes (64 hex)") + if len(nonce) != 16: + raise ValueError("Nonce must be 16 bytes (32 hex)") + except Exception as e: + return f"Key/nonce error: {e}" + algorithm = algorithms.ChaCha20(key, nonce) + cipher = Cipher(algorithm, mode=None) + sub = subcmd.lower() + raw = validate_input(text) if sub in ("encrypt", "enc") else bytes.fromhex(text) + if sub in ("encrypt", "enc"): + encryptor = cipher.encryptor() + ct = encryptor.update(raw) + return ct.hex() + elif sub in ("decrypt", "dec"): + decryptor = cipher.decryptor() + return decryptor.update(raw).decode("utf-8", errors="replace") + else: + raise ValueError("Use 'encrypt' or 'decrypt'") + +@register_op("rsa", "RSA encrypt/decrypt (PEM key as separate arg, OAEP SHA256)", "Cryptography", + arg_names=["subcmd", "key_pem"]) +async def op_rsa(subcmd: str, key_pem: str, text: str) -> str: + if not HAS_CRYPTOGRAPHY: + return "Error: cryptography library not installed" + sub = subcmd.lower() + try: + if sub in ("encrypt", "enc"): + pub_key = serialization.load_pem_public_key(key_pem.encode()) + raw = validate_input(text) + ct = pub_key.encrypt( + raw, + asym_padding.OAEP( + mgf=asym_padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + return base64.b64encode(ct).decode() + elif sub in ("decrypt", "dec"): + priv_key = serialization.load_pem_private_key(key_pem.encode(), password=None) + raw = base64.b64decode(validate_input(text)) + pt = priv_key.decrypt( + raw, + asym_padding.OAEP( + mgf=asym_padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + return pt.decode("utf-8", errors="replace") + else: + raise ValueError("Use 'encrypt' or 'decrypt'") + except Exception as e: + return f"RSA error: {e}" + +# Hash functions (sync, very fast) +def _hash_generic(data: str, algo: str) -> str: + raw = validate_input(data) + h = hashlib.new(algo) + h.update(raw) + return h.hexdigest() + +for name, algo in [("md5", "md5"), ("sha1", "sha1"), ("sha256", "sha256"), + ("sha512", "sha512"), ("sha3-256", "sha3_256"), ("sha3-512", "sha3_512")]: + @register_op(name, f"{name.upper()} hash", "Cryptography") + async def _op_hash(data: str, algo=algo) -> str: + return _hash_generic(data, algo) + +@register_op("hmac", "HMAC with given algorithm (key in hex)", "Cryptography", + arg_names=["algo", "key_hex"]) +async def op_hmac(algo: str, key_hex: str, data: str) -> str: + raw = validate_input(data) + try: + key = bytes.fromhex(key_hex) + except ValueError: + return "Invalid key hex" + algo_map = {"md5": "md5", "sha1": "sha1", "sha256": "sha256", "sha512": "sha512"} + if algo not in algo_map: + return f"Supported algos: {', '.join(algo_map)}" + h = stdlib_hmac.new(key, raw, algo_map[algo]) + return h.hexdigest() + +@register_op("bcrypt", "Bcrypt hash / verify (prefix: $2b$)", "Cryptography", + arg_names=["subcmd", "hash_or_rounds"]) +async def op_bcrypt(subcmd: str, hash_or_rounds: str, text: str) -> str: + if not HAS_BCRYPT: + return "Error: bcrypt library not installed" + sub = subcmd.lower() + if sub in ("hash", "h"): + try: + rounds = int(hash_or_rounds) + except ValueError: + return "Provide number of rounds (e.g. 12)" + salt = bcrypt.gensalt(rounds=rounds) + return bcrypt.hashpw(text.encode(), salt).decode() + elif sub in ("verify", "v"): + try: + ok = bcrypt.checkpw(text.encode(), hash_or_rounds.encode()) + return "Match" if ok else "No match" + except Exception as e: + return f"Bcrypt verify error: {e}" + else: + raise ValueError("Use 'hash' or 'verify'") + +@register_op("argon2", "Argon2 hash / verify (phc format)", "Cryptography", + arg_names=["subcmd", "params_or_hash"]) +async def op_argon2(subcmd: str, params_or_hash: str, text: str) -> str: + if not HAS_ARGON2: + return "Error: argon2-cffi not installed" + from argon2 import PasswordHasher + ph = PasswordHasher() + sub = subcmd.lower() + if sub in ("hash", "h"): + # params_or_hash could be e.g. "time_cost=3,memory_cost=65536,parallelism=4" + kwargs = {} + for part in params_or_hash.split(","): + if "=" in part: + k, v = part.split("=", 1) + kwargs[k.strip()] = int(v.strip()) + try: + return ph.hash(text, **kwargs) + except Exception as e: + return f"Argon2 hash error: {e}" + elif sub in ("verify", "v"): + try: + ph.verify(params_or_hash, text) + return "Match" + except Exception: + return "No match" + else: + raise ValueError("Use 'hash' or 'verify'") + +@register_op("pbkdf2", "PBKDF2 derive key (hex output)", "Cryptography", + arg_names=["salt_hex", "iterations", "dklen", "algo"]) +async def op_pbkdf2(salt_hex: str, iterations: str, dklen: str, algo: str, text: str) -> str: + salt = bytes.fromhex(salt_hex) + try: + rounds = int(iterations) + length = int(dklen) + except ValueError: + return "Invalid iterations/length" + if algo not in ("sha256", "sha512"): + return "algo must be sha256 or sha512" + dk = hashlib.pbkdf2_hmac(algo, text.encode(), salt, rounds, dklen=length) + return dk.hex() + +# ---------- Compression ---------- +@register_op("gzip", "gzip compress / decompress", "Compression", arg_names=["subcmd"]) +async def op_gzip(subcmd: str, text: str) -> str: + sub = subcmd.lower() + raw = validate_input(text) + if sub in ("compress", "comp", "c"): + return base64.b64encode(gzip.compress(raw)).decode() + elif sub in ("decompress", "decomp", "d"): + try: + decompressed = gzip.decompress(base64.b64decode(raw)) + return decompressed.decode("utf-8", errors="replace") + except Exception as e: + return f"gzip decompress error: {e}" + else: + raise ValueError("Use 'compress' or 'decompress'") + +@register_op("zlib", "zlib compress / decompress", "Compression", arg_names=["subcmd"]) +async def op_zlib(subcmd: str, text: str) -> str: + sub = subcmd.lower() + raw = validate_input(text) + if sub in ("compress", "comp", "c"): + return base64.b64encode(zlib.compress(raw)).decode() + elif sub in ("decompress", "decomp", "d"): + try: + decompressed = zlib.decompress(base64.b64decode(raw)) + return decompressed.decode("utf-8", errors="replace") + except Exception as e: + return f"zlib decompress error: {e}" + else: + raise ValueError("Use 'compress' or 'decompress'") + +@register_op("bzip2", "bzip2 compress / decompress", "Compression", arg_names=["subcmd"]) +async def op_bzip2(subcmd: str, text: str) -> str: + import bz2 + sub = subcmd.lower() + raw = validate_input(text) + if sub in ("compress", "comp", "c"): + return base64.b64encode(bz2.compress(raw)).decode() + elif sub in ("decompress", "decomp", "d"): + try: + decompressed = bz2.decompress(base64.b64decode(raw)) + return decompressed.decode("utf-8", errors="replace") + except Exception as e: + return f"bzip2 decompress error: {e}" + else: + raise ValueError("Use 'compress' or 'decompress'") + +@register_op("lzma", "LZMA compress / decompress", "Compression", arg_names=["subcmd"]) +async def op_lzma(subcmd: str, text: str) -> str: + sub = subcmd.lower() + raw = validate_input(text) + if sub in ("compress", "comp", "c"): + return base64.b64encode(lzma.compress(raw)).decode() + elif sub in ("decompress", "decomp", "d"): + try: + decompressed = lzma.decompress(base64.b64decode(raw)) + return decompressed.decode("utf-8", errors="replace") + except Exception as e: + return f"LZMA decompress error: {e}" + else: + raise ValueError("Use 'compress' or 'decompress'") + +# ---------- Data processing ---------- +@register_op("json", "JSON format / validate", "Data Processing", arg_names=["subcmd"]) +async def op_json(subcmd: str, text: str) -> str: + sub = subcmd.lower() + if sub in ("format", "pretty", "p"): + try: + parsed = json.loads(text) + return json.dumps(parsed, indent=2, ensure_ascii=False) + except json.JSONDecodeError as e: + return f"Invalid JSON: {e}" + elif sub in ("validate", "check", "v"): + try: + json.loads(text) + return "Valid JSON" + except json.JSONDecodeError as e: + return f"Invalid JSON: {e}" + else: + raise ValueError("Use 'format' or 'validate'") + +@register_op("xml", "XML format (pretty-print)", "Data Processing", arg_names=["subcmd"]) +async def op_xml(subcmd: str, text: str) -> str: + sub = subcmd.lower() + if sub not in ("format", "pretty", "p"): + raise ValueError("Use 'format'") + if HAS_LXML: + try: + parser = lxml_etree.XMLParser(remove_blank_text=True) + tree = lxml_etree.fromstring(text.encode(), parser) + return lxml_etree.tostring(tree, pretty_print=True, encoding="unicode") + except Exception as e: + return f"XML error: {e}" + else: + try: + ET.fromstring(text) # validate + # Simple pretty-print using minidom + import xml.dom.minidom as minidom + dom = minidom.parseString(text) + return dom.toprettyxml(indent=" ") + except Exception as e: + return f"XML error: {e}" + +@register_op("yaml", "YAML format / convert to JSON", "Data Processing", arg_names=["subcmd"]) +async def op_yaml(subcmd: str, text: str) -> str: + if not HAS_YAML: + return "Error: PyYAML not installed" + sub = subcmd.lower() + if sub in ("format", "pretty", "p"): + try: + data = yaml.safe_load(text) + return yaml.dump(data, default_flow_style=False, sort_keys=False) + except Exception as e: + return f"YAML error: {e}" + elif sub == "tojson": + try: + data = yaml.safe_load(text) + return json.dumps(data, indent=2, ensure_ascii=False) + except Exception as e: + return f"YAML to JSON error: {e}" + else: + raise ValueError("Use 'format' or 'tojson'") + +@register_op("csv", "CSV parse (first row as header)", "Data Processing") +async def op_csv(text: str) -> str: + try: + reader = csv.DictReader(io.StringIO(text)) + rows = list(reader) + if not rows: + return "No rows" + return f"Columns: {list(rows[0].keys())}\nFirst row: {rows[0]}" + except Exception as e: + return f"CSV error: {e}" + +@register_op("asn1", "ASN.1 parse (DER base64 input)", "Data Processing") +async def op_asn1(text: str) -> str: + if not HAS_ASN1: + return "Error: asn1crypto not installed" + try: + der = base64.b64decode(text) + parsed = asn1core.load(der) + return str(parsed) + except Exception as e: + return f"ASN.1 parse error: {e}" + +@register_op("pemder", "PEM/DER conversion (to PEM, to DER)", "Data Processing", + arg_names=["subcmd"]) +async def op_pemder(subcmd: str, text: str) -> str: + if not HAS_CRYPTOGRAPHY: + return "Error: cryptography library not installed" + sub = subcmd.lower() + try: + raw = validate_input(text) + if sub in ("topem", "pem"): + # Assume raw is DER, convert to PEM via dummy load + from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, PublicFormat, NoEncryption + try: + priv = serialization.load_der_private_key(raw, password=None) + return priv.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode() + except Exception: + try: + pub = serialization.load_der_public_key(raw) + return pub.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo).decode() + except Exception: + return "Not a recognized DER key; wrapping as generic PEM:\n" + \ + "-----BEGIN CERTIFICATE-----\n" + base64.b64encode(raw).decode() + "\n-----END CERTIFICATE-----" + elif sub in ("toder", "der"): + # Assume PEM, strip headers and decode + lines = text.splitlines() + b64data = "".join(line.strip() for line in lines if not line.startswith("-----")) + return base64.b64decode(b64data).hex() + else: + raise ValueError("Use 'topem' or 'toder'") + except Exception as e: + return f"PEM/DER error: {e}" + +# ---------- Forensics ---------- +@register_op("entropy", "Shannon entropy of input", "Forensics") +async def op_entropy(text: str) -> str: + raw = validate_input(text) + ent = shannon_entropy(raw) + return f"Entropy: {ent:.4f} bits/byte" + +@register_op("ioc", "Extract IOCs (IPs, domains, URLs, emails)", "Forensics") +async def op_ioc(text: str) -> str: + ips = re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', text) + domains = re.findall(r'(?:(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,})\b', text) + urls = re.findall(r'https?://[^\s]+', text) + emails = re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', text) + hashes = re.findall(r'\b[a-fA-F0-9]{32,64}\b', text) + out = [] + if ips: out.append("IPs: " + ", ".join(set(ips))) + if domains: out.append("Domains: " + ", ".join(set(domains))) + if urls: out.append("URLs: " + ", ".join(set(urls))) + if emails: out.append("Emails: " + ", ".join(set(emails))) + if hashes: out.append("Hashes: " + ", ".join(set(hashes))) + return "\n".join(out) if out else "No IOCs found" + +@register_op("strings", "Extract ASCII strings (min 4 chars)", "Forensics") +async def op_strings(text: str) -> str: + raw = validate_input(text) + pattern = re.compile(b'[\x20-\x7E]{4,}') + found = pattern.findall(raw) + decoded = [s.decode("ascii", errors="replace") for s in found] + return "\n".join(decoded) if decoded else "No strings found" + +@register_op("filemagic", "Detect file type by magic bytes", "Forensics") +async def op_filemagic(text: str) -> str: + # Common signatures: (offset, hex bytes, description) + signatures = [ + (0, "89504E47", "PNG image"), + (0, "FFD8FF", "JPEG image"), + (0, "47494638", "GIF image"), + (0, "25504446", "PDF document"), + (0, "504B0304", "ZIP archive"), + (0, "52617221", "RAR archive"), + (0, "4D5A", "PE executable (MZ)"), + (0, "7F454C46", "ELF executable"), + (0, "CAFEBABE", "Mach-O binary"), + (0, "D0CF11E0A1B11AE1", "MS Office 97-2003"), + ] + raw = validate_input(text) + hexdata = raw.hex().upper() + for offset, magic, desc in signatures: + if hexdata.startswith(magic, offset*2): + return f"Detected: {desc}" + return "Unknown file type" + +@register_op("base64blob", "Find Base64-encoded blobs in text", "Forensics") +async def op_base64blob(text: str) -> str: + pattern = re.compile(r'(?:[A-Za-z0-9+/]{4}){10,}(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?') + matches = pattern.findall(text) + if not matches: + return "No Base64 blobs found" + out_lines = [] + for i, m in enumerate(matches[:5], 1): + out_lines.append(f"Blob {i}: {m[:50]}...") + try: + decoded = base64.b64decode(m) + out_lines.append(f" Decoded (head 50): {decoded[:50]}") + except Exception: + out_lines.append(" (invalid base64, shown raw)") + return "\n".join(out_lines) + +@register_op("xbrute", "XOR single-byte brute force (show top 5 printable)", "Forensics") +async def op_xbrute(text: str) -> str: + raw = validate_input(text) + results = [] + for key in range(256): + xored = bytes(b ^ key for b in raw) + try: + decoded = xored.decode("utf-8", errors="strict") + printable_count = sum(1 for c in decoded if 32 <= ord(c) <= 126 or c in '\n\r\t') + if printable_count > len(decoded) * 0.8: # >80% printable + results.append((key, printable_count, decoded[:200])) + except UnicodeDecodeError: + pass + if not results: + return "No high-printable XOR keys found" + results.sort(key=lambda x: x[1], reverse=True) + top = results[:5] + out = [] + for key, cnt, sample in top: + out.append(f"Key 0x{key:02x} ({key}) - printable {cnt}/{len(raw)}:\n{sample}") + return "\n---\n".join(out) + +@register_op("yara", "Scan input with YARA rule (base64 encoded rule)", "Forensics", + arg_names=["rule_b64"]) +async def op_yara(rule_b64: str, text: str) -> str: + if not HAS_YARA: + return "Error: yara-python not installed" + try: + rule_text = base64.b64decode(rule_b64).decode() + compiled = yara.compile(source=rule_text) + raw = validate_input(text) + matches = compiled.match(data=raw) + if matches: + return "\n".join(str(m) for m in matches) + return "No matches" + except Exception as e: + return f"YARA error: {e}" + +@register_op("peinfo", "Basic PE header analysis (hex input)", "Forensics") +async def op_peinfo(text: str) -> str: + raw = validate_input(text) + if raw[:2] != b'MZ': + return "Not an MZ executable" + try: + pe_offset = struct.unpack(' str: + try: + net = ipaddress.ip_network(text.strip(), strict=False) + return (f"Network: {net.network_address}\n" + f"Netmask: {net.netmask}\n" + f"Broadcast: {net.broadcast_address}\n" + f"Hosts: {net.num_addresses}") + except Exception as e: + return f"CIDR error: {e}" + +@register_op("ipconv", "IP conversion (decimal, hex, binary)", "Networking", + arg_names=["subcmd"]) +async def op_ipconv(subcmd: str, text: str) -> str: + sub = subcmd.lower() + try: + ip = ipaddress.ip_address(text.strip()) + if sub in ("hex", "h"): + return format(int(ip), 'x') + elif sub in ("dec", "d"): + return str(int(ip)) + elif sub in ("bin", "b"): + return format(int(ip), '032b' if ip.version == 4 else '0128b') + else: + raise ValueError("Use 'hex', 'dec', or 'bin'") + except Exception as e: + return f"IP conversion error: {e}" + +@register_op("urlparse", "Parse URL components", "Networking") +async def op_urlparse(text: str) -> str: + import urllib.parse + parts = urllib.parse.urlparse(text.strip()) + return (f"Scheme: {parts.scheme}\n" + f"Host: {parts.hostname}\n" + f"Port: {parts.port}\n" + f"Path: {parts.path}\n" + f"Query: {parts.query}\n" + f"Fragment: {parts.fragment}") + +@register_op("dns", "DNS lookup (A record) [requires internet; offline may fail]", "Networking") +async def op_dns(text: str) -> str: + import socket + try: + result = socket.getaddrinfo(text.strip(), None) + ips = set(addr[4][0] for addr in result) + return "\n".join(ips) + except Exception as e: + return f"DNS error: {e}" + +# ---------- Recipe system ---------- +class Recipe: + """A sequence of operations to apply.""" + def __init__(self, steps: List[Dict[str, Any]]): + self.steps = steps + + @staticmethod + def from_json(json_str: str) -> "Recipe": + """Load recipe from JSON string. Format: {"steps": [{"op":"name","args":[...]}, ...]}""" + data = json.loads(json_str) + if "steps" not in data: + raise ValueError("JSON must contain 'steps' list") + steps = [] + for step in data["steps"]: + op_name = step["op"] + args = step.get("args", []) + steps.append({"op": op_name, "args": args}) + return Recipe(steps) + + async def run(self, input_data: str, operations: Dict[str, Operation]) -> str: + data = input_data + for step in self.steps: + op_name = step["op"] + if op_name not in operations: + raise KeyError(f"Unknown operation: {op_name}") + op = operations[op_name] + # Combine provided args with the current data as the last arg + args = step["args"] + [data] + # Ensure enough args + if len(args) < len(op.arg_names) + 1: + raise ValueError(f"Insufficient arguments for {op_name}") + try: + data = await asyncio.wait_for(op.func(*args), timeout=op.timeout) + except asyncio.TimeoutError: + data = f"Error: operation {op_name} timed out" + break + except Exception as e: + data = f"Error in {op_name}: {e}" + break + return data + +@register_op("recipe", "Run a recipe (provide JSON and data)", "Recipes", + arg_names=["subcmd"]) +async def op_recipe(subcmd: str, json_or_data: str, *extra: str) -> str: + sub = subcmd.lower() + if sub == "list": + cats = collections.defaultdict(list) + for op in OPERATIONS.values(): + cats[op.category].append(op.name) + out = [] + for cat in sorted(cats): + out.append(f"{cat}: {', '.join(cats[cat])}") + return "\n".join(out) + elif sub == "run": + # Usage: !encode recipe run '' + # Here json_or_data is the JSON string, extra[0] is the data + if not extra: + raise ValueError("Provide data after JSON recipe") + recipe_json = json_or_data + data = " ".join(extra) # extra is tuple + try: + recipe = Recipe.from_json(recipe_json) + return await recipe.run(data, OPERATIONS) + except Exception as e: + return f"Recipe error: {e}" + else: + raise ValueError("Use 'list' or 'run '") + +# ---------- Main handler (interface to the bot) ---------- +async def handle_command(room, message, bot, prefix, config): + """ + Entry point called by the bot framework. + """ + import simplematrixbotlib as botlib + match = botlib.MessageMatch(room, message, bot, prefix) + if not (match.is_not_from_this_bot() and match.prefix() and match.command("encode")): + return + + args = match.args() + if not args: + await bot.api.send_text_message( + room.room_id, + "Usage: !encode [args...] \n" + " !encode recipe list\n" + " !encode recipe run '' \n" + "Use !encode help for full list." + ) + return + + # Special help handling (now in details tags with markdown) + if args[0].lower() in ("help", "-h", "--help"): + if len(args) > 1 and args[1].lower() in OPERATIONS: + op = OPERATIONS[args[1].lower()] + html = f"
{op.name} ({op.category})" + html += f"

{op.description}

" + if op.arg_names: + html += f"

Arguments: {', '.join(op.arg_names)}

" + else: + html += "

No arguments required.

" + html += "
" + await bot.api.send_markdown_message(room.room_id, html) + else: + categories = collections.defaultdict(list) + for op in OPERATIONS.values(): + categories[op.category].append(op.name) + html = "
Available operations by category" + for cat in sorted(categories): + html += f"

{cat}

    " + for op_name in categories[cat]: + op = OPERATIONS[op_name] + html += f"
  • {op_name}: {op.description}
  • " + html += "
" + html += "

Use !encode help <operation> for details.

" + html += "
" + await bot.api.send_markdown_message(room.room_id, html) + return + + # Try to find operation + op_name = args[0].lower() + if op_name not in OPERATIONS: + await bot.api.send_text_message( + room.room_id, + f"Unknown operation '{op_name}'. Use !encode help for list." + ) + return + + operation = OPERATIONS[op_name] + # Determine how many positional args the operation needs (excluding the final data). + required_arg_count = len(operation.arg_names) + # The remaining args after the operation name. We need at least required_arg_count arguments + # plus the data. If there are fewer, show error. + provided_args = args[1:] + if len(provided_args) < required_arg_count: + await bot.api.send_text_message( + room.room_id, + f"Usage: !encode {op_name} {' '.join(f'<{a}>' for a in operation.arg_names)} " + ) + return + + # Split into op-specific args and the data (last element gets the rest of the string) + if len(provided_args) >= required_arg_count: + pos_args = provided_args[:required_arg_count] + data_parts = provided_args[required_arg_count:] + data = " ".join(data_parts) + else: + # Not enough, handled above + return + + # Enforce input size + if len(data) > MAX_INPUT_SIZE: + await bot.api.send_text_message( + room.room_id, + f"Input too large. Maximum {MAX_INPUT_SIZE} bytes." + ) + return + + # Execute with timeout + try: + result = await asyncio.wait_for( + operation.func(*pos_args, data), + timeout=operation.timeout + ) + # Truncate huge output for chat friendliness + if len(result) > 4000: + result = result[:3990] + "\n..." + await bot.api.send_text_message(room.room_id, result) + except asyncio.TimeoutError: + await bot.api.send_text_message(room.room_id, "Error: operation timed out.") + except Exception as e: + await bot.api.send_text_message(room.room_id, f"Error: {e}") + +# Plugin Metadata +__version__ = "2.0.1" +__author__ = "Funguy Bot" +__description__ = "Comprehensive CyberChef-like encoding and analysis toolkit" +__help__ = """ +
+!encode – Comprehensive data manipulation toolkit + +

+A CyberChef-like plugin with dozens of operations for encoding, cryptography, +compression, data processing, forensics, and networking. Fully offline. +

+ +

Usage

+

!encode <operation> [arguments] <data>

+

For help on a specific operation: !encode help <op>

+ +
+ +

Encoding

+
    +
  • base64 – Base64 encode/decode
    +!encode base64 encode Hello World
    +!encode base64 decode SGVsbG8gV29ybGQ=
  • + +
  • base32 – Base32 encode/decode
    +!encode base32 encode Hello
    +!encode base32 decode JBSWY3DPEBLW64TMMQQQ====
  • + +
  • hex – Hex encode/decode
    +!encode hex encode Secret
    +!encode hex decode 536563726574
  • + +
  • url – URL encode/decode
    +!encode url encode https://example.com/a b
    +!encode url decode https%3A%2F%2Fexample.com%2Fa+b
  • + +
  • html – HTML entity encode/decode
    +!encode html encode "<script>"
    +!encode html decode &lt;script&gt;
  • + +
  • unicode – Unicode escape/unescape
    +!encode unicode encode café
    +!encode unicode decode \\u0063\\u0061\\u0066\\u00e9
  • + +
  • binary – Text to binary and back
    +!encode binary encode Hi
    +!encode binary decode 01001000 01101001
  • + +
  • rot13 – ROT13 cipher
    +!encode rot13 Uryyb Jbeyq
  • + +
  • morse – Morse code encode/decode
    +!encode morse encode SOS
    +!encode morse decode ... --- ...
  • +
+ +

Cryptography

+
    +
  • xor – XOR with single-byte key (hex)
    +!encode xor 41 Hello (key = 0x41)
  • + +
  • aes – AES-CBC encrypt/decrypt (key+IV hex)
    +!encode aes encrypt 00112233445566778899aabbccddeeff 000102030405060708090a0b0c0d0e0f secret
    +!encode aes decrypt 00112233445566778899aabbccddeeff 000102030405060708090a0b0c0d0e0f <ciphertext hex>
  • + +
  • chacha20 – ChaCha20 encrypt/decrypt
    +!encode chacha20 encrypt <32-byte-key-hex> <16-byte-nonce-hex> message
  • + +
  • rsa – RSA-OAEP encrypt/decrypt (PEM key as argument)
    +!encode rsa encrypt "-----BEGIN PUBLIC KEY-----..." message
  • + +
  • md5 – MD5 hash
    +!encode md5 hello
  • +
  • sha1 – SHA1 hash
    +!encode sha1 hello
  • +
  • sha256 – SHA256 hash
    +!encode sha256 hello
  • +
  • sha512 – SHA512 hash
    +!encode sha512 hello
  • +
  • sha3-256 – SHA3-256 hash
    +!encode sha3-256 hello
  • +
  • sha3-512 – SHA3-512 hash
    +!encode sha3-512 hello
  • + +
  • hmac – HMAC with given algorithm (key in hex)
    +!encode hmac sha256 6b6579 message
  • + +
  • bcrypt – Bcrypt hash/verify
    +!encode bcrypt hash 12 mypassword (rounds=12)
    +!encode bcrypt verify '$2b$12$...' mypassword
  • + +
  • argon2 – Argon2 hash/verify (PHC format)
    +!encode argon2 hash "time_cost=3,memory_cost=65536,parallelism=4" password
    +!encode argon2 verify 'phc$argon2id$...' password
  • + +
  • pbkdf2 – PBKDF2 key derivation
    +!encode pbkdf2 aabbccdd 100000 32 sha256 mypassword
  • +
+ +

Compression

+
    +
  • gzip – gzip compress/decompress
    +!encode gzip compress some text
    +!encode gzip decompress H4sIAAAAAAAC/...
  • +
  • zlib – zlib compress/decompress
    +!encode zlib compress data
  • +
  • bzip2 – bzip2 compress/decompress
    +!encode bzip2 compress data
  • +
  • lzma – LZMA compress/decompress
    +!encode lzma compress data
  • +
+ +

Data Processing

+
    +
  • json – JSON format or validate
    +!encode json format '{"key":"value"}'
    +!encode json validate '...'
  • +
  • xml – XML pretty-print
    +!encode xml format "<root><a>1</a></root>"
  • +
  • yaml – YAML format or convert to JSON
    +!encode yaml format "key: value"
    +!encode yaml tojson "key: value"
  • +
  • csv – CSV parse (first row as header)
    +!encode csv "name,age\nAlice,30"
  • +
  • asn1 – ASN.1 parse (DER base64)
    +!encode asn1 MIIB...
  • +
  • pemder – PEM/DER conversion
    +!encode pemder topem <hex DER>
    +!encode pemder toder "-----BEGIN ..."
  • +
+ +

Forensics

+
    +
  • entropy – Shannon entropy
    +!encode entropy some data
  • +
  • ioc – Extract IOCs (IPs, domains, URLs, emails, hashes)
    +!encode ioc "Contact admin@example.com from 10.0.0.1"
  • +
  • strings – Extract ASCII strings (min 4 chars)
    +!encode strings <hex data>
  • +
  • filemagic – Detect file type by magic bytes
    +!encode filemagic <hex data>
  • +
  • base64blob – Find Base64-encoded blobs
    +!encode base64blob "Some SGVsbG8gV29ybGQ= inside text"
  • +
  • xbrute – XOR single-byte brute force
    +!encode xbrute <hex data>
  • +
  • yara – Scan with YARA rule (rule as base64)
    +!encode yara <base64 rule> <data>
  • +
  • peinfo – PE header analysis
    +!encode peinfo <hex of MZ file>
  • +
+ +

Networking

+
    +
  • cidr – CIDR/subnet calculator
    +!encode cidr 192.168.1.0/24
  • +
  • ipconv – IP conversion (hex, dec, binary)
    +!encode ipconv hex 192.168.1.1
  • +
  • urlparse – Parse URL components
    +!encode urlparse https://user:pass@example.com:8080/path?q=1#frag
  • +
  • dns – DNS lookup (A record)
    +!encode dns example.com
  • +
+ +

Recipes

+
    +
  • recipe list – List all available operations
    +!encode recipe list
  • +
  • recipe run – Execute a JSON recipe on data
    +!encode recipe run '{"steps":[{"op":"base64","args":["encode"]},{"op":"hex","args":["encode"]}]}' "hello world"
  • +
+ +

Type !encode help <op> for detailed argument info on any operation.

+
+""" diff --git a/plugins/stable-diffusion.py b/plugins/stable-diffusion.py index a334f48..0f915da 100644 --- a/plugins/stable-diffusion.py +++ b/plugins/stable-diffusion.py @@ -1,5 +1,8 @@ +#!/usr/bin/env python3 """ Plugin for generating images using self-hosted Stable Diffusion and sending them to a Matrix chat room. + +Now supports a `--seed` parameter to control deterministic generation. """ import requests @@ -90,8 +93,13 @@ async def handle_command(room, message, bot, prefix, config): parser.add_argument('--cfg', type=int, default=2, help='CFG scale, default=2') parser.add_argument('--h', type=int, default=512, help='Height of the image, default=512') parser.add_argument('--w', type=int, default=512, help='Width of the image, default=512') - parser.add_argument('--neg', type=str, nargs='+', default=['((((ugly)))), (((duplicate))), ((morbid)), ((mutilated)), out of frame, extra fingers, mutated hands, ((poorly drawn hands)), ((poorly drawn face)), (((mutation))), (((deformed))), ((ugly)), blurry, ((bad anatomy)), (((bad proportions))), ((extra limbs)), cloned face, (((disfigured))), out of frame, ugly, extra limbs, (bad anatomy), gross proportions, (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, (fused fingers), (too many fingers), (((long neck)))'], help='Negative prompt') - parser.add_argument('--sampler', type=str, nargs='*', default=['DPM++', 'SDE'], help='Sampler name, default=DPM++ SDE') + parser.add_argument('--neg', type=str, nargs='+', + default=['((((ugly)))), (((duplicate))), ((morbid)), ((mutilated)), out of frame, extra fingers, mutated hands, ((poorly drawn hands)), ((poorly drawn face)), (((mutation))), (((deformed))), ((ugly)), blurry, ((bad anatomy)), (((bad proportions))), ((extra limbs)), cloned face, (((disfigured))), out of frame, ugly, extra limbs, (bad anatomy), gross proportions, (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, (fused fingers), (too many fingers), (((long neck)))'], + help='Negative prompt') + parser.add_argument('--sampler', type=str, nargs='*', default=['DPM++', 'SDE Karras'], + help='Sampler name, default=DPM++ SDE') + parser.add_argument('--seed', type=int, default=None, + help='Seed for deterministic generation (omit for random)') parser.add_argument('prompt', type=str, nargs='*', help='Prompt for the image') args = parser.parse_args(message.body.split()[1:]) # skip command prefix @@ -112,6 +120,9 @@ async def handle_command(room, message, bot, prefix, config): "width": args.w, "height": args.h, } + # Add seed only if explicitly provided + if args.seed is not None: + payload["seed"] = args.seed url = "http://127.0.0.1:7860/sdapi/v1/txt2img" response = requests.post(url=url, json=payload, timeout=600) @@ -127,7 +138,14 @@ async def handle_command(room, message, bot, prefix, config): # Optional: send info about generated image neg_prompt_clean = neg_prompt.replace(" ", "") - info_msg = f"""
🔍 Image InfoPrompt: {prompt[:100]}
Steps: {args.steps}
Dimensions: {args.h}x{args.w}
Sampler: {sampler_name}
CFG Scale: {args.cfg}
Negative Prompt: {neg_prompt_clean}
""" + seed_info = f"
Seed: {args.seed}" if args.seed is not None else "" + info_msg = f"""
🔍 Image Info +Prompt: {prompt[:100]}
+Steps: {args.steps}
+Dimensions: {args.h}x{args.w}
+Sampler: {sampler_name}
+CFG Scale: {args.cfg}{seed_info}
+Negative Prompt: {neg_prompt_clean}
""" # await bot.api.send_markdown_message(room.room_id, info_msg) # Clean up temp file @@ -167,6 +185,7 @@ def print_help():
  • --w W - Width of the image, default=512
  • --neg NEG - Negative prompt, default=((((ugly)))), (((duplicate))), ((morbid)), ((mutilated)), out of frame, extra fingers, mutated hands, ((poorly drawn hands)), ((poorly drawn face)), (((mutation))), (((deformed))), ((ugly)), blurry, ((bad anatomy)), (((bad proportions))), ((extra limbs)), cloned face, (((disfigured))), out of frame, ugly, extra limbs, (bad anatomy), gross proportions, (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, (fused fingers), (too many fingers), (((long neck)))
  • --sampler SAMPLER - Sampler name, default=DPM++ SDE
  • +
  • --seed SEED - Seed for deterministic generation (omit for random)
  • LORA List:

    @@ -191,9 +210,9 @@ def print_help(): # Plugin Metadata # --------------------------------------------------------------------------- -__version__ = "1.0.0" +__version__ = "1.1.0" __author__ = "Funguy Bot" -__description__ = "Stable Diffusion image generation" +__description__ = "Stable Diffusion image generation (supports --seed)" __help__ = """
    !sd – Generate images via Stable Diffusion @@ -204,6 +223,7 @@ __help__ = """
  • --h H --w W – Image dimensions (default 512)
  • --neg <negative prompt>
  • --sampler SAMPLER – Sampler name (default DPM++ SDE)
  • +
  • --seed SEED – Deterministic seed (optional)
  • Requires a locally running Stable Diffusion API.

    diff --git a/plugins/subnet.py b/plugins/subnet.py new file mode 100644 index 0000000..aa0f04a --- /dev/null +++ b/plugins/subnet.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python3 +""" +plugins/subnet.py – Subnet calculator and network splitting plugin for Funguy Bot. + +Provides the following commands: + !subnet info – Show detailed info about a network + !subnet split --prefix – Split network into smaller subnets (new prefix length) + !subnet split --diff – Split network into equal subnets (prefixlen delta) + !subnet adjacent – Show given network and next adjacent ones + !subnet help – Display this help + +Examples: + !subnet info 192.168.4.0/26 + !subnet split 192.168.4.0/24 --prefix 26 + !subnet split 10.0.0.0/16 --diff 2 + !subnet adjacent 192.168.4.0/26 3 +""" + +import ipaddress +import sys +from typing import Union + +# ------------------------------- helper functions -------------------------------- + +def _fmt_subnet_info(net: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]) -> str: + """Return a human‑readable string with all relevant subnet details.""" + nw = net.network_address + bc = net.broadcast_address if hasattr(net, "broadcast_address") else None + total = net.num_addresses + + if net.version == 4: + if net.prefixlen == 32: + usable_count = 1 + first = last = nw + elif net.prefixlen == 31: + usable_count = 2 + first = nw + last = bc + else: + usable_count = max(0, total - 2) + first = nw + 1 if usable_count > 0 else None + last = bc - 1 if usable_count > 0 else None + else: + hosts_iter = net.hosts() + try: + first = next(hosts_iter) + last = net.network_address + (total - 1) + usable_count = total + except StopIteration: + first = last = None + usable_count = 0 + + lines = [ + f"CIDR: {net.with_prefixlen}", + f"Network: {nw}", + f"Broadcast: {bc if bc is not None else 'N/A'}", + f"Netmask: {net.netmask if hasattr(net, 'netmask') else 'N/A'}", + f"Wildcard Mask: {net.hostmask if hasattr(net, 'hostmask') else 'N/A'}", + f"Total IPs: {total}", + f"Usable Hosts: {usable_count}", + ] + if first is not None and last is not None: + lines.append(f"First Usable: {first}") + lines.append(f"Last Usable: {last}") + lines.append(f"Usable Range: {first} - {last}") + return "\n".join(lines) + + +def _split_by_prefix(net, new_prefix: int) -> str: + if new_prefix < net.prefixlen: + return f"[!] New prefix /{new_prefix} is smaller than current prefix /{net.prefixlen}. Cannot split." + out = [f"# Splitting {net.with_prefixlen} into /{new_prefix} subnets:"] + for i, sub in enumerate(net.subnets(new_prefix=new_prefix)): + out.append(f"\n-- Subnet #{i+1} --") + out.append(_fmt_subnet_info(sub)) + return "\n".join(out) + + +def _split_by_diff(net, diff: int) -> str: + new_prefix = net.prefixlen + diff + return _split_by_prefix(net, new_prefix) + + +def _adjacent_networks(net, count: int) -> str: + out = [f"# Adjacent networks of size /{net.prefixlen} (starting at {net.with_prefixlen}):"] + current = net + for i in range(count + 1): + out.append(f"\n-- Adjacent #{i} --") + out.append(_fmt_subnet_info(current)) + try: + next_net_addr = current.network_address + current.num_addresses + current = ipaddress.ip_network(f"{next_net_addr}/{current.prefixlen}", strict=True) + except ValueError: + out.append("[!] Reached address space limit.") + break + return "\n".join(out) + + +# ------------------------------- bot plugin entry ------------------------------- + +async def handle_command(room, message, bot, prefix, config): + import simplematrixbotlib as botlib + match = botlib.MessageMatch(room, message, bot, prefix) + + if not (match.is_not_from_this_bot() and match.prefix() and match.command("subnet")): + return + + args = match.args() + if not args: + await bot.api.send_text_message( + room.room_id, + "Usage: !subnet ...\n" + " !subnet help – show full help" + ) + return + + subcmd = args[0].lower() + + # --- help --- + if subcmd in ("help", "-h", "--help"): + # Send nicely formatted HTML in a details tag via markdown + html = "
    !subnet – Subnet calculator and exploration\n" + html += "

    Calculate subnet details, split networks, or enumerate adjacent subnets.

    \n" + html += "

    Commands

    \n" + html += "
      \n" + html += "
    • info – Show detailed info for a network
      \n" + html += "!subnet info <CIDR>
      \n" + html += "Example: !subnet info 192.168.1.0/24
    • \n" + html += "
    • split – Split a network into smaller subnets
      \n" + html += "!subnet split <CIDR> --prefix <new_prefix>
      \n" + html += "Example: !subnet split 192.168.1.0/24 --prefix 26
      \n" + html += "Alternatively, use --diff to split by prefix delta:
      \n" + html += "!subnet split <CIDR> --diff <delta>
      \n" + html += "Example: !subnet split 10.0.0.0/16 --diff 2 (creates 4 subnets)
    • \n" + html += "
    • adjacent – Show the current network and adjacent ones
      \n" + html += "!subnet adjacent <CIDR> <count>
      \n" + html += "Example: !subnet adjacent 192.168.4.0/26 3
    • \n" + html += "
    \n" + html += "

    Notes

    \n" + html += "
      \n" + html += "
    • IPv4 /31 and /32 networks show both addresses as usable (RFC 3021).
    • \n" + html += "
    • IPv6 networks list all addresses as hosts (no broadcast).
    • \n" + html += "
    \n" + html += "
    " + await bot.api.send_markdown_message(room.room_id, html) + return + + # --- info (or a CIDR passed directly) --- + if subcmd == "info" or "/" in subcmd: + cidr = args[1] if subcmd == "info" else subcmd + try: + net = ipaddress.ip_network(cidr, strict=False) + except ValueError as e: + await bot.api.send_text_message(room.room_id, f"[!] Invalid CIDR: {e}") + return + await bot.api.send_text_message(room.room_id, _fmt_subnet_info(net)) + return + + # --- split --- + if subcmd == "split": + if len(args) < 2: + await bot.api.send_text_message( + room.room_id, + "Usage: !subnet split --prefix OR --diff " + ) + return + cidr = args[1] + try: + net = ipaddress.ip_network(cidr, strict=False) + except ValueError as e: + await bot.api.send_text_message(room.room_id, f"[!] Invalid CIDR: {e}") + return + + if "--prefix" in args: + try: + idx = args.index("--prefix") + new_prefix = int(args[idx + 1]) + except (ValueError, IndexError): + await bot.api.send_text_message( + room.room_id, + "Usage: !subnet split --prefix " + ) + return + result = _split_by_prefix(net, new_prefix) + elif "--diff" in args: + try: + idx = args.index("--diff") + diff = int(args[idx + 1]) + except (ValueError, IndexError): + await bot.api.send_text_message( + room.room_id, + "Usage: !subnet split --diff " + ) + return + result = _split_by_diff(net, diff) + else: + await bot.api.send_text_message( + room.room_id, + "You must provide either --prefix or --diff for split." + ) + return + await bot.api.send_text_message(room.room_id, result) + return + + # --- adjacent --- + if subcmd == "adjacent": + if len(args) < 3: + await bot.api.send_text_message( + room.room_id, + "Usage: !subnet adjacent " + ) + return + cidr = args[1] + try: + net = ipaddress.ip_network(cidr, strict=False) + except ValueError as e: + await bot.api.send_text_message(room.room_id, f"[!] Invalid CIDR: {e}") + return + try: + count = int(args[2]) + except ValueError: + await bot.api.send_text_message( + room.room_id, + "Count must be an integer." + ) + return + result = _adjacent_networks(net, count) + await bot.api.send_text_message(room.room_id, result) + return + + # Unknown subcommand + await bot.api.send_text_message( + room.room_id, + f"Unknown subcommand '{subcmd}'. Use !subnet help to see available commands." + ) + + +# Plugin metadata +__version__ = "1.0.1" +__author__ = "Funguy Bot" +__description__ = "Subnet calculator, splitter, and adjacent network enumerator" +__help__ = """ +
    +!subnet – Subnet calculator and exploration +

    Calculate subnet details, split networks, or enumerate adjacent subnets.

    +
      +
    • !subnet info <CIDR> – Show detailed info for a network
      + Example: !subnet info 192.168.1.0/24
    • +
    • !subnet split <CIDR> --prefix <new_prefix> – Split into smaller subnets
      + Example: !subnet split 192.168.1.0/24 --prefix 26
    • +
    • !subnet split <CIDR> --diff <delta> – Split by prefix delta
      + Example: !subnet split 10.0.0.0/16 --diff 2
    • +
    • !subnet adjacent <CIDR> <count> – Show adjacent networks
      + Example: !subnet adjacent 192.168.4.0/26 3
    • +
    +
    +""" diff --git a/requirements.txt b/requirements.txt index 3ab2d59..40614f7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,11 @@ pytz ddgs playwright lxml -beautifulsoup4 \ No newline at end of file +beautifulsoup4 +cryptography +bcrypt +argon2-cffi +yara-python +asn1crypto +PyYAML +lxml