1204 lines
46 KiB
Python
1204 lines
46 KiB
Python
#!/usr/bin/env python3
"""
Funguy Bot - CyberChef-style plugin (encode.py)
================================================
A production-ready, offline, modular text & data manipulation plugin.
Provides a wide range of encoding, cryptographic, compression, forensic
and networking operations – all controllable via the !encode command.

Dependencies:
    Standard library
    Optional (if missing, related operations will gracefully report an error):
        cryptography  (AES, RSA, ChaCha20, PEM/DER, SHA3/HMAC variants)
        bcrypt        (bcrypt hashing)
        argon2-cffi   (Argon2 hashing)
        yara-python   (YARA scanning)
        asn1crypto    (ASN.1 parsing)
        PyYAML        (YAML formatting)
        lxml          (better XML formatting, fallback to xml.etree)

Author: Funguy Bot
Version: 2.0.1
"""
|
||
|
||
import asyncio
|
||
import base64
|
||
import binascii
|
||
import codecs
|
||
import collections
|
||
import csv
|
||
import gzip
|
||
import io
|
||
import ipaddress
|
||
import json
|
||
import logging
|
||
import lzma
|
||
import math
|
||
import os
|
||
import re
|
||
import struct
|
||
import time
|
||
import urllib.parse
|
||
import xml.etree.ElementTree as ET
|
||
import zlib
|
||
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
|
||
|
||
# ---------- Optional dependency detection ----------
|
||
try:
|
||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||
from cryptography.hazmat.primitives import hashes, hmac, padding, serialization
|
||
from cryptography.hazmat.primitives.asymmetric import rsa, padding as asym_padding
|
||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
|
||
from cryptography.hazmat.primitives import constant_time
|
||
HAS_CRYPTOGRAPHY = True
|
||
except ImportError:
|
||
HAS_CRYPTOGRAPHY = False
|
||
|
||
try:
|
||
import bcrypt
|
||
HAS_BCRYPT = True
|
||
except ImportError:
|
||
HAS_BCRYPT = False
|
||
|
||
try:
|
||
import argon2
|
||
HAS_ARGON2 = True
|
||
except ImportError:
|
||
HAS_ARGON2 = False
|
||
|
||
try:
|
||
import yara
|
||
HAS_YARA = True
|
||
except ImportError:
|
||
HAS_YARA = False
|
||
|
||
try:
|
||
import asn1crypto.core as asn1core
|
||
import asn1crypto.x509 as asn1x509
|
||
HAS_ASN1 = True
|
||
except ImportError:
|
||
HAS_ASN1 = False
|
||
|
||
try:
|
||
import yaml
|
||
HAS_YAML = True
|
||
except ImportError:
|
||
HAS_YAML = False
|
||
|
||
try:
|
||
from lxml import etree as lxml_etree
|
||
HAS_LXML = True
|
||
except ImportError:
|
||
HAS_LXML = False
|
||
|
||
# For HMAC with stdlib
|
||
try:
|
||
import hashlib
|
||
import hmac as stdlib_hmac
|
||
except ImportError:
|
||
pass
|
||
|
||
# ---------- Constants ----------
|
||
# Largest input accepted by validate_input(): 1 MiB.
MAX_INPUT_SIZE = 1024 * 1024
# Default per-operation time budget, in seconds.
DEFAULT_TIMEOUT = 10
# NOTE(review): presumably the keyword that selects recipe mode; its use is
# not visible in this part of the file.
RECIPE_MARKER = "recipe"
|
||
|
||
# ---------- Operation registry ----------
|
||
class Operation:
    """Registry record for a single processing operation.

    Instances carry exactly the attributes named in ``__slots__``; no
    per-instance ``__dict__`` is created.
    """

    __slots__ = ("name", "description", "category",
                 "arg_names", "func", "timeout")

    def __init__(self, name: str, description: str, category: str,
                 arg_names: List[str], func: Callable, timeout: int = DEFAULT_TIMEOUT):
        """Store the operation's metadata and its callable.

        ``arg_names`` lists the positional arguments that precede the data
        argument when ``func`` is invoked.
        """
        (self.name, self.description, self.category) = (name, description, category)
        (self.arg_names, self.func, self.timeout) = (arg_names, func, timeout)
|
||
|
||
# Global registry: operation name -> Operation record.
OPERATIONS: Dict[str, Operation] = {}


def register_op(name: str, description: str, category: str,
                arg_names: Optional[List[str]] = None, timeout: int = DEFAULT_TIMEOUT):
    """Return a decorator that registers a function in ``OPERATIONS``.

    Coroutine functions are registered as-is.  Plain (synchronous) functions
    are wrapped in a coroutine that runs them via ``asyncio.to_thread`` so
    every registered operation can be awaited uniformly.

    Args:
        name: Key under which the operation is stored in ``OPERATIONS``.
        description: Human-readable summary.
        category: Grouping label.
        arg_names: Names of the positional arguments that precede the data
            argument; ``None`` means no extra arguments.
        timeout: Time budget (seconds) recorded on the Operation.

    Returns:
        The decorator; it returns the callable that was registered (the
        original coroutine function, or the async wrapper).
    """
    import functools  # local import keeps this block self-contained

    def decorator(func: Callable):
        if asyncio.iscoroutinefunction(func):
            func_to_reg = func
        else:
            # functools.wraps preserves the sync function's name/docstring so
            # registered operations stay identifiable in logs and help output.
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                return await asyncio.to_thread(func, *args, **kwargs)
            func_to_reg = wrapper
        # Copy arg_names so later mutation of the caller's list cannot
        # silently change the registered signature.
        OPERATIONS[name] = Operation(name, description, category,
                                     list(arg_names or []), func_to_reg, timeout)
        return func_to_reg
    return decorator
|
||
|
||
# ---------- Helper utilities ----------
|
||
def validate_input(data: str, max_size: int = MAX_INPUT_SIZE) -> bytes:
    """Encode ``data`` as UTF-8 and enforce a size limit in bytes.

    Args:
        data: Text to convert.
        max_size: Maximum allowed size of the encoded payload, in bytes.

    Returns:
        The UTF-8 encoding of ``data`` (unencodable code points are replaced).

    Raises:
        ValueError: If the input exceeds ``max_size``.
    """
    # UTF-8 never uses fewer bytes than characters, so this cheap check
    # rejects grossly oversized input before allocating the encoded copy.
    if len(data) > max_size:
        raise ValueError(f"Input too long ({len(data)} characters); max {max_size}")
    encoded = data.encode("utf-8", errors="replace")
    # The limit is expressed in bytes: multi-byte characters can push the
    # encoded size past the limit even when the character count is below it.
    if len(encoded) > max_size:
        raise ValueError(f"Input too long ({len(encoded)} bytes); max {max_size}")
    return encoded
|
||
|
||
def bytes_to_safe_string(b: bytes) -> str:
    """Render bytes as text when fully printable, otherwise as hex.

    The bytes are returned as a string only if they are valid UTF-8 and every
    character is printable ASCII (32..126) or one of newline, carriage return
    or tab.  In every other case the lowercase hex dump is returned.
    """
    allowed_controls = '\n\r\t'
    try:
        decoded = b.decode("utf-8", errors="strict")
    except UnicodeDecodeError:
        return b.hex()
    for ch in decoded:
        if ch not in allowed_controls and not (32 <= ord(ch) <= 126):
            return b.hex()
    return decoded
|
||
|
||
def shannon_entropy(data: bytes) -> float:
    """Return the Shannon entropy of ``data`` in bits per byte.

    Empty input yields ``0.0``.
    """
    total = len(data)
    if total == 0:
        return 0.0
    result = 0.0
    # Counter only holds byte values that occur, so every probability is > 0.
    for occurrences in collections.Counter(data).values():
        p = occurrences / total
        result -= p * math.log2(p)
    return result
|
||
|
||
# ---------- Base encoding/decoding operations ----------
|
||
@register_op("base64", "Base64 encode / decode", "Encoding", arg_names=["subcmd"])
async def op_base64(subcmd: str, text: str) -> str:
    """Base64-encode ``text`` or decode Base64 ``text``.

    ``subcmd`` selects the direction (encode/enc/e or decode/dec/d).  Decode
    failures are returned as an error string; an unknown sub-command raises
    ValueError.
    """
    mode = subcmd.lower()
    payload = validate_input(text)
    if mode in ("encode", "enc", "e"):
        return base64.b64encode(payload).decode()
    if mode not in ("decode", "dec", "d"):
        raise ValueError("Use 'encode' or 'decode'")
    try:
        plain = base64.b64decode(payload)
    except Exception as e:
        return f"Base64 decode error: {e}"
    return plain.decode("utf-8", errors="replace")
