From a1254c7a3991dabbdfc8b9407a27d618affe388c Mon Sep 17 00:00:00 2001 From: adator <85586985+adator85@users.noreply.github.com> Date: Sun, 9 Nov 2025 23:39:19 +0100 Subject: [PATCH] Refactoring the Code, Dispatch server responses to all modules including UID, avoid multi calls to get_user, get_nickname... --- core/irc.py | 158 +++++++++++++++++++++++++++------------------------- 1 file changed, 83 insertions(+), 75 deletions(-) diff --git a/core/irc.py b/core/irc.py index a2edc41..a4aab8c 100644 --- a/core/irc.py +++ b/core/irc.py @@ -6,7 +6,6 @@ import time from ssl import SSLSocket from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any, Optional, Union -from xml.etree.ElementInclude import DEFAULT_MAX_INCLUSION_DEPTH from core.classes.modules import rehash from core.classes.interfaces.iprotocol import IProtocol from core.utils import tr @@ -126,6 +125,7 @@ class Irc: self.build_command(3, 'core', 'cert', 'Append your new fingerprint to your account!') self.build_command(4, 'core', 'rehash', 'Reload the configuration file without restarting') self.build_command(4, 'core', 'raw', 'Send a raw command directly to the IRC server') + self.build_command(4, 'core', 'print_users', 'Print users in a file.') # Define the IrcSocket object self.IrcSocket: Optional[Union[socket.socket, SSLSocket]] = None @@ -482,28 +482,26 @@ class Irc: """ try: original_response: list[str] = data.copy() - RED = self.Config.COLORS.red - GREEN = self.Config.COLORS.green - NOGC = self.Config.COLORS.nogc - if len(original_response) < 2: self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}') return None self.Logs.debug(f">> {self.Utils.hide_sensitive_data(original_response)}") - pos, parsed_protocol = self.Protocol.get_ircd_protocol_poisition(cmd=original_response, log=True) + modules = self.ModuleUtils.model_get_loaded_modules().copy() for parsed in self.Protocol.Handler.get_ircd_commands(): if parsed.command_name.upper() == parsed_protocol: parsed.func(original_response) - - if len(original_response) > 2: - if original_response[2] != 'UID': - # Envoyer la commande aux classes dynamiquement chargées - for module in self.ModuleUtils.model_get_loaded_modules().copy(): + for module in modules: module.class_instance.cmd(original_response) + # if len(original_response) > 2: + # if original_response[2] != 'UID': + # # Envoyer la commande aux classes dynamiquement chargées + # for module in self.ModuleUtils.model_get_loaded_modules().copy(): + # module.class_instance.cmd(original_response) + except IndexError as ie: self.Logs.error(f"IndexError: {ie}") except Exception as err: @@ -521,9 +519,16 @@ class Irc: Returns: None: Nothing to return """ + u = self.User.get_user(user) + """The User Object""" + if u is None: + return None - fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande - uid = self.User.get_uid(user) # Récuperer le uid de l'utilisateur + c = self.Client.get_Client(u.uid) + """The Client Object""" + + fromuser = u.nickname + uid = u.uid self.Settings.current_admin = self.Admin.get_admin(user) # set Current admin if any. RED = self.Config.COLORS.red @@ -571,7 +576,7 @@ class Irc: case 'deauth': current_command = str(cmd[0]).upper() - uid_to_deauth = self.User.get_uid(fromuser) + uid_to_deauth = uid self.delete_db_admin(uid_to_deauth) self.Protocol.send_priv_msg( @@ -584,11 +589,20 @@ class Irc: return None case 'firstauth': - # firstauth OWNER_NICKNAME OWNER_PASSWORD - current_nickname = self.User.get_nickname(fromuser) - current_uid = self.User.get_uid(fromuser) + # Syntax. /msg defender firstauth OWNER_NICKNAME OWNER_PASSWORD + # Check command + current_nickname = fromuser + current_uid = uid current_command = str(cmd[0]) + if current_nickname is None: + self.Logs.critical(f"This nickname [{fromuser}] don't exist") + return None + + if len(cmd) < 3: + self.Protocol.send_notice(dnickname,fromuser, tr("Syntax. /msg %s %s [OWNER_NICKNAME] [OWNER_PASSWORD]", self.Config.SERVICE_NICKNAME, current_command)) + return None + query = f"SELECT count(id) as c FROM {self.Config.TABLE_ADMIN}" result = self.Base.db_execute_query(query) result_db = result.fetchone() @@ -601,10 +615,6 @@ class Irc: ) return None - if current_nickname is None: - self.Logs.critical(f"This nickname [{fromuser}] don't exist") - return False - # Credentials sent from the user cmd_owner = str(cmd[1]) cmd_password = str(cmd[2]) @@ -613,59 +623,33 @@ class Irc: config_owner = self.Config.OWNER config_password = self.Config.PASSWORD - if current_nickname != cmd_owner: - self.Logs.critical(f"The current nickname [{fromuser}] is different than the nickname sent [{cmd_owner}] !") - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg=f"The current nickname [{fromuser}] is different than the nickname sent [{cmd_owner}] !" - ) - return False - - if current_nickname != config_owner: - self.Logs.critical(f"The current nickname [{current_nickname}] is different than the configuration owner [{config_owner}] !") - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg=f"The current nickname [{current_nickname}] is different than the configuration owner [{config_owner}] !" - ) - return False - if cmd_owner != config_owner: self.Logs.critical(f"The nickname sent [{cmd_owner}] is different than the configuration owner [{config_owner}] !") self.Protocol.send_notice( nick_from=dnickname, nick_to=fromuser, - msg=f"The nickname sent [{cmd_owner}] is different than the configuration owner [{config_owner}] !" + msg=tr("The nickname sent [%s] is different than the one set in the configuration file !", cmd_owner) ) - return False + return None if cmd_owner == config_owner and cmd_password == config_password: self.Base.db_create_first_admin() self.insert_db_admin(current_uid, cmd_owner, 5, self.Config.LANG) self.Protocol.send_priv_msg( - msg=f"[ {GREEN}{str(current_command).upper()} {NOGC}] - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}", + msg=tr("[%s %s %s] - %s is now connected to %s", GREEN, current_command.upper(), NOGC, fromuser, dnickname), nick_from=dnickname, channel=dchanlog ) - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg=tr("Successfuly connected to %s", dnickname) - ) + self.Protocol.send_notice(dnickname, fromuser, tr("Successfuly connected to %s", dnickname)) else: self.Protocol.send_priv_msg( - msg=f"[ {RED}{str(current_command).upper()} {NOGC}] - {self.User.get_nickname(fromuser)} a tapé un mauvais mot de pass", + msg=tr("[ %s %s %s ] - %s provided a wrong password!", RED, current_command.upper(), NOGC, current_nickname), nick_from=dnickname, channel=dchanlog ) - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg=tr("Wrong password!") - ) + self.Protocol.send_notice(dnickname, fromuser, tr("Wrong password!")) case 'auth': # Syntax. !auth nickname password @@ -673,22 +657,21 @@ class Irc: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [password]") return None - current_command = cmd[0] user_to_log = cmd[1] password = cmd[2] - current_client = self.User.get_user(fromuser) + current_client = u admin_obj = self.Admin.get_admin(fromuser) if current_client is None: # This case should never happen self.Protocol.send_priv_msg(nick_from=dnickname, - msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - Nickname {fromuser} is trying to connect to defender wrongly", + msg=f"[ {RED}{str(command).upper()} FAIL{NOGC} ] - Nickname {fromuser} is trying to connect to defender wrongly", channel=dchanlog) return None if admin_obj: self.Protocol.send_priv_msg(nick_from=dnickname, - msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - {fromuser} is already connected to {dnickname}", + msg=f"[ {GREEN}{str(command).upper()}{NOGC} ] - {fromuser} is already connected to {dnickname}", channel=dchanlog) self.Protocol.send_notice(dnickname, fromuser, tr("You are already connected to %s", dnickname)) return None @@ -704,15 +687,15 @@ class Irc: language = str(user_from_db[3]) self.insert_db_admin(current_client.uid, account, level, language) self.Protocol.send_priv_msg(nick_from=dnickname, - msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - {current_client.nickname} ({account}) est désormais connecté a {dnickname}", + msg=f"[ {GREEN}{str(command).upper()} SUCCESS{NOGC} ] - {current_client.nickname} ({account}) est désormais connecté a {dnickname}", channel=dchanlog) - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Connexion a {dnickname} réussie!") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=tr("Successfuly connected to %s", dnickname)) return None else: self.Protocol.send_priv_msg(nick_from=dnickname, - msg=f"[ {RED}{str(current_command).upper()}{NOGC} ] - {current_client.nickname} a tapé un mauvais mot de pass", + msg=f"[ {RED}{str(command).upper()} FAIL{NOGC} ] - {current_client.nickname} a tapé un mauvais mot de pass", channel=dchanlog) - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Mot de passe incorrecte") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=tr("Wrong password!")) return None case 'addaccess': @@ -738,7 +721,7 @@ class Irc: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} addaccess [nickname] [level] [password]") case 'editaccess': - # .editaccess [USER] [PASSWORD] [LEVEL] + # .editaccess [USER] [NEW_PASSWORD] [LEVEL] try: if len(cmd) < 3: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Right command : /msg {dnickname} editaccess [nickname] [NEWPASSWORD] [NEWLEVEL]") @@ -753,8 +736,8 @@ class Irc: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no Admin access") return None - current_user = self.User.get_nickname(fromuser) - current_uid = self.User.get_uid(fromuser) + current_user = fromuser + current_uid = uid current_user_level = get_admin.level user_new_level = int(cmd[3]) if len(cmd) == 4 else get_admin.level @@ -818,8 +801,8 @@ class Irc: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no admin access") return None - current_user = self.User.get_nickname(fromuser) - current_uid = self.User.get_uid(fromuser) + current_user = fromuser + current_uid = uid current_user_level = get_admin.level # Rechercher le user dans la base de données. @@ -906,7 +889,7 @@ class Irc: ) return None - user_obj = self.User.get_user(fromuser) + user_obj = u if user_obj is None: self.Logs.error(f"Nickname ({fromuser}) doesn't exist, it is impossible to register this nickname") @@ -961,8 +944,8 @@ class Irc: account = str(cmd[1]) # account encrypted_password = self.Loader.Utils.hash_password(cmd[2]) - user_obj = self.User.get_user(fromuser) - client_obj = self.Client.get_Client(user_obj.uid) + user_obj = u + client_obj = c if client_obj is not None: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are already logged in") @@ -1002,19 +985,18 @@ class Irc: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} ") return None - user_obj = self.User.get_user(fromuser) + user_obj = u if user_obj is None: self.Logs.error(f"The User [{fromuser}] is not available in the database") return None - client_obj = self.Client.get_Client(user_obj.uid) + client_obj = c if client_obj is None: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Nothing to logout. please login first") return None self.Protocol.send_svslogout(client_obj) - # self.Protocol.send_svsmode(nickname=fromuser, user_mode='-r') self.Client.delete(user_obj.uid) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You have been logged out successfully") @@ -1034,6 +1016,10 @@ class Irc: case 'load': try: # Load a module ex: .load mod_defender + if len(cmd) < 2: + self.Protocol.send_notice(dnickname, fromuser, tr("Syntax. /msg %s %s MODULE_NAME", dnickname, command.upper())) + return None + mod_name = str(cmd[1]) self.ModuleUtils.load_one_module(self, mod_name, fromuser) return None @@ -1046,6 +1032,9 @@ class Irc: # unload mod_defender try: # The module name. exemple: mod_defender + if len(cmd) < 2: + self.Protocol.send_notice(dnickname, fromuser, tr("Syntax. /msg %s %s MODULE_NAME", dnickname, command.upper())) + return None module_name = str(cmd[1]).lower() self.ModuleUtils.unload_one_module(self, module_name, False) return None @@ -1056,6 +1045,10 @@ class Irc: # reload mod_defender try: # ==> mod_defender + if len(cmd) < 2: + self.Protocol.send_notice(dnickname, fromuser, tr("Syntax. /msg %s %s MODULE_NAME", dnickname, command.upper())) + return None + module_name = str(cmd[1]).lower() self.ModuleUtils.reload_one_module(self, module_name, fromuser) return None @@ -1098,8 +1091,12 @@ class Irc: case 'restart': final_reason = ' '.join(cmd[1:]) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{dnickname.capitalize()} is going to restart!") - self.Config.DEFENDER_RESTART = 1 # Set restart status to 1 saying that the service will restart - self.Config.DEFENDER_INIT = 1 # set init to 1 saying that the service will be re initiated + + # Set restart status to 1 saying that the service will restart + self.Config.DEFENDER_RESTART = 1 + + # set init to 1 saying that the service will be re initiated + self.Config.DEFENDER_INIT = 1 case 'rehash': rehash.rehash_service(self, fromuser) @@ -1235,7 +1232,7 @@ class Irc: self.Protocol.send_notice( nick_from=dnickname, nick_to=fromuser, - msg=f"{uptime}" + msg=uptime ) return None @@ -1256,5 +1253,16 @@ class Irc: self.Protocol.send_raw(raw_command) return None + case 'print_users': + with open('users.txt', 'w') as fw: + i = 1 + for u in self.User.UID_DB: + w = fw.write(u.to_dict().__str__() + "\n") + self.Logs.debug(f" {i} - chars written {w}") + i += 1 + self.Protocol.send_priv_msg(dnickname, "Data written in users.txt file", dchanlog) + + return None + case _: pass