''' This is the main operational file to handle modules ''' import sys import importlib from pathlib import Path from types import ModuleType from typing import TYPE_CHECKING, Optional from core.definition import DefenderModuleHeader, MModule from core.utils import tr if TYPE_CHECKING: from core.loader import Loader from core.irc import Irc from core.classes.interfaces.imodule import IModule class Module: DB_MODULES: list[MModule] = [] DB_MODULE_HEADERS: list[DefenderModuleHeader] = [] def __init__(self, loader: 'Loader') -> None: self._ctx = loader def is_module_compliant(self, obj: object) -> bool: class_name = obj.__name__ is_compliant = True attributs = {'MOD_HEADER', 'mod_config'} methods = {'load', 'unload', 'create_tables', 'cmd', 'hcmds', 'ModConfModel'} obj_attributs: set = set([attribut for attribut in dir(obj) if not callable(getattr(obj, attribut)) and not attribut.startswith('__')]) obj_methods: set = set([method for method in dir(obj) if callable(getattr(obj, method)) and not method.startswith('__')]) if not attributs.issubset(obj_attributs): self._ctx.Logs.error(f'[{class_name}] Your module is not valid make sure you have implemented required attributes {attributs}') raise AttributeError(f'[{class_name}] Your module is not valid make sure you have implemented required attributes {attributs}') if not methods.issubset(obj_methods): self._ctx.Logs.error(f'[{class_name}] Your module is not valid make sure you have implemented required methods {methods}') raise AttributeError(f'[{class_name}] Your module is not valid make sure you have implemented required methods {methods}') return is_compliant def get_all_available_modules(self) -> list[str]: """Get list of all main modules using this pattern mod_*.py all files starting with mod_ Returns: list[str]: List of all module names. """ base_path = Path('mods') modules_available = [file.name.replace('.py', '') for file in base_path.rglob('mod_*.py')] self._ctx.Logs.debug(f"Modules available: {modules_available}") return modules_available def get_module_information(self, module_name: str) -> tuple[Optional[str], Optional[str], Optional[str]]: # module_name : mod_defender if not module_name.lower().startswith('mod_'): return None, None, None module_name = module_name.lower() # --> mod_defender module_folder = module_name.split('_')[1].lower() # --> defender class_name = module_name.split('_')[1].capitalize() # --> Defender self._ctx.Logs.debug(f"Module information Folder: {module_folder}, Name: {module_name}, Class: {class_name}") return module_folder, module_name, class_name def get_module_header(self, module_name: str) -> Optional[DefenderModuleHeader]: for mod_h in self.DB_MODULE_HEADERS: if module_name.lower() == mod_h.name.lower(): self._ctx.Logs.debug(f"Module Header found: {mod_h}") return mod_h return None def create_module_header(self, module_header: dict[str, str]) -> bool: """Create a new module header into DB_MODULE_HEADERS Args: module_header (dict[str, str]): The module header Returns: bool: True if the module header has been created. """ mod_header = DefenderModuleHeader(**module_header) if self.get_module_header(mod_header.name) is None: self._ctx.Logs.debug(f"[MOD_HEADER] The module header has been created! ({mod_header.name} v{mod_header.version})") self.DB_MODULE_HEADERS.append(mod_header) return True return False def delete_module_header(self, module_name: str) -> bool: mod_header = self.get_module_header(module_name) if mod_header is not None: self._ctx.Logs.debug(f"[MOD_HEADER] The module header has been deleted ({mod_header.name} v{mod_header.version})") self.DB_MODULE_HEADERS.remove(mod_header) return True self._ctx.Logs.debug(f"[MOD_HEADER ERROR] Impossible to remove the module header ({module_name})") return False async def load_one_module(self, module_name: str, nickname: str, is_default: bool = False) -> bool: module_folder, module_name, class_name = self.get_module_information(module_name) if module_folder is None or module_name is None or class_name is None: self._ctx.Logs.error(f"There is an error with the module name! {module_folder}, {module_name}, {class_name}") return False if self.is_module_exist_in_sys_module(module_name): self._ctx.Logs.debug(f"Module [{module_folder}.{module_name}] already loaded!") if self.model_is_module_exist(module_name): # Si le module existe dans la variable globale retourne False self._ctx.Logs.debug(f"Module [{module_folder}.{module_name}] exist in the local variable!") await self._ctx.Irc.Protocol.send_priv_msg( nick_from=self._ctx.Config.SERVICE_NICKNAME, msg=f"Le module {module_name} est déja chargé ! si vous souhaiter le recharge tapez {self._ctx.Config.SERVICE_PREFIX}reload {module_name}", channel=self._ctx.Config.SERVICE_CHANLOG ) return False reload_mod = await self.reload_one_module(module_name, nickname) return reload_mod # Charger le module try: loaded_module = importlib.import_module(f'mods.{module_folder}.{module_name}') my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe self.is_module_compliant(my_class) create_instance_of_the_class: 'IModule' = my_class(self._ctx) # Créer une nouvelle instance de la classe await create_instance_of_the_class.load() if self._ctx.Utils.is_coroutinefunction(create_instance_of_the_class.load) else create_instance_of_the_class.load() self.create_module_header(create_instance_of_the_class.MOD_HEADER) except AttributeError as attr: red = self._ctx.Config.COLORS.red nogc = self._ctx.Config.COLORS.red await self._ctx.Irc.Protocol.send_priv_msg( nick_from=self._ctx.Config.SERVICE_NICKNAME, msg=tr("[%sMODULE ERROR%s] Module %s is facing issues! %s", red, nogc, module_name, attr), channel=self._ctx.Config.SERVICE_CHANLOG ) self.drop_module_from_sys_modules(module_name) await self.db_delete_module(module_name) self._ctx.Logs.error(msg=attr, exc_info=True) return False # Charger la nouvelle class dans la variable globale if self.model_insert_module(MModule(module_name, class_name, create_instance_of_the_class)): # Enregistrer le module dans la base de données await self.db_register_module(module_name, nickname, is_default) await self._ctx.Irc.Protocol.send_priv_msg( nick_from=self._ctx.Config.SERVICE_NICKNAME, msg=tr("Module %s loaded!", module_name), channel=self._ctx.Config.SERVICE_CHANLOG ) self._ctx.Logs.debug(f"Module {class_name} has been loaded") return True return False def load_all_modules(self) -> bool: ... async def reload_one_module(self, module_name: str, nickname: str) -> bool: """Reloading one module and insert it into the model as well as the database Args: uplink (Irc): The Irc service instance module_name (str): The module name nickname (str): The nickname Returns: bool: True if the module has been reloaded """ module_folder, module_name, class_name = self.get_module_information(module_name) red = self._ctx.Config.COLORS.red nogc = self._ctx.Config.COLORS.nogc try: if self.is_module_exist_in_sys_module(module_name): module_model = self.model_get_module(module_name) if module_model: self.delete_module_header(module_model.class_instance.MOD_HEADER['name']) await module_model.class_instance.unload() if self._ctx.Utils.is_coroutinefunction(module_model.class_instance.unload) else module_model.class_instance.unload() else: await self._ctx.Irc.Protocol.send_priv_msg( nick_from=self._ctx.Config.SERVICE_NICKNAME, msg=f"[ {red}RELOAD MODULE ERROR{nogc} ] Module [{module_folder}.{module_name}] hasn't been reloaded! You must use {self._ctx.Config.SERVICE_PREFIX}load {module_name}", channel=self._ctx.Config.SERVICE_CHANLOG ) self._ctx.Logs.debug(f"Module [{module_folder}.{module_name}] not found! Please use {self._ctx.Config.SERVICE_PREFIX}load {module_name}") return False # reload module dependencies self.reload_all_modules_with_all_dependencies(f'mods.{module_folder}') the_module = sys.modules[f'mods.{module_folder}.{module_name}'] importlib.reload(the_module) my_class = getattr(the_module, class_name, None) self.is_module_compliant(my_class) new_instance: 'IModule' = my_class(self._ctx) await new_instance.load() if self._ctx.Utils.is_coroutinefunction(new_instance.load) else new_instance.load() self.create_module_header(new_instance.MOD_HEADER) module_model.class_instance = new_instance # Créer le module dans la base de données await self.db_register_module(module_name, nickname) await self._ctx.Irc.Protocol.send_priv_msg( nick_from=self._ctx.Config.SERVICE_NICKNAME, msg=f"Module [{module_folder}.{module_name}] has been reloaded!", channel=self._ctx.Config.SERVICE_CHANLOG ) self._ctx.Logs.debug(f"Module [{module_folder}.{module_name}] reloaded!") return True else: # Module is not loaded! Nothing to reload self._ctx.Logs.debug(f"[RELOAD MODULE ERROR] [{module_folder}.{module_name}] is not loaded! You must use {self._ctx.Config.SERVICE_PREFIX}load {module_name}") await self._ctx.Irc.Protocol.send_priv_msg( nick_from=self._ctx.Config.SERVICE_NICKNAME, msg=f"[ {red}RELOAD MODULE ERROR{nogc} ] Module [{module_folder}.{module_name}] is not loaded! You must use {self._ctx.Config.SERVICE_PREFIX}load {module_name}", channel=self._ctx.Config.SERVICE_CHANLOG ) return False except (TypeError, AttributeError, KeyError, Exception) as err: self._ctx.Logs.error(f"[RELOAD MODULE ERROR]: {err}", exc_info=True) await self._ctx.Irc.Protocol.send_priv_msg( nick_from=self._ctx.Config.SERVICE_NICKNAME, msg=f"[RELOAD MODULE ERROR]: {err}", channel=self._ctx.Config.SERVICE_CHANLOG ) self.drop_module_from_sys_modules(module_name) await self.db_delete_module(module_name) return False def reload_all_modules(self) -> bool: ... def reload_all_modules_with_all_dependencies(self, prefix: str = 'mods') -> bool: """ Reload all modules in sys.modules that start with the given prefix. Useful for reloading a full package during development. """ modules_to_reload = [] # Collect target modules for name, module in sys.modules.items(): if ( isinstance(module, ModuleType) and module is not None and name.startswith(prefix) ): modules_to_reload.append((name, module)) # Sort to reload submodules before parent modules for name, module in sorted(modules_to_reload, key=lambda x: x[0], reverse=True): try: if 'mod_' not in name and 'schemas' not in name: importlib.reload(module) self._ctx.Logs.debug(f'[LOAD_MODULE] Module {module} success') except Exception as err: self._ctx.Logs.error(f'[LOAD_MODULE] Module {module} failed [!] - {err}') async def unload_one_module(self, module_name: str, keep_in_db: bool = True) -> bool: """Unload a module Args: uplink (Irc): The Irc instance module_name (str): Module name ex mod_defender keep_in_db (bool): Keep in database Returns: bool: True if success """ try: # Le nom du module. exemple: mod_defender red = self._ctx.Config.COLORS.red nogc = self._ctx.Config.COLORS.nogc module_folder, module_name, class_name = self.get_module_information(module_name) module = self.model_get_module(module_name) if module is None: self._ctx.Logs.debug(f"[ UNLOAD MODULE ERROR ] This module {module_name} is not loaded!") await self.db_delete_module(module_name) await self._ctx.Irc.Protocol.send_priv_msg( nick_from=self._ctx.Config.SERVICE_NICKNAME, msg=f"[ {red}UNLOAD MODULE ERROR{nogc} ] This module {module_name} is not loaded!", channel=self._ctx.Config.SERVICE_CHANLOG ) return False if module: self.delete_module_header(module.class_instance.MOD_HEADER['name']) await module.class_instance.unload() if self._ctx.Utils.is_coroutinefunction(module.class_instance.unload) else module.class_instance.unload() self.DB_MODULES.remove(module) # Delete from the sys.modules. if sys.modules.get(f'mods.{module_folder}.{module_name}'): del sys.modules[f"mods.{module_folder}.{module_name}"] if sys.modules.get(f'mods.{module_folder}.{module_name}'): self._ctx.Logs.debug(f"Module mods.{module_folder}.{module_name} still in the sys.modules") # Supprimer le module de la base de données if not keep_in_db: await self.db_delete_module(module_name) await self._ctx.Irc.Protocol.send_priv_msg( nick_from=self._ctx.Config.SERVICE_NICKNAME, msg=f"[ UNLOAD MODULE INFO ] Module {module_name} has been unloaded!", channel=self._ctx.Config.SERVICE_CHANLOG ) self._ctx.Logs.debug(f"[ UNLOAD MODULE ] {module_name} has been unloaded!") return True self._ctx.Logs.debug(f"[UNLOAD MODULE]: Module {module_name} not found in DB_MODULES variable!") return False except Exception as err: self._ctx.Logs.error(f"General Error: {err}", exc_info=True) return False def unload_all_modules(self) -> bool: ... def is_module_exist_in_sys_module(self, module_name: str) -> bool: """Check if the module exist in the sys.modules This will check only in the folder mods/ Args: module_name (str): The module name Returns: bool: True if the module exist """ module_folder, module_name, class_name = self.get_module_information(module_name) if "mods." + module_folder + "." + module_name in sys.modules: self._ctx.Logs.debug(f"[SYS MODULE] (mods.{module_folder}.{module_name}) found in sys.modules") return True self._ctx.Logs.debug(f"[SYS MODULE] (mods.{module_folder}.{module_name}) not found in sys.modules") return False def drop_module_from_sys_modules(self, module_name: str) -> bool: """_summary_ Args: module_name (str): _description_ Returns: bool: _description_ """ module_folder, module_name, class_name = self.get_module_information(module_name) full_module_name = "mods." + module_folder + "." + module_name del sys.modules[full_module_name] if not self.is_module_exist_in_sys_module(module_name): return True return False ''' ALL METHODS RELATED TO THE MModule MODEL DATACLASS ''' def model_get_module(self, module_name: str) -> Optional[MModule]: """Get The module model object if exist otherwise it returns None Args: module_name (str): The module name you want to fetch Returns: Optional[MModule]: The Module Model Object """ for module in self.DB_MODULES: if module.module_name.lower() == module_name.lower(): self._ctx.Logs.debug(f"[MODEL MODULE GET] The module {module_name} has been found in the model DB_MODULES") return module self._ctx.Logs.debug(f"[MODEL MODULE GET] The module {module_name} not found in the model DB_MODULES") return None def model_drop_module(self, module_name: str) -> bool: """Drop a module model object from DB_MODULES Args: module_name (str): The module name you want to drop Returns: bool: True if the model has been dropped """ module = self.model_get_module(module_name) if module: self.DB_MODULES.remove(module) return True return False def model_get_loaded_modules(self) -> list[MModule]: """Get the instance of DB_MODULES. Warning: You should use a copy if you want to loop through the list! Returns: list[MModule]: A list of module model object """ # self._ctx.Logs.debug(f"[MODEL MODULE LOADED MODULES] {len(self.DB_MODULES)} modules found!") return self.DB_MODULES def model_insert_module(self, module_model: MModule) -> bool: """Insert a new module model object Args: module_model (MModule): The module model object Returns: bool: True if the model has been inserted """ module = self.model_get_module(module_model.module_name) if module is None: self.DB_MODULES.append(module_model) self._ctx.Logs.debug(f"[MODEL MODULE INSERT] The module {module_model.module_name} has been inserted in the local variable model DB_MODULES") return True self._ctx.Logs.debug(f"[MODEL MODULE INSERT] The module {module_model.module_name} already exist in the local variable model DB_MODULES") return False def model_clear(self) -> None: """Clear DB_MODULES list! """ self.DB_MODULES.clear() self._ctx.Logs.debug("[MODEL MODULE CLEAR] The local variable model DB_MODULES has been cleared") return None def model_is_module_exist(self, module_name: str) -> bool: """Check if the module exist in the module model object Args: module_name (str): The module name Returns: bool: True if the module_name exist """ if self.model_get_module(module_name): self._ctx.Logs.debug(f"[MODEL MODULE EXIST] The module {module_name} exist in the local model DB_MODULES!") return True self._ctx.Logs.debug(f"[MODEL MODULE EXIST] The module {module_name} is not available in the local model DB_MODULES!") return False ''' OPERATION DEDICATED TO DATABASE MANAGEMENT ''' async def db_load_all_existing_modules(self) -> bool: """Charge les modules qui existe déja dans la base de données Returns: None: Aucun retour requis, elle charge puis c'est tout """ self._ctx.Logs.debug("[DB LOAD MODULE] Loading modules from the database!") result = await self._ctx.Base.db_execute_query(f"SELECT module_name FROM {self._ctx.Config.TABLE_MODULE}") for r in result.fetchall(): await self.load_one_module(r[0], 'sys', True) return True async def db_is_module_exist(self, module_name: str) -> bool: """Check if the module exist in the database Args: module_name (str): The module name you want to check Returns: bool: True if the module exist in the database """ query = f"SELECT id FROM {self._ctx.Config.TABLE_MODULE} WHERE module_name = :module_name" mes_donnes = {'module_name': module_name.lower()} results = await self._ctx.Base.db_execute_query(query, mes_donnes) if results.fetchall(): self._ctx.Logs.debug(f"[DB MODULE EXIST] The module {module_name} exist in the database!") return True else: self._ctx.Logs.debug(f"[DB MODULE EXIST] The module {module_name} is not available in the database!") return False async def db_register_module(self, module_name: str, nickname: str, is_default: bool = False) -> bool: """Insert a new module in the database Args: module_name (str): The module name nickname (str): The user who loaded the module isdefault (int): Is this a default module. Default 0 """ if not await self.db_is_module_exist(module_name): insert_cmd_query = f"INSERT INTO {self._ctx.Config.TABLE_MODULE} (datetime, user, module_name, isdefault) VALUES (:datetime, :user, :module_name, :isdefault)" mes_donnees = {'datetime': self._ctx.Utils.get_sdatetime(), 'user': nickname, 'module_name': module_name.lower(), 'isdefault': is_default} insert = await self._ctx.Base.db_execute_query(insert_cmd_query, mes_donnees) if insert.rowcount > 0: self._ctx.Logs.debug(f"[DB REGISTER MODULE] Module {module_name} has been inserted to the database!") return True else: self._ctx.Logs.debug(f"[DB REGISTER MODULE] Module {module_name} not inserted to the database!") return False self._ctx.Logs.debug(f"[DB REGISTER MODULE] Module {module_name} already exist in the database! Nothing to insert!") return False async def db_update_module(self, module_name: str, nickname: str) -> None: """Update the datetime and the user that updated the module Args: module_name (str): The module name to update nickname (str): The nickname who updated the module """ update_cmd_query = f"UPDATE {self._ctx.Config.TABLE_MODULE} SET datetime = :datetime, LOWER(user) = :user WHERE LOWER(module_name) = :module_name" mes_donnees = {'datetime': self._ctx.Utils.get_sdatetime(), 'user': nickname.lower(), 'module_name': module_name.lower()} result = await self._ctx.Base.db_execute_query(update_cmd_query, mes_donnees) if result.rowcount > 0: self._ctx.Logs.debug(f"[DB UPDATE MODULE] Module {module_name} has been updated!") return True else: self._ctx.Logs.debug(f"[DB UPDATE MODULE] Module {module_name} not found! Nothing to update!") return False async def db_delete_module(self, module_name:str) -> None: """Delete a module from the database Args: module_name (str): The module name you want to delete """ insert_cmd_query = f"DELETE FROM {self._ctx.Config.TABLE_MODULE} WHERE LOWER(module_name) = :module_name" mes_donnees = {'module_name': module_name.lower()} delete = await self._ctx.Base.db_execute_query(insert_cmd_query, mes_donnees) if delete.rowcount > 0: self._ctx.Logs.debug(f"[DB MODULE DELETE] The module {module_name} has been deleted from the dabatase!") return True self._ctx.Logs.debug(f"[DB MODULE DELETE] The module {module_name} is not available in the database! Nothing to delete!") return False