|
||
|
||
@register_op("base32", "Base32 encode / decode", "Encoding", arg_names=["subcmd"])
async def op_base32(subcmd: str, text: str) -> str:
    """Base32-encode ``text`` or decode Base32 ``text``.

    When decoding, existing ``=`` padding is stripped and recomputed so that
    unpadded input is accepted.  Decode failures are returned as an error
    string; an unknown sub-command raises ValueError.
    """
    mode = subcmd.lower()
    payload = validate_input(text)
    if mode in ("encode", "enc", "e"):
        return base64.b32encode(payload).decode()
    if mode not in ("decode", "dec", "d"):
        raise ValueError("Use 'encode' or 'decode'")
    try:
        # Re-pad to a multiple of 8 characters, as b32decode requires.
        body = payload.rstrip(b"=")
        missing = -len(body) % 8
        return base64.b32decode(body + b"=" * missing).decode("utf-8", errors="replace")
    except Exception as e:
        return f"Base32 decode error: {e}"
|
||
|
||
@register_op("hex", "Hex encode / decode", "Encoding", arg_names=["subcmd"])
async def op_hex(subcmd: str, text: str) -> str:
    """Hex-encode ``text`` or decode a hex string back to text.

    Spaces and newlines in the hex input are removed before decoding.
    Decode failures are returned as an error string; an unknown sub-command
    raises ValueError.
    """
    mode = subcmd.lower()
    payload = validate_input(text)
    if mode in ("encode", "enc", "e"):
        return payload.hex()
    if mode not in ("decode", "dec", "d"):
        raise ValueError("Use 'encode' or 'decode'")
    try:
        # Drop spaces (32) and newlines (10) in a single pass.
        compact = text.translate({32: None, 10: None})
        return bytes.fromhex(compact).decode("utf-8", errors="replace")
    except Exception as e:
        return f"Hex decode error: {e}"
|
||
|
||
@register_op("url", "URL encode / decode", "Encoding", arg_names=["subcmd"])
async def op_url(subcmd: str, text: str) -> str:
    """Percent-encode ``text`` (no characters treated as safe) or decode it.

    Raises ValueError for an unknown sub-command.
    """
    mode = subcmd.lower()
    if mode in ("decode", "dec", "d"):
        return urllib.parse.unquote(text)
    if mode in ("encode", "enc", "e"):
        return urllib.parse.quote(text, safe="")
    raise ValueError("Use 'encode' or 'decode'")
|
||
|
||
@register_op("html", "HTML entity encode / decode", "Encoding", arg_names=["subcmd"])
async def op_html(subcmd: str, text: str) -> str:
    """Escape ``text`` to HTML entities or unescape entities back to text.

    Encoding escapes the ampersand, angle brackets and both quote characters
    via the standard library's ``html.escape``; decoding uses
    ``html.unescape``.

    Raises:
        ValueError: If ``subcmd`` is neither an encode nor a decode alias.
    """
    import html

    sub = subcmd.lower()
    if sub in ("encode", "enc", "e"):
        # The hand-written replace() chain had lost its entity literals
        # (each character was replaced by itself); html.escape performs the
        # intended escaping, handling the ampersand first.
        return html.escape(text, quote=True)
    if sub in ("decode", "dec", "d"):
        return html.unescape(text)
    raise ValueError("Use 'encode' or 'decode'")
|
||
|
||
@register_op("unicode", "Unicode escape / unescape", "Encoding", arg_names=["subcmd"])
async def op_unicode(subcmd: str, text: str) -> str:
    """Convert text to Python-style unicode escapes, or expand such escapes.

    Raises:
        ValueError: If ``subcmd`` is neither an encode nor a decode alias.
        UnicodeDecodeError: If the input contains a malformed escape sequence
            when decoding.
    """
    sub = subcmd.lower()
    if sub in ("encode", "enc", "e"):
        return text.encode("unicode_escape").decode("utf-8")
    if sub in ("decode", "dec", "d"):
        # The unicode_escape codec reads its input as Latin-1.  Encoding as
        # UTF-8 first would turn literal non-ASCII characters into mojibake,
        # so encode as Latin-1 and let backslashreplace turn anything outside
        # Latin-1 into an escape that the decoder then expands.
        prepared = text.encode("latin-1", errors="backslashreplace")
        return prepared.decode("unicode_escape")
    raise ValueError("Use 'encode' or 'decode'")
|
||
|
||
@register_op("binary", "Text to binary string", "Encoding", arg_names=["subcmd"])
async def op_binary(subcmd: str, text: str) -> str:
    """Convert text to space-separated binary code points, or back.

    Encoding zero-pads each code point to at least 8 bits.  Decoding skips
    tokens that contain anything other than ``0`` and ``1``.  Raises
    ValueError for an unknown sub-command.
    """
    mode = subcmd.lower()
    if mode in ("encode", "enc", "e"):
        return ' '.join(f"{ord(ch):08b}" for ch in text)
    if mode not in ("decode", "dec", "d"):
        raise ValueError("Use 'encode' or 'decode'")
    try:
        pieces = []
        for token in text.split():
            if set(token) <= {"0", "1"}:
                pieces.append(chr(int(token, 2)))
        return ''.join(pieces)
    except Exception:
        return "Invalid binary string"
|
||
|
||
@register_op("rot13", "Apply ROT13 (Caesar cipher)", "Encoding")
async def op_rot13(text: str) -> str:
    """Return ``text`` with the ROT13 substitution applied."""
    rotate = codecs.getencoder('rot13')
    rotated, _consumed = rotate(text)
    return rotated
|
||
|
||
@register_op("morse", "Morse code encode / decode", "Encoding", arg_names=["subcmd"])
async def op_morse(subcmd: str, text: str) -> str:
    """Translate text to Morse code or Morse code back to text.

    Letters A-Z, digits 0-9 and the space (rendered as ``/``) are supported;
    anything else becomes ``?``.  Morse symbols are separated by whitespace.
    Raises ValueError for an unknown sub-command.
    """
    symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 "
    codes = (
        ".- -... -.-. -.. . ..-. --. .... .. .--- -.- .-.. -- "
        "-. --- .--. --.- .-. ... - ..- ...- .-- -..- -.-- --.. "
        "----- .---- ..--- ...-- ....- ..... -.... --... ---.. ----. /"
    ).split()
    to_morse = dict(zip(symbols, codes))
    from_morse = {code: symbol for symbol, code in to_morse.items()}

    mode = subcmd.lower()
    if mode in ("encode", "enc", "e"):
        return ' '.join(to_morse.get(ch.upper(), '?') for ch in text)
    if mode not in ("decode", "dec", "d"):
        raise ValueError("Use 'encode' or 'decode'")
    try:
        return ''.join(from_morse.get(token, '?') for token in text.strip().split())
    except Exception:
        return "Invalid Morse code"
|
||
|
||
# ---------- Cryptography ----------
|
||
@register_op("xor", "XOR with a single-byte key (hex)", "Cryptography", arg_names=["key_hex"])
async def op_xor(key_hex: str, text: str) -> str:
    """XOR every byte of ``text`` (UTF-8 encoded) with a single-byte key.

    Args:
        key_hex: The key as a hex byte, ``00``-``FF``.
        text: Input text.

    Returns:
        The result as text when fully printable, otherwise as a hex dump.

    Raises:
        ValueError: If ``key_hex`` is not valid hex or is outside 00-FF.
    """
    try:
        key = int(key_hex, 16)
    except ValueError:
        raise ValueError("Key must be a hex byte (00-FF)") from None
    # Reject out-of-range keys instead of silently masking them to one byte
    # (previously "1FF" or "-1" were accepted and behaved like "FF").
    if not 0 <= key <= 0xFF:
        raise ValueError("Key must be a hex byte (00-FF)")
    raw = validate_input(text)
    # A 256-entry translation table applies the XOR in one pass.
    result = raw.translate(bytes(value ^ key for value in range(256)))
    return bytes_to_safe_string(result)
|
||
|
||
@register_op("aes", "AES encrypt/decrypt (CBC mode, PKCS7, key+IV from hex)", "Cryptography",
             arg_names=["subcmd", "key_hex", "iv_hex"])
