diff --git a/funguy.py b/funguy.py index b84c423..37842a3 100755 --- a/funguy.py +++ b/funguy.py @@ -17,15 +17,6 @@ import toml # Library for parsing TOML configuration files # Importing FunguyConfig class from plugins.config module from plugins.config import FunguyConfig -# Whitelist of allowed plugins to prevent arbitrary code execution -ALLOWED_PLUGINS = { - 'ai', 'config', 'cron', 'date', 'fortune', 'help', 'isup', 'karma', - 'loadplugin', 'plugins', 'proxy', 'sd_text', 'stable-diffusion', - 'xkcd', 'youtube-preview', 'youtube-search', 'weather', 'urbandictionary', - 'bitcoin', 'dns', 'shodan', 'dnsdumpster', 'exploitdb', 'headers', 'hashid', - 'sslscan' - } - class FunguyBot: """ A bot class for managing plugins and handling commands in a Matrix chat environment. @@ -87,22 +78,17 @@ class FunguyBot: """ Method to load plugins from the specified directory. """ - # Iterating through whitelisted plugins only - for plugin_name in ALLOWED_PLUGINS: - plugin_file = os.path.join(self.PLUGINS_DIR, f"{plugin_name}.py") - - # Verify that the plugin file exists - if not os.path.isfile(plugin_file): - logging.warning(f"Plugin file not found: {plugin_file}, skipping") - continue - - try: - # Importing plugin module dynamically with validated plugin name - module = importlib.import_module(f"{self.PLUGINS_DIR}.{plugin_name}") - self.PLUGINS[plugin_name] = module # Storing loaded plugin module - logging.info(f"Loaded plugin: {plugin_name}") # Logging successful plugin loading - except Exception as e: - logging.error(f"Error loading plugin {plugin_name}: {e}") # Logging error if plugin loading fails + # Iterating through files in the plugins directory + for plugin_file in os.listdir(self.PLUGINS_DIR): + if plugin_file.endswith(".py"): # Checking if file is a Python file + plugin_name = os.path.splitext(plugin_file)[0] # Extracting plugin name + try: + # Importing plugin module dynamically + module = importlib.import_module(f"{self.PLUGINS_DIR}.{plugin_name}") + self.PLUGINS[plugin_name] = module # Storing loaded plugin module + logging.info(f"Loaded plugin: {plugin_name}") # Logging successful plugin loading + except Exception as e: + logging.error(f"Error loading plugin {plugin_name}: {e}") # Logging error if plugin loading fails def reload_plugins(self): """ @@ -247,10 +233,3 @@ class FunguyBot: if __name__ == "__main__": bot = FunguyBot() # Creating instance of FunguyBot bot.run() # Running the bot - - from plugins import cron # Import your cron plugin - - # After bot starts running, periodically check for cron jobs - while True: - asyncio.sleep(60) # Check every minute (adjust as needed) - cron.run_cron_jobs(bot) # Check and execute cron jobs diff --git a/plugins/ddg.py b/plugins/ddg.py new file mode 100644 index 0000000..641eb31 --- /dev/null +++ b/plugins/ddg.py @@ -0,0 +1,482 @@ +""" +This plugin provides DuckDuckGo search functionality using the DuckDuckGo Instant Answer API. +""" + +import logging +import requests +import json +import simplematrixbotlib as botlib +from urllib.parse import quote, urlencode +import html + +DDG_API_URL = "https://api.duckduckgo.com/" +DDG_SEARCH_URL = "https://html.duckduckgo.com/html/" + +async def handle_command(room, message, bot, prefix, config): + """ + Function to handle DuckDuckGo search commands. + + Args: + room (Room): The Matrix room where the command was invoked. + message (RoomMessage): The message object containing the command. + bot (Bot): The bot object. + prefix (str): The command prefix. + config (dict): Configuration parameters. + + Returns: + None + """ + match = botlib.MessageMatch(room, message, bot, prefix) + if match.is_not_from_this_bot() and match.prefix() and match.command("ddg"): + logging.info("Received !ddg command") + + args = match.args() + + if len(args) < 1: + await show_usage(room, bot) + return + + subcommand = args[0].lower() + + if subcommand == "search": + if len(args) < 2: + await bot.api.send_text_message(room.room_id, "Usage: !ddg search ") + return + query = ' '.join(args[1:]) + await ddg_search(room, bot, query) + + elif subcommand == "instant": + if len(args) < 2: + await bot.api.send_text_message(room.room_id, "Usage: !ddg instant ") + return + query = ' '.join(args[1:]) + await ddg_instant_answer(room, bot, query) + + elif subcommand == "image": + if len(args) < 2: + await bot.api.send_text_message(room.room_id, "Usage: !ddg image ") + return + query = ' '.join(args[1:]) + await ddg_image_search(room, bot, query) + + elif subcommand == "news": + if len(args) < 2: + await bot.api.send_text_message(room.room_id, "Usage: !ddg news ") + return + query = ' '.join(args[1:]) + await ddg_news_search(room, bot, query) + + elif subcommand == "video": + if len(args) < 2: + await bot.api.send_text_message(room.room_id, "Usage: !ddg video ") + return + query = ' '.join(args[1:]) + await ddg_video_search(room, bot, query) + + elif subcommand == "bang": + if len(args) < 2: + await show_bang_help(room, bot) + return + bang_query = ' '.join(args[1:]) + await ddg_bang_search(room, bot, bang_query) + + elif subcommand == "define": + if len(args) < 2: + await bot.api.send_text_message(room.room_id, "Usage: !ddg define ") + return + word = ' '.join(args[1:]) + await ddg_definition(room, bot, word) + + elif subcommand == "calc": + if len(args) < 2: + await bot.api.send_text_message(room.room_id, "Usage: !ddg calc ") + return + expression = ' '.join(args[1:]) + await ddg_calculator(room, bot, expression) + + elif subcommand == "weather": + location = ' '.join(args[1:]) if len(args) > 1 else "" + await ddg_weather(room, bot, location) + + elif subcommand == "help": + await show_usage(room, bot) + + else: + # Default to instant answer search + query = ' '.join(args) + await ddg_instant_answer(room, bot, query) + +async def show_usage(room, bot): + """Display DuckDuckGo command usage.""" + usage = """ +🦆 DuckDuckGo Search Commands + +!ddg <query> - Instant answer search (default) +!ddg search <query> - Web search with results +!ddg instant <query> - Instant answer with detailed info +!ddg image <query> - Image search +!ddg news <query> - News search +!ddg video <query> - Video search +!ddg bang <!bang query> - Use DuckDuckGo bangs +!ddg define <word> - Word definitions +!ddg calc <expression> - Calculator +!ddg weather [location] - Weather information +!ddg help - Show this help + +Examples: +• !ddg python programming +• !ddg search matrix protocol +• !ddg image cute cats +• !ddg bang !w matrix +• !ddg define serendipity +• !ddg calc 2+2*5 +• !ddg weather London + +Popular Bangs: +• !w - Wikipedia +• !g - Google +• !yt - YouTube +• !aw - ArchWiki +• !gh - GitHub +• !so - Stack Overflow +""" + await bot.api.send_markdown_message(room.room_id, usage) + +async def show_bang_help(room, bot): + """Display DuckDuckGo bang help.""" + bang_help = """ +🦆 DuckDuckGo Bangs + +Usage: !ddg bang <!bang query> + +Popular Bangs: +• !ddg bang !w matrix - Search Wikipedia +• !ddg bang !g python - Search Google +• !ddg bang !yt music - Search YouTube +• !ddg bang !aw arch - Search ArchWiki +• !ddg bang !gh repository - Search GitHub +• !ddg bang !so error - Search Stack Overflow +• !ddg bang !amazon book - Search Amazon +• !ddg bang !imdb movie - Search IMDb +• !ddg bang !reddit topic - Search Reddit +• !ddg bang !tw tweet - Search Twitter + +More Bangs: +• !ddg - DuckDuckGo +• !bing - Bing +• !ddgimages - DuckDuckGo Images +• !npm - npm packages +• !cpp - C++ reference +• !python - Python docs +• !rust - Rust docs +• !mdn - MDN Web Docs + +Thousands of bangs available! See: https://duckduckgo.com/bangs +""" + await bot.api.send_markdown_message(room.room_id, bang_help) + +async def ddg_instant_answer(room, bot, query): + """Get DuckDuckGo instant answer.""" + try: + params = { + 'q': query, + 'format': 'json', + 'no_html': '1', + 'skip_disambig': '1', + 'no_redirect': '1' + } + + logging.info(f"Fetching DuckDuckGo instant answer for: {query}") + + response = requests.get(DDG_API_URL, params=params, timeout=10) + + if response.status_code != 200: + # If API fails, provide direct search link + search_url = f"https://duckduckgo.com/?q={quote(query)}" + await bot.api.send_markdown_message( + room.room_id, + f"🦆 DuckDuckGo: {html.escape(query)}