async def op_aes(subcmd: str, key_hex: str, iv_hex: str, text: str) -> str:
    """AES-CBC encrypt ``text`` or decrypt hex ciphertext, with PKCS7 padding.

    Args:
        subcmd: ``encrypt``/``enc`` or ``decrypt``/``dec``.
        key_hex: 16, 24 or 32 byte key as hex.
        iv_hex: 16 byte IV as hex.
        text: Plaintext (encrypt) or hex ciphertext (decrypt).

    Returns:
        Hex ciphertext, decrypted text, or an error string when the library
        is missing, the key/IV is invalid, or decryption fails.

    Raises:
        ValueError: If ``subcmd`` is not recognised.
    """
    if not HAS_CRYPTOGRAPHY:
        return "Error: cryptography library not installed"
    try:
        key = bytes.fromhex(key_hex)
        iv = bytes.fromhex(iv_hex)
        if len(key) not in (16, 24, 32):
            raise ValueError("AES key must be 16/24/32 bytes (32/48/64 hex chars)")
        if len(iv) != 16:
            raise ValueError("IV must be 16 bytes (32 hex chars)")
    except Exception as e:
        return f"Key/IV error: {e}"

    sub = subcmd.lower()
    encrypting = sub in ("encrypt", "enc")
    # Validate the sub-command before touching the payload: previously an
    # unknown sub-command tried to hex-decode the text first and surfaced a
    # misleading hex error instead of the usage message.
    if not encrypting and sub not in ("decrypt", "dec"):
        raise ValueError("Use 'encrypt' or 'decrypt'")

    cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
    if encrypting:
        raw = validate_input(text)
        padder = padding.PKCS7(128).padder()
        padded = padder.update(raw) + padder.finalize()
        encryptor = cipher.encryptor()
        return (encryptor.update(padded) + encryptor.finalize()).hex()

    try:
        # Hex parsing sits inside the handler so malformed ciphertext is
        # reported like any other decrypt failure rather than raising.
        raw = bytes.fromhex(text)
        decryptor = cipher.decryptor()
        padded = decryptor.update(raw) + decryptor.finalize()
        unpadder = padding.PKCS7(128).unpadder()
        data = unpadder.update(padded) + unpadder.finalize()
        return data.decode("utf-8", errors="replace")
    except Exception as e:
        return f"AES decrypt error: {e}"
|
||
|
||
@register_op("chacha20", "ChaCha20 encrypt / decrypt (key+nonce from hex)", "Cryptography",
             arg_names=["subcmd", "key_hex", "nonce_hex"])
async def op_chacha20(subcmd: str, key_hex: str, nonce_hex: str, text: str) -> str:
    """ChaCha20 encrypt ``text`` or decrypt hex ciphertext.

    Args:
        subcmd: ``encrypt``/``enc`` or ``decrypt``/``dec``.
        key_hex: 32 byte key as hex.
        nonce_hex: 16 byte nonce as hex.
        text: Plaintext (encrypt) or hex ciphertext (decrypt).

    Returns:
        Hex ciphertext, decrypted text, or an error string when the library
        is missing, the key/nonce is invalid, or decryption fails.

    Raises:
        ValueError: If ``subcmd`` is not recognised.
    """
    if not HAS_CRYPTOGRAPHY:
        return "Error: cryptography library not installed"
    try:
        key = bytes.fromhex(key_hex)
        nonce = bytes.fromhex(nonce_hex)
        if len(key) != 32:
            raise ValueError("Key must be 32 bytes (64 hex)")
        if len(nonce) != 16:
            raise ValueError("Nonce must be 16 bytes (32 hex)")
    except Exception as e:
        return f"Key/nonce error: {e}"

    sub = subcmd.lower()
    encrypting = sub in ("encrypt", "enc")
    # Check the sub-command before parsing the payload so an unknown
    # sub-command yields the usage error, not a hex-parsing error.
    if not encrypting and sub not in ("decrypt", "dec"):
        raise ValueError("Use 'encrypt' or 'decrypt'")

    cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None)
    if encrypting:
        raw = validate_input(text)
        return cipher.encryptor().update(raw).hex()

    try:
        # Malformed hex ciphertext is reported as a decrypt error string,
        # matching how the AES operation reports decrypt failures.
        raw = bytes.fromhex(text)
        return cipher.decryptor().update(raw).decode("utf-8", errors="replace")
    except Exception as e:
        return f"ChaCha20 decrypt error: {e}"
|
||
|
||
@register_op("rsa", "RSA encrypt/decrypt (PEM key as separate arg, OAEP SHA256)", "Cryptography",
             arg_names=["subcmd", "key_pem"])
async def op_rsa(subcmd: str, key_pem: str, text: str) -> str:
    """RSA-OAEP (SHA-256) encrypt with a PEM public key or decrypt with a PEM private key.

    Ciphertext is exchanged as Base64.  Every failure, including an unknown
    sub-command, is returned as an ``RSA error: ...`` string.
    """
    if not HAS_CRYPTOGRAPHY:
        return "Error: cryptography library not installed"

    def _oaep():
        # OAEP with SHA-256 for both the hash and the MGF1 mask function.
        return asym_padding.OAEP(
            mgf=asym_padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )

    mode = subcmd.lower()
    try:
        if mode in ("encrypt", "enc"):
            public_key = serialization.load_pem_public_key(key_pem.encode())
            plaintext = validate_input(text)
            ciphertext = public_key.encrypt(plaintext, _oaep())
            return base64.b64encode(ciphertext).decode()
        if mode in ("decrypt", "dec"):
            private_key = serialization.load_pem_private_key(key_pem.encode(), password=None)
            ciphertext = base64.b64decode(validate_input(text))
            recovered = private_key.decrypt(ciphertext, _oaep())
            return recovered.decode("utf-8", errors="replace")
        raise ValueError("Use 'encrypt' or 'decrypt'")
    except Exception as e:
        return f"RSA error: {e}"
|
||
|
||
# Hash functions (sync, very fast)
|
||
def _hash_generic(data: str, algo: str) -> str:
    """Return the hex digest of ``data`` under the hashlib algorithm ``algo``."""
    digest = hashlib.new(algo)
    digest.update(validate_input(data))
    return digest.hexdigest()
|
||
|
||
# Register one hash operation per (operation name, hashlib algorithm) pair.
for name, algo in (
    ("md5", "md5"),
    ("sha1", "sha1"),
    ("sha256", "sha256"),
    ("sha512", "sha512"),
    ("sha3-256", "sha3_256"),
    ("sha3-512", "sha3_512"),
):
    @register_op(name, f"{name.upper()} hash", "Cryptography")
    async def _op_hash(data: str, algo=algo) -> str:
        # ``algo`` is bound as a default so each registration keeps the
        # value from its own loop iteration.
        return _hash_generic(data, algo)
|
||
|
||
@register_op("hmac", "HMAC with given algorithm (key in hex)", "Cryptography",
             arg_names=["algo", "key_hex"])
async def op_hmac(algo: str, key_hex: str, data: str) -> str:
    """Compute the hex HMAC of ``data`` using a hex-encoded key.

    Supported algorithms are md5, sha1, sha256 and sha512; an invalid key or
    unsupported algorithm is reported as a message string.
    """
    message = validate_input(data)
    try:
        secret = bytes.fromhex(key_hex)
    except ValueError:
        return "Invalid key hex"
    supported = ("md5", "sha1", "sha256", "sha512")
    if algo not in supported:
        return f"Supported algos: {', '.join(supported)}"
    return stdlib_hmac.new(secret, message, algo).hexdigest()
|
||
|
||
@register_op("bcrypt", "Bcrypt hash / verify (prefix: $2b$)", "Cryptography",
             arg_names=["subcmd", "hash_or_rounds"])
async def op_bcrypt(subcmd: str, hash_or_rounds: str, text: str) -> str:
    """Hash ``text`` with bcrypt or verify it against an existing hash.

    Args:
        subcmd: ``hash``/``h`` or ``verify``/``v``.
        hash_or_rounds: Cost factor (hash) or the bcrypt hash (verify).
        text: The password.

    Returns:
        The bcrypt hash, ``Match`` / ``No match``, or an error string.

    Raises:
        ValueError: If ``subcmd`` is not recognised.
    """
    if not HAS_BCRYPT:
        return "Error: bcrypt library not installed"
    sub = subcmd.lower()
    if sub in ("hash", "h"):
        try:
            rounds = int(hash_or_rounds)
        except ValueError:
            return "Provide number of rounds (e.g. 12)"
        # bcrypt only accepts cost factors 4..31; anything else used to
        # escape as an uncaught exception from gensalt().
        if not 4 <= rounds <= 31:
            return "Rounds must be between 4 and 31"

        def _hash() -> str:
            return bcrypt.hashpw(text.encode(), bcrypt.gensalt(rounds=rounds)).decode()

        try:
            # bcrypt is deliberately slow; run it in a worker thread so the
            # event loop stays responsive and caller timeouts can fire.
            return await asyncio.to_thread(_hash)
        except Exception as e:
            return f"Bcrypt hash error: {e}"
    elif sub in ("verify", "v"):
        try:
            ok = await asyncio.to_thread(bcrypt.checkpw, text.encode(), hash_or_rounds.encode())
            return "Match" if ok else "No match"
        except Exception as e:
            return f"Bcrypt verify error: {e}"
    else:
        raise ValueError("Use 'hash' or 'verify'")
|
||
|
||
@register_op("argon2", "Argon2 hash / verify (phc format)", "Cryptography",
             arg_names=["subcmd", "params_or_hash"])