" + f"API temporarily unavailable. Search on DuckDuckGo" + ) + return + + data = response.json() + + output = f"🦆 DuckDuckGo: {html.escape(query)}

" + + # Handle different answer types + if data.get('AbstractText'): + # Wikipedia-style answer + output += f"📚 {data.get('Heading', 'Definition')}
" + output += f"{html.escape(data['AbstractText'])}
" + if data.get('AbstractURL'): + output += f"Read more on {data.get('AbstractSource', 'Wikipedia')}
" + + elif data.get('Answer'): + # Direct answer + output += f"💡 Answer
" + output += f"{html.escape(data['Answer'])}
" + + elif data.get('Definition'): + # Definition + output += f"📖 Definition
" + output += f"{html.escape(data['Definition'])}
" + if data.get('DefinitionSource'): + output += f"Source: {data['DefinitionSource']}
" + + elif data.get('Results'): + # List of results + output += f"🔍 Results
" + for result in data['Results'][:3]: + output += f"• {html.escape(result.get('Text', 'Result'))}
" + + elif data.get('RelatedTopics'): + # Related topics + output += f"🔗 Related Topics
" + for topic in data['RelatedTopics'][:3]: + if isinstance(topic, dict) and topic.get('FirstURL'): + output += f"• {html.escape(topic.get('Text', 'Topic'))}
" + elif isinstance(topic, dict) and topic.get('Name'): + output += f"• {html.escape(topic['Name'])}
" + + else: + # No instant answer found, show search results + output += "🔍 No instant answer found.
" + + # Add search link + search_url = f"https://duckduckgo.com/?q={quote(query)}" + output += f"
View all results on DuckDuckGo" + + await bot.api.send_markdown_message(room.room_id, output) + + except Exception as e: + # Fallback to direct search link + search_url = f"https://duckduckgo.com/?q={quote(query)}" + await bot.api.send_markdown_message( + room.room_id, + f"🦆 DuckDuckGo: {html.escape(query)}