async def op_argon2(subcmd: str, params_or_hash: str, text: str) -> str:
    """Hash ``text`` with Argon2 or verify it against an existing hash.

    Args:
        subcmd: ``hash``/``h`` or ``verify``/``v``.
        params_or_hash: For hashing, comma-separated ``key=int`` settings such
            as ``time_cost=3,memory_cost=65536,parallelism=4`` (parts without
            ``=`` are ignored); for verifying, the encoded hash.
        text: The password.

    Returns:
        The encoded hash, ``Match`` / ``No match``, or an error string.

    Raises:
        ValueError: If ``subcmd`` is not recognised.
    """
    if not HAS_ARGON2:
        return "Error: argon2-cffi not installed"
    from argon2 import PasswordHasher

    sub = subcmd.lower()
    if sub in ("hash", "h"):
        try:
            kwargs = {}
            for part in params_or_hash.split(","):
                if "=" in part:
                    k, v = part.split("=", 1)
                    # int() is inside the handler so a non-numeric value is
                    # reported as an error string rather than raised.
                    kwargs[k.strip()] = int(v.strip())
            # Cost parameters configure the PasswordHasher itself; hash()
            # does not accept them, so passing them there always failed.
            return PasswordHasher(**kwargs).hash(text)
        except Exception as e:
            return f"Argon2 hash error: {e}"
    elif sub in ("verify", "v"):
        try:
            PasswordHasher().verify(params_or_hash, text)
            return "Match"
        except Exception:
            return "No match"
    else:
        raise ValueError("Use 'hash' or 'verify'")
|
||
|
||
@register_op("pbkdf2", "PBKDF2 derive key (hex output)", "Cryptography",
             arg_names=["salt_hex", "iterations", "dklen", "algo"])
async def op_pbkdf2(salt_hex: str, iterations: str, dklen: str, algo: str, text: str) -> str:
    """Derive a key from ``text`` with PBKDF2-HMAC and return it as hex.

    Args:
        salt_hex: Salt as hex.
        iterations: Iteration count (positive integer, as text).
        dklen: Derived key length in bytes (positive integer, as text).
        algo: ``sha256`` or ``sha512``.
        text: The password.

    Returns:
        The derived key as hex, or a message describing the invalid argument.
    """
    try:
        salt = bytes.fromhex(salt_hex)
    except ValueError:
        # Previously an invalid salt raised, unlike every other bad argument.
        return "Invalid salt hex"
    try:
        rounds = int(iterations)
        length = int(dklen)
    except ValueError:
        return "Invalid iterations/length"
    # hashlib rejects non-positive values with an exception; report them the
    # same way as unparsable ones.
    if rounds < 1 or length < 1:
        return "Invalid iterations/length"
    if algo not in ("sha256", "sha512"):
        return "algo must be sha256 or sha512"
    # The iteration count is caller-controlled; derive in a worker thread so
    # the event loop is not blocked and caller timeouts can fire.
    dk = await asyncio.to_thread(hashlib.pbkdf2_hmac, algo, text.encode(), salt, rounds, length)
    return dk.hex()
|
||
|
||
# ---------- Compression ----------
|
||
@register_op("gzip", "gzip compress / decompress", "Compression", arg_names=["subcmd"])
async def op_gzip(subcmd: str, text: str) -> str:
    """gzip-compress ``text`` to Base64, or decompress Base64 gzip data to text.

    Decompression failures are returned as an error string; an unknown
    sub-command raises ValueError.
    """
    mode = subcmd.lower()
    payload = validate_input(text)
    if mode in ("compress", "comp", "c"):
        packed = gzip.compress(payload)
        return base64.b64encode(packed).decode()
    if mode not in ("decompress", "decomp", "d"):
        raise ValueError("Use 'compress' or 'decompress'")
    try:
        unpacked = gzip.decompress(base64.b64decode(payload))
        return unpacked.decode("utf-8", errors="replace")
    except Exception as e:
        return f"gzip decompress error: {e}"
|
||
|
||
@register_op("zlib", "zlib compress / decompress", "Compression", arg_names=["subcmd"])
async def op_zlib(subcmd: str, text: str) -> str:
    """zlib-compress ``text`` to Base64, or decompress Base64 zlib data to text.

    Decompression failures are returned as an error string; an unknown
    sub-command raises ValueError.
    """
    mode = subcmd.lower()
    payload = validate_input(text)
    if mode in ("compress", "comp", "c"):
        packed = zlib.compress(payload)
        return base64.b64encode(packed).decode()
    if mode not in ("decompress", "decomp", "d"):
        raise ValueError("Use 'compress' or 'decompress'")
    try:
        unpacked = zlib.decompress(base64.b64decode(payload))
        return unpacked.decode("utf-8", errors="replace")
    except Exception as e:
        return f"zlib decompress error: {e}"
|
||
|
||
@register_op("bzip2", "bzip2 compress / decompress", "Compression", arg_names=["subcmd"])
async def op_bzip2(subcmd: str, text: str) -> str:
    """bzip2-compress ``text`` to Base64, or decompress Base64 bzip2 data to text.

    Decompression failures are returned as an error string; an unknown
    sub-command raises ValueError.
    """
    import bz2

    mode = subcmd.lower()
    payload = validate_input(text)
    if mode in ("compress", "comp", "c"):
        packed = bz2.compress(payload)
        return base64.b64encode(packed).decode()
    if mode not in ("decompress", "decomp", "d"):
        raise ValueError("Use 'compress' or 'decompress'")
    try:
        unpacked = bz2.decompress(base64.b64decode(payload))
        return unpacked.decode("utf-8", errors="replace")
    except Exception as e:
        return f"bzip2 decompress error: {e}"
|
||
|
||
@register_op("lzma", "LZMA compress / decompress", "Compression", arg_names=["subcmd"])
async def op_lzma(subcmd: str, text: str) -> str:
    """LZMA-compress ``text`` to Base64, or decompress Base64 LZMA data to text.

    Decompression failures are returned as an error string; an unknown
    sub-command raises ValueError.
    """
    mode = subcmd.lower()
    payload = validate_input(text)
    if mode in ("compress", "comp", "c"):
        packed = lzma.compress(payload)
        return base64.b64encode(packed).decode()
    if mode not in ("decompress", "decomp", "d"):
        raise ValueError("Use 'compress' or 'decompress'")
    try:
        unpacked = lzma.decompress(base64.b64decode(payload))
        return unpacked.decode("utf-8", errors="replace")
    except Exception as e:
        return f"LZMA decompress error: {e}"
|
||
|
||
# ---------- Data processing ----------
|
||
@register_op("json", "JSON format / validate", "Data Processing", arg_names=["subcmd"])
async def op_json(subcmd: str, text: str) -> str:
    """Pretty-print JSON or report whether ``text`` is valid JSON.

    Parse failures are returned as an ``Invalid JSON: ...`` string; an
    unknown sub-command raises ValueError.
    """
    mode = subcmd.lower()
    want_pretty = mode in ("format", "pretty", "p")
    if not want_pretty and mode not in ("validate", "check", "v"):
        raise ValueError("Use 'format' or 'validate'")
    try:
        document = json.loads(text)
        if want_pretty:
            return json.dumps(document, indent=2, ensure_ascii=False)
        return "Valid JSON"
    except json.JSONDecodeError as e:
        return f"Invalid JSON: {e}"
|
||
|
||
@register_op("xml", "XML format (pretty-print)", "Data Processing", arg_names=["subcmd"])
async def op_xml(subcmd: str, text: str) -> str:
    """Pretty-print an XML document.

    Uses lxml when available, otherwise the standard library's minidom.
    Parse failures are returned as an ``XML error: ...`` string.

    Raises:
        ValueError: If ``subcmd`` is not a format alias.
    """
    sub = subcmd.lower()
    if sub not in ("format", "pretty", "p"):
        raise ValueError("Use 'format'")
    if HAS_LXML:
        try:
            # The input is untrusted: do not resolve entities (XXE / entity
            # expansion) and never fetch external resources.
            parser = lxml_etree.XMLParser(remove_blank_text=True,
                                          resolve_entities=False,
                                          no_network=True)
            tree = lxml_etree.fromstring(text.encode(), parser)
            return lxml_etree.tostring(tree, pretty_print=True, encoding="unicode")
        except Exception as e:
            return f"XML error: {e}"
    else:
        try:
            # The stdlib parsers expand internal entities, so refuse
            # documents that declare any instead of risking exponential
            # expansion on untrusted input.
            if "<!ENTITY" in text.upper():
                raise ValueError("entity declarations are not allowed")
            ET.fromstring(text)  # validate
            import xml.dom.minidom as minidom
            dom = minidom.parseString(text)
            return dom.toprettyxml(indent=" ")
        except Exception as e:
            return f"XML error: {e}"
|
||
|
||
@register_op("yaml", "YAML format / convert to JSON", "Data Processing", arg_names=["subcmd"])
async def op_yaml(subcmd: str, text: str) -> str:
    """Re-emit YAML in block style, or convert YAML to indented JSON.

    Input is parsed with ``yaml.safe_load``.  Failures are returned as an
    error string; an unknown sub-command raises ValueError.
    """
    if not HAS_YAML:
        return "Error: PyYAML not installed"
    mode = subcmd.lower()
    if mode in ("format", "pretty", "p"):
        failure_label = "YAML error"

        def render(document):
            return yaml.dump(document, default_flow_style=False, sort_keys=False)
    elif mode == "tojson":
        failure_label = "YAML to JSON error"

        def render(document):
            return json.dumps(document, indent=2, ensure_ascii=False)
    else:
        raise ValueError("Use 'format' or 'tojson'")
    try:
        return render(yaml.safe_load(text))
    except Exception as e:
        return f"{failure_label}: {e}"
|
||
|
||
@register_op("csv", "CSV parse (first row as header)", "Data Processing")
async def op_csv(text: str) -> str:
    """Parse CSV text and report the column names and the first data row.

    Returns ``No rows`` when there is no data row, or a ``CSV error: ...``
    string on failure.
    """
    try:
        records = [*csv.DictReader(io.StringIO(text))]
        if records:
            head = records[0]
            return f"Columns: {list(head.keys())}\nFirst row: {head}"
        return "No rows"
    except Exception as e:
        return f"CSV error: {e}"
|
||
|
||
@register_op("asn1", "ASN.1 parse (DER base64 input)", "Data Processing")
async def op_asn1(text: str) -> str:
    """Decode Base64 DER data and return the string form of the parsed ASN.1 value.

    Returns an error string when asn1crypto is missing or parsing fails.
    """
    if not HAS_ASN1:
        return "Error: asn1crypto not installed"
    try:
        return str(asn1core.load(base64.b64decode(text)))
    except Exception as e:
        return f"ASN.1 parse error: {e}"
|
||
|
||
@register_op("pemder", "PEM/DER conversion (to PEM, to DER)", "Data Processing",
             arg_names=["subcmd"])
async def op_pemder(subcmd: str, text: str) -> str:
    """Convert key material between DER and PEM representations.

    ``topem`` tries to load the input bytes as a DER private key, then as a
    DER public key, and otherwise wraps their Base64 form in a generic
    certificate envelope.  ``toder`` drops PEM header/footer lines and returns
    the decoded bytes as hex.  Every failure, including an unknown
    sub-command, is returned as a ``PEM/DER error: ...`` string.
    """
    if not HAS_CRYPTOGRAPHY:
        return "Error: cryptography library not installed"
    mode = subcmd.lower()
    try:
        raw = validate_input(text)
        if mode in ("topem", "pem"):
            from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, PublicFormat, NoEncryption
            # First attempt: private key.
            try:
                private_key = serialization.load_der_private_key(raw, password=None)
                return private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode()
            except Exception:
                pass
            # Second attempt: public key.
            try:
                public_key = serialization.load_der_public_key(raw)
                return public_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo).decode()
            except Exception:
                pass
            encoded = base64.b64encode(raw).decode()
            return ("Not a recognized DER key; wrapping as generic PEM:\n"
                    "-----BEGIN CERTIFICATE-----\n"
                    f"{encoded}"
                    "\n-----END CERTIFICATE-----")
        if mode in ("toder", "der"):
            body_lines = [ln.strip() for ln in text.splitlines() if not ln.startswith("-----")]
            return base64.b64decode("".join(body_lines)).hex()
        raise ValueError("Use 'topem' or 'toder'")
    except Exception as e:
        return f"PEM/DER error: {e}"
|
||
|
||
# ---------- Forensics ----------
|
||
@register_op("entropy", "Shannon entropy of input", "Forensics")
async def op_entropy(text: str) -> str:
    """Report the Shannon entropy of the UTF-8 encoded input, to four decimals."""
    bits = shannon_entropy(validate_input(text))
    return f"Entropy: {bits:.4f} bits/byte"
|
||
|
||
@register_op("ioc", "Extract IOCs (IPs, domains, URLs, emails)", "Forensics")
async def op_ioc(text: str) -> str:
    """Extract indicators of compromise from free text.

    Finds IPv4 addresses, domain names, http(s) URLs, e-mail addresses and
    32-64 character hex strings.  Each category is de-duplicated and listed
    in order of first appearance.

    Returns:
        One ``Label: a, b, c`` line per non-empty category, or
        ``No IOCs found``.
    """
    ip_candidates = re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', text)
    # The pattern also matches impossible addresses such as 999.1.1.1; keep
    # only candidates whose octets all fit in a byte.
    ips = [ip for ip in ip_candidates
           if all(int(octet) <= 255 for octet in ip.split("."))]
    domains = re.findall(r'(?:(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,})\b', text)
    urls = re.findall(r'https?://[^\s]+', text)
    emails = re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', text)
    # Named hash_hits so the module-level ``hashes`` import is not shadowed.
    hash_hits = re.findall(r'\b[a-fA-F0-9]{32,64}\b', text)

    categories = (
        ("IPs", ips),
        ("Domains", domains),
        ("URLs", urls),
        ("Emails", emails),
        ("Hashes", hash_hits),
    )
    # dict.fromkeys de-duplicates while preserving first-seen order, so the
    # output is stable between runs (set iteration order is not).
    out = [f"{label}: " + ", ".join(dict.fromkeys(found))
           for label, found in categories if found]
    return "\n".join(out) if out else "No IOCs found"
|
||
|
||
@register_op("strings", "Extract ASCII strings (min 4 chars)", "Forensics")
async def op_strings(text: str) -> str:
    """List runs of four or more printable ASCII bytes found in the input.

    Returns one run per line, or ``No strings found``.
    """
    payload = validate_input(text)
    runs = re.findall(b'[\x20-\x7E]{4,}', payload)
    if not runs:
        return "No strings found"
    return "\n".join(run.decode("ascii", errors="replace") for run in runs)
|
||
|
||
@register_op("filemagic", "Detect file type by magic bytes", "Forensics")
async def op_filemagic(text: str) -> str:
    """Identify a file type by comparing the input's bytes with known signatures.

    Signatures are checked in table order and the first match wins.
    """
    # (byte offset, signature as hex, description)
    known = (
        (0, "89504E47", "PNG image"),
        (0, "FFD8FF", "JPEG image"),
        (0, "47494638", "GIF image"),
        (0, "25504446", "PDF document"),
        (0, "504B0304", "ZIP archive"),
        (0, "52617221", "RAR archive"),
        (0, "4D5A", "PE executable (MZ)"),
        (0, "7F454C46", "ELF executable"),
        (0, "CAFEBABE", "Mach-O binary"),
        (0, "D0CF11E0A1B11AE1", "MS Office 97-2003"),
    )
    payload = validate_input(text)
    for position, signature_hex, label in known:
        # Compare raw bytes at the byte offset; equivalent to matching the
        # uppercase hex dump at twice that offset.
        if payload.startswith(bytes.fromhex(signature_hex), position):
            return f"Detected: {label}"
    return "Unknown file type"
|
||
|
||
@register_op("base64blob", "Find Base64-encoded blobs in text", "Forensics")
async def op_base64blob(text: str) -> str:
    """Locate Base64-looking runs (40+ characters) and preview their decoding.

    At most the first five matches are reported, each with the first 50
    characters of the blob and the first 50 decoded bytes.
    """
    blobs = re.findall(
        r'(?:[A-Za-z0-9+/]{4}){10,}(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?', text)
    if not blobs:
        return "No Base64 blobs found"
    report = []
    for index, blob in enumerate(blobs[:5], 1):
        report.append(f"Blob {index}: {blob[:50]}...")
        try:
            preview = base64.b64decode(blob)[:50]
        except Exception:
            report.append(" (invalid base64, shown raw)")
        else:
            report.append(f" Decoded (head 50): {preview}")
    return "\n".join(report)
|
||
|
||
@register_op("xbrute", "XOR single-byte brute force (show top 5 printable)", "Forensics")
async def op_xbrute(text: str) -> str:
    """Try all 256 single-byte XOR keys and show the most printable results.

    A key qualifies when its output is valid UTF-8 and more than 80% of the
    decoded characters are printable ASCII or newline/carriage return/tab.
    The five candidates with the most printable characters are shown, each
    with up to 200 characters of output.
    """
    payload = validate_input(text)
    candidates = []
    for key in range(256):
        # Apply the XOR through a 256-entry translation table.
        table = bytes(value ^ key for value in range(256))
        try:
            decoded = payload.translate(table).decode("utf-8", errors="strict")
        except UnicodeDecodeError:
            continue
        printable = sum(1 for ch in decoded if 32 <= ord(ch) <= 126 or ch in '\n\r\t')
        if printable > len(decoded) * 0.8:  # >80% printable
            candidates.append((key, printable, decoded[:200]))
    if not candidates:
        return "No high-printable XOR keys found"
    best = sorted(candidates, key=lambda item: item[1], reverse=True)[:5]
    return "\n---\n".join(
        f"Key 0x{key:02x} ({key}) - printable {count}/{len(payload)}:\n{sample}"
        for key, count, sample in best
    )