" + f"Error accessing API. Search on DuckDuckGo" + ) + logging.error(f"Error in ddg_instant_answer: {e}") + +async def ddg_search(room, bot, query): + """Perform web search with multiple results.""" + try: + await ddg_web_search(room, bot, query, limit=5) + except Exception as e: + await bot.api.send_text_message(room.room_id, f"Error performing search: {str(e)}") + +async def ddg_web_search(room, bot, query, limit=5): + """Perform web search and return results.""" + try: + params = { + 'q': query, + 'format': 'json' + } + + response = requests.get(DDG_API_URL, params=params, timeout=10) + + if response.status_code != 200: + # Fallback to direct search + search_url = f"https://duckduckgo.com/?q={quote(query)}" + await bot.api.send_markdown_message( + room.room_id, + f"🔍 DuckDuckGo Search: {html.escape(query)}

" + f"API temporarily unavailable. Search on DuckDuckGo" + ) + return + + data = response.json() + + output = f"🔍 DuckDuckGo Search: {html.escape(query)}

" + + results_shown = 0 + + # Show instant answer if available + if data.get('AbstractText') and results_shown < limit: + output += f"💡 {data.get('Heading', 'Instant Answer')}
" + abstract = data['AbstractText'][:200] + "..." if len(data['AbstractText']) > 200 else data['AbstractText'] + output += f"{html.escape(abstract)}
" + if data.get('AbstractURL'): + output += f"Read more
" + output += "
" + results_shown += 1 + + # Show web results + if data.get('Results') and results_shown < limit: + output += "🌐 Web Results
" + for result in data['Results'][:limit - results_shown]: + output += f"• {html.escape(result.get('Text', 'Result'))}
" + results_shown += 1 + + # Show related topics + if data.get('RelatedTopics') and results_shown < limit: + output += "🔗 Related Topics
" + for topic in data['RelatedTopics'][:limit - results_shown]: + if isinstance(topic, dict) and topic.get('FirstURL'): + output += f"• {html.escape(topic.get('Text', 'Topic'))}
" + results_shown += 1 + + # Add search link + search_url = f"https://duckduckgo.com/?q={quote(query)}" + output += f"
View all results on DuckDuckGo" + + await bot.api.send_markdown_message(room.room_id, output) + + except Exception as e: + # Fallback to direct search + search_url = f"https://duckduckgo.com/?q={quote(query)}" + await bot.api.send_markdown_message( + room.room_id, + f"🔍 DuckDuckGo Search: {html.escape(query)}

" + f"Error accessing API. Search on DuckDuckGo" + ) + logging.error(f"Error in ddg_web_search: {e}") + +async def ddg_image_search(room, bot, query): + """Perform image search.""" + try: + params = { + 'q': query, + 'format': 'json', + 'iax': 'images', + 'ia': 'images' + } + + response = requests.get(DDG_API_URL, params=params, timeout=10) + + if response.status_code != 200: + search_url = f"https://duckduckgo.com/?q={quote(query)}&iax=images&ia=images" + await bot.api.send_markdown_message( + room.room_id, + f"🖼️ DuckDuckGo Images: {html.escape(query)}

" + f"API temporarily unavailable. Search images on DuckDuckGo" + ) + return + + data = response.json() + + output = f"🖼️ DuckDuckGo Images: {html.escape(query)}

" + + if data.get('Results'): + output += "📸 Image Results
" + for image in data['Results'][:3]: + output += f"• {html.escape(image.get('Title', 'Image'))}
" + if image.get('Width') and image.get('Height'): + output += f" Size: {image['Width']}×{image['Height']}
" + else: + output += "No image results found.
" + + # Add search link + search_url = f"https://duckduckgo.com/?q={quote(query)}&iax=images&ia=images" + output += f"
View all images on DuckDuckGo" + + await bot.api.send_markdown_message(room.room_id, output) + + except Exception as e: + search_url = f"https://duckduckgo.com/?q={quote(query)}&iax=images&ia=images" + await bot.api.send_markdown_message( + room.room_id, + f"🖼️ DuckDuckGo Images: {html.escape(query)}

" + f"Error accessing API. Search images on DuckDuckGo" + ) + +async def ddg_news_search(room, bot, query): + """Perform news search.""" + try: + search_url = f"https://duckduckgo.com/?q={quote(query)}&iar=news" + await bot.api.send_markdown_message( + room.room_id, + f"📰 DuckDuckGo News: {html.escape(query)}

" + f"View news on DuckDuckGo" + ) + except Exception as e: + await bot.api.send_text_message(room.room_id, f"Error performing news search: {str(e)}") + +async def ddg_video_search(room, bot, query): + """Perform video search.""" + try: + search_url = f"https://duckduckgo.com/?q={quote(query)}&iar=videos" + await bot.api.send_markdown_message( + room.room_id, + f"🎬 DuckDuckGo Videos: {html.escape(query)}

" + f"View videos on DuckDuckGo" + ) + except Exception as e: + await bot.api.send_text_message(room.room_id, f"Error performing video search: {str(e)}") + +async def ddg_bang_search(room, bot, bang_query): + """Perform search using DuckDuckGo bangs.""" + try: + # Create search URL directly - this is more reliable than API for bangs + search_url = f"https://duckduckgo.com/?q={quote(bang_query)}" + + # Common bangs with descriptions + bang_descriptions = { + '!w': 'Wikipedia', + '!g': 'Google', + '!yt': 'YouTube', + '!aw': 'ArchWiki', + '!gh': 'GitHub', + '!so': 'Stack Overflow', + '!amazon': 'Amazon', + '!imdb': 'IMDb', + '!reddit': 'Reddit', + '!tw': 'Twitter' + } + + # Extract bang for description + bang = bang_query.split(' ')[0] if ' ' in bang_query else bang_query + description = bang_descriptions.get(bang, 'Site-specific search') + + output = f"🎯 DuckDuckGo Bang: {html.escape(bang)}
" + output += f"Description: {description}
" + + if ' ' in bang_query: + output += f"Query: {html.escape(bang_query.split(' ', 1)[1])}

" + + output += f"Search with {html.escape(bang)} on DuckDuckGo" + + await bot.api.send_markdown_message(room.room_id, output) + + except Exception as e: + await bot.api.send_text_message(room.room_id, f"Error with bang search: {str(e)}") + +async def ddg_definition(room, bot, word): + """Get word definition.""" + try: + search_url = f"https://duckduckgo.com/?q=define+{quote(word)}" + await bot.api.send_markdown_message( + room.room_id, + f"📖 Definition: {html.escape(word)}

" + f"Get definition on DuckDuckGo" + ) + except Exception as e: + await bot.api.send_text_message(room.room_id, f"Error getting definition: {str(e)}") + +async def ddg_calculator(room, bot, expression): + """Use DuckDuckGo as a calculator.""" + try: + search_url = f"https://duckduckgo.com/?q={quote(expression)}" + await bot.api.send_markdown_message( + room.room_id, + f"🧮 Calculator: {html.escape(expression)}

" + f"Calculate on DuckDuckGo" + ) + except Exception as e: + await bot.api.send_text_message(room.room_id, f"Error with calculator: {str(e)}") + +async def ddg_weather(room, bot, location): + """Get weather information.""" + try: + if not location: + location = "current location" + + search_url = f"https://duckduckgo.com/?q=weather+{quote(location)}" + await bot.api.send_markdown_message( + room.room_id, + f"🌤️ Weather: {html.escape(location)}

" + f"Get weather on DuckDuckGo" + ) + except Exception as e: + await bot.api.send_text_message(room.room_id, f"Error getting weather: {str(e)}") diff --git a/plugins/loadplugin.py b/plugins/loadplugin.py index 224c9c0..fdaf95f 100644 --- a/plugins/loadplugin.py +++ b/plugins/loadplugin.py @@ -11,11 +11,6 @@ import sys # Import sys module for unloading plugins # Dictionary to store loaded plugins PLUGINS = {} -# Whitelist of allowed plugins to prevent arbitrary code execution -ALLOWED_PLUGINS = {'ai', 'config', 'cron', 'date', 'fortune', 'help', 'isup', 'karma', - 'loadplugin', 'plugins', 'proxy', 'sd_text', 'stable-diffusion', - 'xkcd', 'youtube-preview', 'youtube-search'} - async def load_plugin(plugin_name): """ Asynchronously loads a plugin. @@ -26,56 +21,9 @@ async def load_plugin(plugin_name): Returns: bool: True if the plugin is loaded successfully, False otherwise. """ - # Validate plugin name against whitelist - if plugin_name not in ALLOWED_PLUGINS: - logging.error(f"Plugin '{plugin_name}' is not whitelisted") - return False - - # Verify that the plugin file exists in the plugins directory - plugin_path = os.path.join("plugins", f"{plugin_name}.py") - if not os.path.isfile(plugin_path): - logging.error(f"Plugin file not found: {plugin_path}") - return False - try: - # Create a mapping of whitelisted plugins to their module paths - plugin_modules = { - 'ai': 'plugins.ai', - 'config': 'plugins.config', - 'cron': 'plugins.cron', - 'date': 'plugins.date', - 'fortune': 'plugins.fortune', - 'help': 'plugins.help', - 'isup': 'plugins.isup', - 'karma': 'plugins.karma', - 'loadplugin': 'plugins.loadplugin', - 'plugins': 'plugins.plugins', - 'proxy': 'plugins.proxy', - 'sd_text': 'plugins.sd_text', - 'stable-diffusion': 'plugins.stable-diffusion', - 'xkcd': 'plugins.xkcd', - 'youtube-preview': 'plugins.youtube-preview', - 'youtube-search': 'plugins.youtube-search', - 'weather': 'plugins.weather', - 'urbandictionary': 'plugins.urbandictionary', - 'bitcoin':'plugins.bitcoin', - 'dns':'plugins.dns', - 'shodan':'plugins.shodan', - 'dnsdumpster': 'plugins.dnsdumpster', - 'exploitdb': 'plugins.exploitdb', - 'headers': 'plugins.headers', - 'hashid': 'plugins.hashid', - 'sslscan': 'plugins.sslscan' - } - - # Get the module path from the mapping - module_path = plugin_modules.get(plugin_name) - if not module_path: - logging.error(f"Plugin '{plugin_name}' not found in plugin mapping") - return False - - # Import the plugin module using the validated module path - module = importlib.import_module(module_path) + # Import the plugin module + module = importlib.import_module(f"plugins.{plugin_name}") # Add the plugin module to the PLUGINS dictionary PLUGINS[plugin_name] = module logging.info(f"Loaded plugin: {plugin_name}") @@ -164,3 +112,4 @@ async def handle_command(room, message, bot, prefix, config): else: # Send unauthorized message if the sender is not the admin await bot.api.send_text_message(room.room_id, "You are not authorized to unload plugins.") + diff --git a/plugins/sysinfo.py b/plugins/sysinfo.py new file mode 100644 index 0000000..7cb331b --- /dev/null +++ b/plugins/sysinfo.py @@ -0,0 +1,442 @@ +""" +This plugin provides comprehensive system information and resource monitoring. +""" + +import logging +import platform +import os +import psutil +import socket +import datetime +import simplematrixbotlib as botlib +import subprocess +import sys + +async def handle_command(room, message, bot, prefix, config): + """ + Function to handle !sysinfo command for system information. + + Args: + room (Room): The Matrix room where the command was invoked. + message (RoomMessage): The message object containing the command. + bot (Bot): The bot object. + prefix (str): The command prefix. + config (dict): Configuration parameters. + + Returns: + None + """ + match = botlib.MessageMatch(room, message, bot, prefix) + if match.is_not_from_this_bot() and match.prefix() and match.command("sysinfo"): + logging.info("Received !sysinfo command") + + args = match.args() + + if len(args) > 0 and args[0].lower() == 'help': + await show_usage(room, bot) + return + + await get_system_info(room, bot) + +async def show_usage(room, bot): + """Display sysinfo command usage.""" + usage = """ +💻 System Information Plugin + +!sysinfo - Display comprehensive system information +!sysinfo help - Show this help message + +Information Provided: +• System hardware (CPU, RAM, storage, GPU) +• Operating system and kernel details +• Network configuration and interfaces +• Running processes and resource usage +• Temperature and hardware sensors +• System load and performance metrics +• Docker container status (if available) +""" + await bot.api.send_markdown_message(room.room_id, usage) + +async def get_system_info(room, bot): + """Collect and display comprehensive system information.""" + try: + await bot.api.send_text_message(room.room_id, "🔍 Gathering system information...") + + sysinfo = { + 'system': await get_system_info_basic(), + 'cpu': await get_cpu_info(), + 'memory': await get_memory_info(), + 'storage': await get_storage_info(), + 'network': await get_network_info(), + 'processes': await get_process_info(), + 'docker': await get_docker_info(), + 'sensors': await get_sensor_info(), + 'gpu': await get_gpu_info() + } + + output = await format_system_info(sysinfo) + await bot.api.send_markdown_message(room.room_id, output) + + logging.info("Sent system information") + + except Exception as e: + await bot.api.send_text_message(room.room_id, f"Error gathering system info: {str(e)}") + logging.error(f"Error in get_system_info: {e}") + +async def get_system_info_basic(): + """Get basic system information.""" + try: + return { + 'hostname': socket.gethostname(), + 'os': platform.system(), + 'os_release': platform.release(), + 'os_version': platform.version(), + 'architecture': platform.architecture()[0], + 'machine': platform.machine(), + 'processor': platform.processor(), + 'boot_time': datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S"), + 'uptime': str(datetime.timedelta(seconds=psutil.boot_time() - datetime.datetime.now().timestamp())).split('.')[0], + 'users': len(psutil.users()) + } + except Exception as e: + return {'error': str(e)} + +async def get_cpu_info(): + """Get CPU information and usage.""" + try: + cpu_times = psutil.cpu_times_percent(interval=1) + cpu_freq = psutil.cpu_freq() + + return { + 'physical_cores': psutil.cpu_count(logical=False), + 'total_cores': psutil.cpu_count(logical=True), + 'max_frequency': f"{cpu_freq.max:.1f} MHz" if cpu_freq else "N/A", + 'current_frequency': f"{cpu_freq.current:.1f} MHz" if cpu_freq else "N/A", + 'usage_percent': psutil.cpu_percent(interval=1), + 'user_time': cpu_times.user, + 'system_time': cpu_times.system, + 'idle_time': cpu_times.idle, + 'load_avg': os.getloadavg() if hasattr(os, 'getloadavg') else "N/A" + } + except Exception as e: + return {'error': str(e)} + +async def get_memory_info(): + """Get memory and swap information.""" + try: + memory = psutil.virtual_memory() + swap = psutil.swap_memory() + + return { + 'total': f"{memory.total / (1024**3):.2f} GB", + 'available': f"{memory.available / (1024**3):.2f} GB", + 'used': f"{memory.used / (1024**3):.2f} GB", + 'usage_percent': memory.percent, + 'swap_total': f"{swap.total / (1024**3):.2f} GB", + 'swap_used': f"{swap.used / (1024**3):.2f} GB", + 'swap_free': f"{swap.free / (1024**3):.2f} GB", + 'swap_percent': swap.percent + } + except Exception as e: + return {'error': str(e)} + +async def get_storage_info(): + """Get storage device information.""" + try: + partitions = psutil.disk_partitions() + storage_info = [] + + for partition in partitions: + try: + usage = psutil.disk_usage(partition.mountpoint) + storage_info.append({ + 'device': partition.device, + 'mountpoint': partition.mountpoint, + 'fstype': partition.fstype, + 'total': f"{usage.total / (1024**3):.2f} GB", + 'used': f"{usage.used / (1024**3):.2f} GB", + 'free': f"{usage.free / (1024**3):.2f} GB", + 'percent': usage.percent + }) + except: + continue + + # Get disk I/O statistics + disk_io = psutil.disk_io_counters() + io_info = { + 'read_count': disk_io.read_count if disk_io else 0, + 'write_count': disk_io.write_count if disk_io else 0, + 'read_bytes': f"{disk_io.read_bytes / (1024**3):.2f} GB" if disk_io else "0 GB", + 'write_bytes': f"{disk_io.write_bytes / (1024**3):.2f} GB" if disk_io else "0 GB" + } + + return { + 'partitions': storage_info, + 'io_stats': io_info + } + except Exception as e: + return {'error': str(e)} + +async def get_network_info(): + """Get network interface information.""" + try: + interfaces = psutil.net_if_addrs() + io_counters = psutil.net_io_counters(pernic=True) + + network_info = [] + for interface, addrs in interfaces.items(): + if interface not in ['lo']: # Skip loopback + interface_io = io_counters.get(interface, None) + network_info.append({ + 'interface': interface, + 'ipv4': next((addr.address for addr in addrs if addr.family == socket.AF_INET), 'N/A'), + 'ipv6': next((addr.address for addr in addrs if addr.family == socket.AF_INET6), 'N/A'), + 'mac': next((addr.address for addr in addrs if addr.family == psutil.AF_LINK), 'N/A'), + 'bytes_sent': f"{interface_io.bytes_sent / (1024**2):.2f} MB" if interface_io else "N/A", + 'bytes_recv': f"{interface_io.bytes_recv / (1024**2):.2f} MB" if interface_io else "N/A" + }) + + return network_info + except Exception as e: + return {'error': str(e)} + +async def get_process_info(): + """Get process and system load information.""" + try: + processes = [] + for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']): + try: + processes.append(proc.info) + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + # Sort by CPU usage and get top 5 + top_processes = sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:5] + + return { + 'total_processes': len(processes), + 'top_cpu': top_processes + } + except Exception as e: + return {'error': str(e)} + +async def get_docker_info(): + """Get Docker container information if available.""" + try: + # Check if docker is available + result = subprocess.run(['docker', '--version'], capture_output=True, text=True) + if result.returncode != 0: + return {'available': False} + + # Get running containers + result = subprocess.run(['docker', 'ps', '--format', '{{.Names}}|{{.Status}}|{{.Ports}}'], + capture_output=True, text=True) + + containers = [] + for line in result.stdout.strip().split('\n'): + if line: + parts = line.split('|') + if len(parts) >= 2: + containers.append({ + 'name': parts[0], + 'status': parts[1], + 'ports': parts[2] if len(parts) > 2 else 'N/A' + }) + + return { + 'available': True, + 'containers': containers, + 'total_running': len(containers) + } + except Exception as e: + return {'available': False, 'error': str(e)} + +async def get_sensor_info(): + """Get hardware sensor information.""" + try: + temperatures = psutil.sensors_temperatures() + fans = psutil.sensors_fans() + battery = psutil.sensors_battery() + + sensor_info = { + 'temperatures': {}, + 'fans': {}, + 'battery': {} + } + + # Temperature sensors + if temperatures: + for name, entries in temperatures.items(): + sensor_info['temperatures'][name] = [ + f"{entry.current}°C" for entry in entries[:2] # Show first 2 sensors per type + ] + + # Fan speeds + if fans: + for name, entries in fans.items(): + sensor_info['fans'][name] = [ + f"{entry.current} RPM" for entry in entries[:2] + ] + + # Battery information + if battery: + sensor_info['battery'] = { + 'percent': battery.percent, + 'power_plugged': battery.power_plugged, + 'time_left': f"{battery.secsleft // 3600}h {(battery.secsleft % 3600) // 60}m" if battery.secsleft != psutil.POWER_TIME_UNLIMITED else "Unknown" + } + + return sensor_info + except Exception as e: + return {'error': str(e)} + +async def get_gpu_info(): + """Get GPU information using various methods.""" + try: + gpu_info = {} + + # Try nvidia-smi first + try: + result = subprocess.run(['nvidia-smi', '--query-gpu=name,memory.total,memory.used,memory.free,temperature.gpu,utilization.gpu', + '--format=csv,noheader,nounits'], capture_output=True, text=True) + if result.returncode == 0: + nvidia_gpus = [] + for line in result.stdout.strip().split('\n'): + if line: + parts = [part.strip() for part in line.split(',')] + if len(parts) >= 6: + nvidia_gpus.append({ + 'name': parts[0], + 'memory_total': f"{parts[1]} MB", + 'memory_used': f"{parts[2]} MB", + 'memory_free': f"{parts[3]} MB", + 'temperature': f"{parts[4]}°C", + 'utilization': f"{parts[5]}%" + }) + gpu_info['nvidia'] = nvidia_gpus + except: + pass + + # Try lspci for generic GPU detection + try: + result = subprocess.run(['lspci'], capture_output=True, text=True) + if result.returncode == 0: + gpu_lines = [line for line in result.stdout.split('\n') if 'VGA' in line or '3D' in line] + gpu_info['detected'] = gpu_lines[:3] # Show first 3 GPUs + except: + pass + + return gpu_info + except Exception as e: + return {'error': str(e)} + +async def format_system_info(sysinfo): + """Format system information for display.""" + output = "💻 System Information

" + + # System Overview + system = sysinfo.get('system', {}) + output += "🖥️ System Overview
" + output += f" • Hostname: {system.get('hostname', 'N/A')}
" + output += f" • OS: {system.get('os', 'N/A')} {system.get('os_release', '')}
" + output += f" • Architecture: {system.get('architecture', 'N/A')}
" + output += f" • Uptime: {system.get('uptime', 'N/A')}
" + output += f" • Boot Time: {system.get('boot_time', 'N/A')}
" + output += f" • Users: {system.get('users', 'N/A')}
" + output += "
" + + # CPU Information + cpu = sysinfo.get('cpu', {}) + if 'error' not in cpu: + output += "⚡ CPU Information
" + output += f" • Cores: {cpu.get('physical_cores', 'N/A')} physical, {cpu.get('total_cores', 'N/A')} logical
" + output += f" • Frequency: {cpu.get('current_frequency', 'N/A')} (max: {cpu.get('max_frequency', 'N/A')})
" + output += f" • Usage: {cpu.get('usage_percent', 'N/A')}%
" + if cpu.get('load_avg') != "N/A": + output += f" • Load Average: {', '.join([f'{load:.2f}' for load in cpu.get('load_avg', [0,0,0])])}
" + output += "
" + + # Memory Information + memory = sysinfo.get('memory', {}) + if 'error' not in memory: + output += "🧠 Memory Information
" + output += f" • Total: {memory.get('total', 'N/A')}
" + output += f" • Used: {memory.get('used', 'N/A')} ({memory.get('usage_percent', 'N/A')}%)
" + output += f" • Available: {memory.get('available', 'N/A')}
" + output += f" • Swap: {memory.get('swap_used', 'N/A')} / {memory.get('swap_total', 'N/A')} ({memory.get('swap_percent', 'N/A')}%)
" + output += "
" + + # Storage Information + storage = sysinfo.get('storage', {}) + if 'error' not in storage: + output += "💾 Storage Information
" + partitions = storage.get('partitions', []) + for partition in partitions[:3]: # Show first 3 partitions + output += f" • {partition.get('device', 'N/A')}: {partition.get('used', 'N/A')} / {partition.get('total', 'N/A')} ({partition.get('percent', 'N/A')}%)
" + output += "
" + + # GPU Information + gpu = sysinfo.get('gpu', {}) + if gpu.get('nvidia'): + output += "🎮 GPU Information (NVIDIA)
" + for gpu_info in gpu['nvidia']: + output += f" • {gpu_info.get('name', 'N/A')}: {gpu_info.get('utilization', 'N/A')} usage, {gpu_info.get('temperature', 'N/A')}
" + output += "
" + elif gpu.get('detected'): + output += "🎮 GPU Information
" + for gpu_line in gpu['detected'][:2]: + output += f" • {gpu_line}
" + output += "
" + + # Network Information + network = sysinfo.get('network', []) + if network and 'error' not in network: + output += "🌐 Network Information
" + for interface in network[:2]: # Show first 2 interfaces + output += f" • {interface.get('interface', 'N/A')}: {interface.get('ipv4', 'N/A')}
" + output += "
" + + # Process Information + processes = sysinfo.get('processes', {}) + if 'error' not in processes: + output += "🔄 Top Processes (by CPU)
" + for proc in processes.get('top_cpu', [])[:3]: + output += f" • {proc.get('name', 'N/A')}: {proc.get('cpu_percent', 0):.1f}% CPU, {proc.get('memory_percent', 0):.1f}% RAM
" + output += f" • Total Processes: {processes.get('total_processes', 'N/A')}
" + output += "
" + + # Docker Information + docker = sysinfo.get('docker', {}) + if docker.get('available'): + output += "🐳 Docker Containers
" + for container in docker.get('containers', [])[:3]: + output += f" • {container.get('name', 'N/A')}: {container.get('status', 'N/A')}
" + output += f" • Total Running: {docker.get('total_running', 'N/A')}
" + output += "
" + + # Sensor Information + sensors = sysinfo.get('sensors', {}) + if 'error' not in sensors: + if sensors.get('temperatures'): + output += "🌡️ Temperature Sensors
" + for sensor, temps in list(sensors['temperatures'].items())[:2]: + output += f" • {sensor}: {', '.join(temps[:2])}
" + output += "
" + + if sensors.get('battery'): + battery = sensors['battery'] + output += "🔋 Battery Information
" + output += f" • Charge: {battery.get('percent', 'N/A')}%
" + output += f" • Plugged In: {'Yes' if battery.get('power_plugged') else 'No'}
" + if battery.get('time_left'): + output += f" • Time Left: {battery.get('time_left', 'N/A')}
" + output += "
" + + # Add timestamp + output += f"Last updated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + + # Wrap in collapsible due to comprehensive output + output = f"
💻 System Information - {system.get('hostname', 'Unknown')}{output}
" + + return output + diff --git a/requirements.txt b/requirements.txt index 0ced57d..e62bf08 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ croniter schedule yt-dlp pyopenssl +psutil