|
||
|
||
@register_op("yara", "Scan input with YARA rule (base64 encoded rule)", "Forensics",
             arg_names=["rule_b64"])
async def op_yara(rule_b64: str, text: str) -> str:
    """Compile a Base64-encoded YARA rule and scan the input with it.

    Returns one line per match, ``No matches``, or a ``YARA error: ...``
    string on any failure.
    """
    if not HAS_YARA:
        return "Error: yara-python not installed"
    try:
        source = base64.b64decode(rule_b64).decode()
        rules = yara.compile(source=source)
        hits = rules.match(data=validate_input(text))
        if not hits:
            return "No matches"
        return "\n".join(str(hit) for hit in hits)
    except Exception as e:
        return f"YARA error: {e}"
|
||
|
||
@register_op("peinfo", "Basic PE header analysis (hex input)", "Forensics")
async def op_peinfo(text: str) -> str:
    """Summarise the headers of a PE executable supplied as hex.

    The input is hex-decoded (whitespace ignored).  If it is not valid hex,
    the UTF-8 bytes of the text are analysed instead, which keeps the
    previous behaviour for non-hex input.

    Returns:
        A multi-line summary (PE offset, machine, timestamp, section count,
        optional header size, PE32+ flag, entry point RVA, image base), or a
        message explaining why the data could not be parsed.
    """
    fallback = validate_input(text)  # also enforces the input size limit
    try:
        # The operation is documented as taking hex, but the bytes were never
        # decoded, so hex input could never start with b'MZ'.
        raw = bytes.fromhex("".join(text.split()))
    except ValueError:
        raw = fallback
    if raw[:2] != b'MZ':
        return "Not an MZ executable"
    try:
        # Offset of the PE signature is stored at 0x3C in the DOS header.
        pe_offset = struct.unpack_from('<I', raw, 0x3C)[0]
        if raw[pe_offset:pe_offset + 4] != b'PE\x00\x00':
            return "PE signature not found"
        # COFF header follows the 4-byte signature.
        coff = pe_offset + 4
        machine, num_sections, timestamp = struct.unpack_from('<HHI', raw, coff)
        opt_hdr_size = struct.unpack_from('<H', raw, coff + 16)[0]
        # Optional header starts right after the 20-byte COFF header.
        opt_start = pe_offset + 24
        magic = struct.unpack_from('<H', raw, opt_start)[0]
        is_pe32plus = magic == 0x20b
        # AddressOfEntryPoint is at the same offset in PE32 and PE32+.
        entry_point_rva = struct.unpack_from('<I', raw, opt_start + 16)[0]
        # ImageBase is 4 bytes at +28 for PE32, 8 bytes at +24 for PE32+.
        if is_pe32plus:
            image_base = struct.unpack_from('<Q', raw, opt_start + 24)[0]
        else:
            image_base = struct.unpack_from('<I', raw, opt_start + 28)[0]
        return (f"PE signature at offset {pe_offset}\n"
                f"Machine: 0x{machine:04x}\n"
                f"Timestamp: {time.ctime(timestamp)}\n"
                f"Sections: {num_sections}\n"
                f"Optional header size: {opt_hdr_size}\n"
                f"PE32+ (64-bit): {is_pe32plus}\n"
                f"Entry point RVA: 0x{entry_point_rva:x}\n"
                f"Image base: 0x{image_base:x}")
    except Exception as e:
        return f"PE parse error: {e}"
|
||
|
||
# ---------- Networking utilities ----------
|
||
@register_op("cidr", "CIDR/subnet calculator", "Networking")
async def op_cidr(text: str) -> str:
    """Report network address, netmask, broadcast address and address count for a CIDR.

    Host bits in the input are allowed (non-strict parsing).  Failures are
    returned as a ``CIDR error: ...`` string.
    """
    try:
        network = ipaddress.ip_network(text.strip(), strict=False)
        summary = [
            f"Network: {network.network_address}",
            f"Netmask: {network.netmask}",
            f"Broadcast: {network.broadcast_address}",
            f"Hosts: {network.num_addresses}",
        ]
        return "\n".join(summary)
    except Exception as e:
        return f"CIDR error: {e}"
|
||
|
||
@register_op("ipconv", "IP conversion (decimal, hex, binary)", "Networking",
             arg_names=["subcmd"])
async def op_ipconv(subcmd: str, text: str) -> str:
    """Render an IP address as a hex, decimal or binary integer.

    Binary output is zero-padded to 32 bits for IPv4 and 128 bits for IPv6.
    Every failure, including an unknown sub-command, is returned as an
    ``IP conversion error: ...`` string.
    """
    mode = subcmd.lower()
    try:
        address = ipaddress.ip_address(text.strip())
        if mode in ("hex", "h"):
            return format(int(address), 'x')
        if mode in ("dec", "d"):
            return str(int(address))
        if mode in ("bin", "b"):
            spec = '032b' if address.version == 4 else '0128b'
            return format(int(address), spec)
        raise ValueError("Use 'hex', 'dec', or 'bin'")
    except Exception as e:
        return f"IP conversion error: {e}"
|
||
|
||
@register_op("urlparse", "Parse URL components", "Networking")
async def op_urlparse(text: str) -> str:
    """Break a URL into scheme, host, port, path, query and fragment.

    Args:
        text: The URL to parse; surrounding whitespace is ignored.

    Returns:
        One ``Label: value`` line per component. Components missing from
        the URL show as an empty string, or ``None`` for host and port.
        Malformed input is reported as a ``"URL parse error: ..."`` string,
        matching how the other networking operations report failures.
    """
    try:
        parts = urllib.parse.urlparse(text.strip())
        # Reading ``parts.port`` validates the port and raises ValueError when
        # it is not a number in 0-65535, so it must stay inside the try.
        return (f"Scheme: {parts.scheme}\n"
                f"Host: {parts.hostname}\n"
                f"Port: {parts.port}\n"
                f"Path: {parts.path}\n"
                f"Query: {parts.query}\n"
                f"Fragment: {parts.fragment}")
    except ValueError as e:
        return f"URL parse error: {e}"
|
||
|
||
@register_op("dns", "DNS lookup (A record) [requires internet; offline may fail]", "Networking")
async def op_dns(text: str) -> str:
    """Resolve a host name and list the addresses it maps to.

    Args:
        text: Host name to resolve; surrounding whitespace is ignored.

    Returns:
        The distinct addresses returned by the resolver, one per line, in
        the order the resolver reported them. No address family is
        requested, so the list may contain both IPv4 and IPv6 entries.
        Failures are returned as a ``"DNS error: ..."`` string.
    """
    try:
        # Use the event loop's resolver instead of socket.getaddrinfo():
        # the socket call blocks the whole loop until the lookup finishes,
        # which also prevents an enclosing asyncio.wait_for timeout from firing.
        loop = asyncio.get_running_loop()
        results = await loop.getaddrinfo(text.strip(), None)
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        addresses = dict.fromkeys(entry[4][0] for entry in results)
        return "\n".join(addresses)
    except Exception as e:
        return f"DNS error: {e}"
|
||
|
||
# ---------- Recipe system ----------
|
||
class Recipe:
    """A sequence of operations to apply.

    Each step is a dict ``{"op": <operation name>, "args": [<leading args>]}``.
    When run, every step receives its own args followed by the output of the
    previous step (or the initial input for the first step).
    """

    def __init__(self, steps: List[Dict[str, Any]]):
        self.steps = steps

    @classmethod
    def from_json(cls, json_str: str) -> "Recipe":
        """Load recipe from JSON string. Format: {"steps": [{"op":"name","args":[...]}, ...]}

        Raises:
            ValueError: If the text is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or does not have the expected structure.
        """
        data = json.loads(json_str)
        if not isinstance(data, dict) or "steps" not in data:
            raise ValueError("JSON must contain 'steps' list")
        raw_steps = data["steps"]
        if not isinstance(raw_steps, list):
            raise ValueError("JSON must contain 'steps' list")

        steps = []
        for index, step in enumerate(raw_steps):
            # Validate here so malformed recipes fail with a clear message
            # instead of a TypeError/KeyError deep inside run().
            if not isinstance(step, dict) or "op" not in step:
                raise ValueError(f"Step {index} must be an object with an 'op' key")
            op_name = step["op"]
            if not isinstance(op_name, str):
                raise ValueError(f"Step {index}: 'op' must be a string")
            args = step.get("args", [])
            if not isinstance(args, list):
                raise ValueError(f"Step {index}: 'args' must be a list")
            steps.append({"op": op_name, "args": args})
        return cls(steps)

    async def run(self, input_data: str, operations: Dict[str, "Operation"]) -> str:
        """Apply every step in order and return the final output.

        Args:
            input_data: Data passed to the first step.
            operations: Registry mapping operation names to objects exposing
                ``func``, ``arg_names`` and ``timeout``.

        Returns:
            The output of the last step. If a step times out or raises, the
            pipeline stops and an ``"Error ..."`` string describing that
            step is returned instead.

        Raises:
            KeyError: If a step names an operation missing from *operations*.
            ValueError: If a step supplies fewer args than the operation declares.
        """
        data = input_data
        for step in self.steps:
            op_name = step["op"]
            if op_name not in operations:
                raise KeyError(f"Unknown operation: {op_name}")
            op = operations[op_name]
            # Combine provided args with the current data as the last arg
            args = [*step["args"], data]
            # Ensure enough args
            if len(args) < len(op.arg_names) + 1:
                raise ValueError(f"Insufficient arguments for {op_name}")
            try:
                data = await asyncio.wait_for(op.func(*args), timeout=op.timeout)
            except asyncio.TimeoutError:
                return f"Error: operation {op_name} timed out"
            except Exception as e:
                return f"Error in {op_name}: {e}"
        return data
|
||
|
||
def _split_recipe_input(combined: str) -> Tuple[str, str]:
    """Split ``'<json>' <data>`` into the recipe JSON text and the data.

    The JSON document may optionally be wrapped in single or double quotes.

    Raises:
        ValueError: If *combined* does not start with a valid JSON document.
    """
    body = combined.strip()
    quote = body[0] if body[:1] in ("'", '"') else ""
    body = body[len(quote):].lstrip()
    # raw_decode parses one leading JSON document and reports where it ended,
    # which lets the JSON contain spaces without any shell-style tokenising.
    _, end = json.JSONDecoder().raw_decode(body)
    recipe_json = body[:end]
    remainder = body[end:].lstrip()
    if quote and remainder.startswith(quote):
        remainder = remainder[1:]
    return recipe_json, remainder.strip()


@register_op("recipe", "Run a recipe (provide JSON and data)", "Recipes",
             arg_names=["subcmd"])
async def op_recipe(subcmd: str, json_or_data: str, *extra: str) -> str:
    """List the available operations or run a JSON recipe over some data.

    Args:
        subcmd: ``list`` or ``run`` (case-insensitive).
        json_or_data: For ``run``, the recipe JSON. When no *extra* values
            are given it is treated as the JSON document followed by the
            data in one string.
        *extra: Optional data fragments; joined with single spaces.

    Returns:
        For ``list``, one ``Category: op, op, ...`` line per category.
        For ``run``, the recipe output, or a ``"Recipe error: ..."`` string
        when the recipe cannot be parsed or executed.

    Raises:
        ValueError: If *subcmd* is unknown or ``run`` is given no data.
    """
    sub = subcmd.lower()
    if sub == "list":
        cats = collections.defaultdict(list)
        for op in OPERATIONS.values():
            cats[op.category].append(op.name)
        return "\n".join(f"{cat}: {', '.join(cats[cat])}" for cat in sorted(cats))

    if sub == "run":
        # Usage: !encode recipe run '<json>' <data>
        if extra:
            recipe_json = json_or_data
            data = " ".join(extra)
        else:
            # The command handler passes everything after the sub-command as
            # ONE string, so the JSON and the data must be separated here.
            try:
                recipe_json, data = _split_recipe_input(json_or_data)
            except ValueError as e:
                return f"Recipe error: {e}"
            if not data:
                raise ValueError("Provide data after JSON recipe")
        try:
            recipe = Recipe.from_json(recipe_json)
            return await recipe.run(data, OPERATIONS)
        except Exception as e:
            return f"Recipe error: {e}"

    raise ValueError("Use 'list' or 'run <json> <data>'")
|
||
|
||
# ---------- Main handler (interface to the bot) ----------
|
||
def _encode_help_html(topic: Optional[str] = None) -> str:
    """Build the collapsible HTML help for one operation, or for all of them.

    When *topic* is a key of ``OPERATIONS`` the help covers that operation
    only; otherwise every operation is listed, grouped by category.
    """
    if topic is not None and topic in OPERATIONS:
        op = OPERATIONS[topic]
        if op.arg_names:
            arg_line = f"<p>Arguments: {', '.join(op.arg_names)}</p>"
        else:
            arg_line = "<p>No arguments required.</p>"
        return (
            f"<details><summary><strong>{op.name}</strong> ({op.category})</summary>"
            f"<p>{op.description}</p>"
            f"{arg_line}"
            "</details>"
        )

    categories = collections.defaultdict(list)
    for op in OPERATIONS.values():
        categories[op.category].append(op.name)

    parts = ["<details><summary><strong>Available operations by category</strong></summary>"]
    for cat in sorted(categories):
        parts.append(f"<p><b>{cat}</b></p><ul>")
        parts.extend(
            f"<li><code>{name}</code>: {OPERATIONS[name].description}</li>"
            for name in categories[cat]
        )
        parts.append("</ul>")
    # The placeholder must be entity-escaped, otherwise HTML renderers treat
    # "<operation>" as an unknown tag and drop it from the message.
    parts.append("<p>Use <code>!encode help &lt;operation&gt;</code> for details.</p>")
    parts.append("</details>")
    return "".join(parts)


async def handle_command(room, message, bot, prefix, config):
    """
    Entry point called by the bot framework.

    Handles ``<prefix>encode`` messages that do not come from the bot itself:
    shows usage or help, or looks the operation up in ``OPERATIONS``, splits
    the remaining words into the operation's named arguments plus the data
    (all further words joined by single spaces), runs the operation under
    its timeout and sends the result to the room.

    Args:
        room: Room the message was received in; replies go to ``room.room_id``.
        message: The incoming message event.
        bot: Bot instance; ``bot.api`` is used to send replies.
        prefix: Command prefix passed to the message matcher.
        config: Unused by this handler.
    """
    import simplematrixbotlib as botlib

    match = botlib.MessageMatch(room, message, bot, prefix)
    if not (match.is_not_from_this_bot() and match.prefix() and match.command("encode")):
        return

    args = match.args()
    if not args:
        await bot.api.send_text_message(
            room.room_id,
            "Usage: !encode <operation> [args...] <data>\n"
            " !encode recipe list\n"
            " !encode recipe run '<json>' <data>\n"
            "Use !encode help for full list."
        )
        return

    # Special help handling (now in details tags with markdown)
    if args[0].lower() in ("help", "-h", "--help"):
        topic = args[1].lower() if len(args) > 1 else None
        await bot.api.send_markdown_message(room.room_id, _encode_help_html(topic))
        return

    # Try to find operation
    op_name = args[0].lower()
    if op_name not in OPERATIONS:
        await bot.api.send_text_message(
            room.room_id,
            f"Unknown operation '{op_name}'. Use !encode help for list."
        )
        return

    operation = OPERATIONS[op_name]
    # Number of positional args the operation needs (excluding the final data).
    required_arg_count = len(operation.arg_names)
    provided_args = args[1:]
    if len(provided_args) < required_arg_count:
        placeholders = " ".join(f"<{a}>" for a in operation.arg_names)
        await bot.api.send_text_message(
            room.room_id,
            f"Usage: !encode {op_name} {placeholders} <data>"
        )
        return

    # Split into op-specific args and the data (everything that is left).
    pos_args = provided_args[:required_arg_count]
    data = " ".join(provided_args[required_arg_count:])

    # Enforce input size
    if len(data) > MAX_INPUT_SIZE:
        await bot.api.send_text_message(
            room.room_id,
            f"Input too large. Maximum {MAX_INPUT_SIZE} bytes."
        )
        return

    # Execute with timeout
    try:
        result = await asyncio.wait_for(
            operation.func(*pos_args, data),
            timeout=operation.timeout
        )
        # Truncate huge output for chat friendliness
        if len(result) > 4000:
            result = result[:3990] + "\n...<truncated>"
        await bot.api.send_text_message(room.room_id, result)
    except asyncio.TimeoutError:
        await bot.api.send_text_message(room.room_id, "Error: operation timed out.")
    except Exception as e:
        await bot.api.send_text_message(room.room_id, f"Error: {e}")
|
||
|
||
# Plugin Metadata
|
||
__version__ = "2.0.1"
__author__ = "Funguy Bot"
__description__ = "Comprehensive CyberChef-like encoding and analysis toolkit"
# HTML help text. Literal angle brackets in examples must be written as
# &lt; / &gt; (and a literal "&lt;" as "&amp;lt;") so that HTML renderers show
# them instead of treating them as tags. Backslashes are doubled because this
# is not a raw string.
__help__ = """
<details>
<summary><strong>!encode</strong> – Comprehensive data manipulation toolkit</summary>

<p>
A CyberChef-like plugin with dozens of operations for encoding, cryptography,
compression, data processing, forensics, and networking. Fully offline.
</p>

<h3>Usage</h3>
<p><code>!encode &lt;operation&gt; [arguments] &lt;data&gt;</code></p>
<p>For help on a specific operation: <code>!encode help &lt;op&gt;</code></p>

<hr/>

<h3>Encoding</h3>
<ul>
<li><b>base64</b> – Base64 encode/decode<br>
<code>!encode base64 encode Hello World</code><br>
<code>!encode base64 decode SGVsbG8gV29ybGQ=</code></li>

<li><b>base32</b> – Base32 encode/decode<br>
<code>!encode base32 encode Hello</code><br>
<code>!encode base32 decode JBSWY3DPEBLW64TMMQQQ====</code></li>

<li><b>hex</b> – Hex encode/decode<br>
<code>!encode hex encode Secret</code><br>
<code>!encode hex decode 536563726574</code></li>

<li><b>url</b> – URL encode/decode<br>
<code>!encode url encode https://example.com/a b</code><br>
<code>!encode url decode https%3A%2F%2Fexample.com%2Fa+b</code></li>

<li><b>html</b> – HTML entity encode/decode<br>
<code>!encode html encode "&lt;script&gt;"</code><br>
<code>!encode html decode &amp;lt;script&amp;gt;</code></li>

<li><b>unicode</b> – Unicode escape/unescape<br>
<code>!encode unicode encode café</code><br>
<code>!encode unicode decode \\u0063\\u0061\\u0066\\u00e9</code></li>

<li><b>binary</b> – Text to binary and back<br>
<code>!encode binary encode Hi</code><br>
<code>!encode binary decode 01001000 01101001</code></li>

<li><b>rot13</b> – ROT13 cipher<br>
<code>!encode rot13 Uryyb Jbeyq</code></li>

<li><b>morse</b> – Morse code encode/decode<br>
<code>!encode morse encode SOS</code><br>
<code>!encode morse decode ... --- ...</code></li>
</ul>

<h3>Cryptography</h3>
<ul>
<li><b>xor</b> – XOR with single-byte key (hex)<br>
<code>!encode xor 41 Hello</code> (key = 0x41)</li>

<li><b>aes</b> – AES-CBC encrypt/decrypt (key+IV hex)<br>
<code>!encode aes encrypt 00112233445566778899aabbccddeeff 000102030405060708090a0b0c0d0e0f secret</code><br>
<code>!encode aes decrypt 00112233445566778899aabbccddeeff 000102030405060708090a0b0c0d0e0f &lt;ciphertext hex&gt;</code></li>

<li><b>chacha20</b> – ChaCha20 encrypt/decrypt<br>
<code>!encode chacha20 encrypt &lt;32-byte-key-hex&gt; &lt;16-byte-nonce-hex&gt; message</code></li>

<li><b>rsa</b> – RSA-OAEP encrypt/decrypt (PEM key as argument)<br>
<code>!encode rsa encrypt "-----BEGIN PUBLIC KEY-----..." message</code></li>

<li><b>md5</b> – MD5 hash<br>
<code>!encode md5 hello</code></li>
<li><b>sha1</b> – SHA1 hash<br>
<code>!encode sha1 hello</code></li>
<li><b>sha256</b> – SHA256 hash<br>
<code>!encode sha256 hello</code></li>
<li><b>sha512</b> – SHA512 hash<br>
<code>!encode sha512 hello</code></li>
<li><b>sha3-256</b> – SHA3-256 hash<br>
<code>!encode sha3-256 hello</code></li>
<li><b>sha3-512</b> – SHA3-512 hash<br>
<code>!encode sha3-512 hello</code></li>

<li><b>hmac</b> – HMAC with given algorithm (key in hex)<br>
<code>!encode hmac sha256 6b6579 message</code></li>

<li><b>bcrypt</b> – Bcrypt hash/verify<br>
<code>!encode bcrypt hash 12 mypassword</code> (rounds=12)<br>
<code>!encode bcrypt verify '$2b$12$...' mypassword</code></li>

<li><b>argon2</b> – Argon2 hash/verify (PHC format)<br>
<code>!encode argon2 hash "time_cost=3,memory_cost=65536,parallelism=4" password</code><br>
<code>!encode argon2 verify 'phc$argon2id$...' password</code></li>

<li><b>pbkdf2</b> – PBKDF2 key derivation<br>
<code>!encode pbkdf2 aabbccdd 100000 32 sha256 mypassword</code></li>
</ul>

<h3>Compression</h3>
<ul>
<li><b>gzip</b> – gzip compress/decompress<br>
<code>!encode gzip compress some text</code><br>
<code>!encode gzip decompress H4sIAAAAAAAC/...</code></li>
<li><b>zlib</b> – zlib compress/decompress<br>
<code>!encode zlib compress data</code></li>
<li><b>bzip2</b> – bzip2 compress/decompress<br>
<code>!encode bzip2 compress data</code></li>
<li><b>lzma</b> – LZMA compress/decompress<br>
<code>!encode lzma compress data</code></li>
</ul>

<h3>Data Processing</h3>
<ul>
<li><b>json</b> – JSON format or validate<br>
<code>!encode json format '{"key":"value"}'</code><br>
<code>!encode json validate '...'</code></li>
<li><b>xml</b> – XML pretty-print<br>
<code>!encode xml format "&lt;root&gt;&lt;a&gt;1&lt;/a&gt;&lt;/root&gt;"</code></li>
<li><b>yaml</b> – YAML format or convert to JSON<br>
<code>!encode yaml format "key: value"</code><br>
<code>!encode yaml tojson "key: value"</code></li>
<li><b>csv</b> – CSV parse (first row as header)<br>
<code>!encode csv "name,age\\nAlice,30"</code></li>
<li><b>asn1</b> – ASN.1 parse (DER base64)<br>
<code>!encode asn1 MIIB...</code></li>
<li><b>pemder</b> – PEM/DER conversion<br>
<code>!encode pemder topem &lt;hex DER&gt;</code><br>
<code>!encode pemder toder "-----BEGIN ..."</code></li>
</ul>

<h3>Forensics</h3>
<ul>
<li><b>entropy</b> – Shannon entropy<br>
<code>!encode entropy some data</code></li>
<li><b>ioc</b> – Extract IOCs (IPs, domains, URLs, emails, hashes)<br>
<code>!encode ioc "Contact admin@example.com from 10.0.0.1"</code></li>
<li><b>strings</b> – Extract ASCII strings (min 4 chars)<br>
<code>!encode strings &lt;hex data&gt;</code></li>
<li><b>filemagic</b> – Detect file type by magic bytes<br>
<code>!encode filemagic &lt;hex data&gt;</code></li>
<li><b>base64blob</b> – Find Base64-encoded blobs<br>
<code>!encode base64blob "Some SGVsbG8gV29ybGQ= inside text"</code></li>
<li><b>xbrute</b> – XOR single-byte brute force<br>
<code>!encode xbrute &lt;hex data&gt;</code></li>
<li><b>yara</b> – Scan with YARA rule (rule as base64)<br>
<code>!encode yara &lt;base64 rule&gt; &lt;data&gt;</code></li>
<li><b>peinfo</b> – PE header analysis<br>
<code>!encode peinfo &lt;hex of MZ file&gt;</code></li>
</ul>

<h3>Networking</h3>
<ul>
<li><b>cidr</b> – CIDR/subnet calculator<br>
<code>!encode cidr 192.168.1.0/24</code></li>
<li><b>ipconv</b> – IP conversion (hex, dec, binary)<br>
<code>!encode ipconv hex 192.168.1.1</code></li>
<li><b>urlparse</b> – Parse URL components<br>
<code>!encode urlparse https://user:pass@example.com:8080/path?q=1#frag</code></li>
<li><b>dns</b> – DNS lookup (A record)<br>
<code>!encode dns example.com</code></li>
</ul>

<h3>Recipes</h3>
<ul>
<li><b>recipe list</b> – List all available operations<br>
<code>!encode recipe list</code></li>
<li><b>recipe run</b> – Execute a JSON recipe on data<br>
<code>!encode recipe run '{"steps":[{"op":"base64","args":["encode"]},{"op":"hex","args":["encode"]}]}' "hello world"</code></li>
</ul>

<p>Type <code>!encode help &lt;op&gt;</code> for detailed argument info on any operation.</p>
</details>
"""
|