Refactoring the Code, Dispatch server responses to all modules including UID, avoid multi calls to get_user, get_nickname...

This commit is contained in:
adator
2025-11-09 23:39:19 +01:00
parent c05990f862
commit a1254c7a39

View File

@@ -6,7 +6,6 @@ import time
from ssl import SSLSocket from ssl import SSLSocket
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Optional, Union from typing import TYPE_CHECKING, Any, Optional, Union
from xml.etree.ElementInclude import DEFAULT_MAX_INCLUSION_DEPTH
from core.classes.modules import rehash from core.classes.modules import rehash
from core.classes.interfaces.iprotocol import IProtocol from core.classes.interfaces.iprotocol import IProtocol
from core.utils import tr from core.utils import tr
@@ -126,6 +125,7 @@ class Irc:
self.build_command(3, 'core', 'cert', 'Append your new fingerprint to your account!') self.build_command(3, 'core', 'cert', 'Append your new fingerprint to your account!')
self.build_command(4, 'core', 'rehash', 'Reload the configuration file without restarting') self.build_command(4, 'core', 'rehash', 'Reload the configuration file without restarting')
self.build_command(4, 'core', 'raw', 'Send a raw command directly to the IRC server') self.build_command(4, 'core', 'raw', 'Send a raw command directly to the IRC server')
self.build_command(4, 'core', 'print_users', 'Print users in a file.')
# Define the IrcSocket object # Define the IrcSocket object
self.IrcSocket: Optional[Union[socket.socket, SSLSocket]] = None self.IrcSocket: Optional[Union[socket.socket, SSLSocket]] = None
@@ -482,28 +482,26 @@ class Irc:
""" """
try: try:
original_response: list[str] = data.copy() original_response: list[str] = data.copy()
RED = self.Config.COLORS.red
GREEN = self.Config.COLORS.green
NOGC = self.Config.COLORS.nogc
if len(original_response) < 2: if len(original_response) < 2:
self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}') self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}')
return None return None
self.Logs.debug(f">> {self.Utils.hide_sensitive_data(original_response)}") self.Logs.debug(f">> {self.Utils.hide_sensitive_data(original_response)}")
pos, parsed_protocol = self.Protocol.get_ircd_protocol_poisition(cmd=original_response, log=True) pos, parsed_protocol = self.Protocol.get_ircd_protocol_poisition(cmd=original_response, log=True)
modules = self.ModuleUtils.model_get_loaded_modules().copy()
for parsed in self.Protocol.Handler.get_ircd_commands(): for parsed in self.Protocol.Handler.get_ircd_commands():
if parsed.command_name.upper() == parsed_protocol: if parsed.command_name.upper() == parsed_protocol:
parsed.func(original_response) parsed.func(original_response)
for module in modules:
if len(original_response) > 2:
if original_response[2] != 'UID':
# Envoyer la commande aux classes dynamiquement chargées
for module in self.ModuleUtils.model_get_loaded_modules().copy():
module.class_instance.cmd(original_response) module.class_instance.cmd(original_response)
# if len(original_response) > 2:
# if original_response[2] != 'UID':
# # Envoyer la commande aux classes dynamiquement chargées
# for module in self.ModuleUtils.model_get_loaded_modules().copy():
# module.class_instance.cmd(original_response)
except IndexError as ie: except IndexError as ie:
self.Logs.error(f"IndexError: {ie}") self.Logs.error(f"IndexError: {ie}")
except Exception as err: except Exception as err:
@@ -521,9 +519,16 @@ class Irc:
Returns: Returns:
None: Nothing to return None: Nothing to return
""" """
u = self.User.get_user(user)
"""The User Object"""
if u is None:
return None
fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande c = self.Client.get_Client(u.uid)
uid = self.User.get_uid(user) # Récuperer le uid de l'utilisateur """The Client Object"""
fromuser = u.nickname
uid = u.uid
self.Settings.current_admin = self.Admin.get_admin(user) # set Current admin if any. self.Settings.current_admin = self.Admin.get_admin(user) # set Current admin if any.
RED = self.Config.COLORS.red RED = self.Config.COLORS.red
@@ -571,7 +576,7 @@ class Irc:
case 'deauth': case 'deauth':
current_command = str(cmd[0]).upper() current_command = str(cmd[0]).upper()
uid_to_deauth = self.User.get_uid(fromuser) uid_to_deauth = uid
self.delete_db_admin(uid_to_deauth) self.delete_db_admin(uid_to_deauth)
self.Protocol.send_priv_msg( self.Protocol.send_priv_msg(
@@ -584,11 +589,20 @@ class Irc:
return None return None
case 'firstauth': case 'firstauth':
# firstauth OWNER_NICKNAME OWNER_PASSWORD # Syntax. /msg defender firstauth OWNER_NICKNAME OWNER_PASSWORD
current_nickname = self.User.get_nickname(fromuser) # Check command
current_uid = self.User.get_uid(fromuser) current_nickname = fromuser
current_uid = uid
current_command = str(cmd[0]) current_command = str(cmd[0])
if current_nickname is None:
self.Logs.critical(f"This nickname [{fromuser}] don't exist")
return None
if len(cmd) < 3:
self.Protocol.send_notice(dnickname,fromuser, tr("Syntax. /msg %s %s [OWNER_NICKNAME] [OWNER_PASSWORD]", self.Config.SERVICE_NICKNAME, current_command))
return None
query = f"SELECT count(id) as c FROM {self.Config.TABLE_ADMIN}" query = f"SELECT count(id) as c FROM {self.Config.TABLE_ADMIN}"
result = self.Base.db_execute_query(query) result = self.Base.db_execute_query(query)
result_db = result.fetchone() result_db = result.fetchone()
@@ -601,10 +615,6 @@ class Irc:
) )
return None return None
if current_nickname is None:
self.Logs.critical(f"This nickname [{fromuser}] don't exist")
return False
# Credentials sent from the user # Credentials sent from the user
cmd_owner = str(cmd[1]) cmd_owner = str(cmd[1])
cmd_password = str(cmd[2]) cmd_password = str(cmd[2])
@@ -613,59 +623,33 @@ class Irc:
config_owner = self.Config.OWNER config_owner = self.Config.OWNER
config_password = self.Config.PASSWORD config_password = self.Config.PASSWORD
if current_nickname != cmd_owner:
self.Logs.critical(f"The current nickname [{fromuser}] is different than the nickname sent [{cmd_owner}] !")
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"The current nickname [{fromuser}] is different than the nickname sent [{cmd_owner}] !"
)
return False
if current_nickname != config_owner:
self.Logs.critical(f"The current nickname [{current_nickname}] is different than the configuration owner [{config_owner}] !")
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"The current nickname [{current_nickname}] is different than the configuration owner [{config_owner}] !"
)
return False
if cmd_owner != config_owner: if cmd_owner != config_owner:
self.Logs.critical(f"The nickname sent [{cmd_owner}] is different than the configuration owner [{config_owner}] !") self.Logs.critical(f"The nickname sent [{cmd_owner}] is different than the configuration owner [{config_owner}] !")
self.Protocol.send_notice( self.Protocol.send_notice(
nick_from=dnickname, nick_from=dnickname,
nick_to=fromuser, nick_to=fromuser,
msg=f"The nickname sent [{cmd_owner}] is different than the configuration owner [{config_owner}] !" msg=tr("The nickname sent [%s] is different than the one set in the configuration file !", cmd_owner)
) )
return False return None
if cmd_owner == config_owner and cmd_password == config_password: if cmd_owner == config_owner and cmd_password == config_password:
self.Base.db_create_first_admin() self.Base.db_create_first_admin()
self.insert_db_admin(current_uid, cmd_owner, 5, self.Config.LANG) self.insert_db_admin(current_uid, cmd_owner, 5, self.Config.LANG)
self.Protocol.send_priv_msg( self.Protocol.send_priv_msg(
msg=f"[ {GREEN}{str(current_command).upper()} {NOGC}] - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}", msg=tr("[%s %s %s] - %s is now connected to %s", GREEN, current_command.upper(), NOGC, fromuser, dnickname),
nick_from=dnickname, nick_from=dnickname,
channel=dchanlog channel=dchanlog
) )
self.Protocol.send_notice( self.Protocol.send_notice(dnickname, fromuser, tr("Successfuly connected to %s", dnickname))
nick_from=dnickname,
nick_to=fromuser,
msg=tr("Successfuly connected to %s", dnickname)
)
else: else:
self.Protocol.send_priv_msg( self.Protocol.send_priv_msg(
msg=f"[ {RED}{str(current_command).upper()} {NOGC}] - {self.User.get_nickname(fromuser)} a tapé un mauvais mot de pass", msg=tr("[ %s %s %s ] - %s provided a wrong password!", RED, current_command.upper(), NOGC, current_nickname),
nick_from=dnickname, nick_from=dnickname,
channel=dchanlog channel=dchanlog
) )
self.Protocol.send_notice( self.Protocol.send_notice(dnickname, fromuser, tr("Wrong password!"))
nick_from=dnickname,
nick_to=fromuser,
msg=tr("Wrong password!")
)
case 'auth': case 'auth':
# Syntax. !auth nickname password # Syntax. !auth nickname password
@@ -673,22 +657,21 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [password]") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [password]")
return None return None
current_command = cmd[0]
user_to_log = cmd[1] user_to_log = cmd[1]
password = cmd[2] password = cmd[2]
current_client = self.User.get_user(fromuser) current_client = u
admin_obj = self.Admin.get_admin(fromuser) admin_obj = self.Admin.get_admin(fromuser)
if current_client is None: if current_client is None:
# This case should never happen # This case should never happen
self.Protocol.send_priv_msg(nick_from=dnickname, self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - Nickname {fromuser} is trying to connect to defender wrongly", msg=f"[ {RED}{str(command).upper()} FAIL{NOGC} ] - Nickname {fromuser} is trying to connect to defender wrongly",
channel=dchanlog) channel=dchanlog)
return None return None
if admin_obj: if admin_obj:
self.Protocol.send_priv_msg(nick_from=dnickname, self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - {fromuser} is already connected to {dnickname}", msg=f"[ {GREEN}{str(command).upper()}{NOGC} ] - {fromuser} is already connected to {dnickname}",
channel=dchanlog) channel=dchanlog)
self.Protocol.send_notice(dnickname, fromuser, tr("You are already connected to %s", dnickname)) self.Protocol.send_notice(dnickname, fromuser, tr("You are already connected to %s", dnickname))
return None return None
@@ -704,15 +687,15 @@ class Irc:
language = str(user_from_db[3]) language = str(user_from_db[3])
self.insert_db_admin(current_client.uid, account, level, language) self.insert_db_admin(current_client.uid, account, level, language)
self.Protocol.send_priv_msg(nick_from=dnickname, self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - {current_client.nickname} ({account}) est désormais connecté a {dnickname}", msg=f"[ {GREEN}{str(command).upper()} SUCCESS{NOGC} ] - {current_client.nickname} ({account}) est désormais connecté a {dnickname}",
channel=dchanlog) channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Connexion a {dnickname} réussie!") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=tr("Successfuly connected to %s", dnickname))
return None return None
else: else:
self.Protocol.send_priv_msg(nick_from=dnickname, self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {RED}{str(current_command).upper()}{NOGC} ] - {current_client.nickname} a tapé un mauvais mot de pass", msg=f"[ {RED}{str(command).upper()} FAIL{NOGC} ] - {current_client.nickname} a tapé un mauvais mot de pass",
channel=dchanlog) channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Mot de passe incorrecte") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=tr("Wrong password!"))
return None return None
case 'addaccess': case 'addaccess':
@@ -738,7 +721,7 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} addaccess [nickname] [level] [password]") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} addaccess [nickname] [level] [password]")
case 'editaccess': case 'editaccess':
# .editaccess [USER] [PASSWORD] [LEVEL] # .editaccess [USER] [NEW_PASSWORD] [LEVEL]
try: try:
if len(cmd) < 3: if len(cmd) < 3:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Right command : /msg {dnickname} editaccess [nickname] [NEWPASSWORD] [NEWLEVEL]") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Right command : /msg {dnickname} editaccess [nickname] [NEWPASSWORD] [NEWLEVEL]")
@@ -753,8 +736,8 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no Admin access") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no Admin access")
return None return None
current_user = self.User.get_nickname(fromuser) current_user = fromuser
current_uid = self.User.get_uid(fromuser) current_uid = uid
current_user_level = get_admin.level current_user_level = get_admin.level
user_new_level = int(cmd[3]) if len(cmd) == 4 else get_admin.level user_new_level = int(cmd[3]) if len(cmd) == 4 else get_admin.level
@@ -818,8 +801,8 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no admin access") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no admin access")
return None return None
current_user = self.User.get_nickname(fromuser) current_user = fromuser
current_uid = self.User.get_uid(fromuser) current_uid = uid
current_user_level = get_admin.level current_user_level = get_admin.level
# Rechercher le user dans la base de données. # Rechercher le user dans la base de données.
@@ -906,7 +889,7 @@ class Irc:
) )
return None return None
user_obj = self.User.get_user(fromuser) user_obj = u
if user_obj is None: if user_obj is None:
self.Logs.error(f"Nickname ({fromuser}) doesn't exist, it is impossible to register this nickname") self.Logs.error(f"Nickname ({fromuser}) doesn't exist, it is impossible to register this nickname")
@@ -961,8 +944,8 @@ class Irc:
account = str(cmd[1]) # account account = str(cmd[1]) # account
encrypted_password = self.Loader.Utils.hash_password(cmd[2]) encrypted_password = self.Loader.Utils.hash_password(cmd[2])
user_obj = self.User.get_user(fromuser) user_obj = u
client_obj = self.Client.get_Client(user_obj.uid) client_obj = c
if client_obj is not None: if client_obj is not None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are already logged in") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are already logged in")
@@ -1002,19 +985,18 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} <account>") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} <account>")
return None return None
user_obj = self.User.get_user(fromuser) user_obj = u
if user_obj is None: if user_obj is None:
self.Logs.error(f"The User [{fromuser}] is not available in the database") self.Logs.error(f"The User [{fromuser}] is not available in the database")
return None return None
client_obj = self.Client.get_Client(user_obj.uid) client_obj = c
if client_obj is None: if client_obj is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Nothing to logout. please login first") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Nothing to logout. please login first")
return None return None
self.Protocol.send_svslogout(client_obj) self.Protocol.send_svslogout(client_obj)
# self.Protocol.send_svsmode(nickname=fromuser, user_mode='-r')
self.Client.delete(user_obj.uid) self.Client.delete(user_obj.uid)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You have been logged out successfully") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You have been logged out successfully")
@@ -1034,6 +1016,10 @@ class Irc:
case 'load': case 'load':
try: try:
# Load a module ex: .load mod_defender # Load a module ex: .load mod_defender
if len(cmd) < 2:
self.Protocol.send_notice(dnickname, fromuser, tr("Syntax. /msg %s %s MODULE_NAME", dnickname, command.upper()))
return None
mod_name = str(cmd[1]) mod_name = str(cmd[1])
self.ModuleUtils.load_one_module(self, mod_name, fromuser) self.ModuleUtils.load_one_module(self, mod_name, fromuser)
return None return None
@@ -1046,6 +1032,9 @@ class Irc:
# unload mod_defender # unload mod_defender
try: try:
# The module name. exemple: mod_defender # The module name. exemple: mod_defender
if len(cmd) < 2:
self.Protocol.send_notice(dnickname, fromuser, tr("Syntax. /msg %s %s MODULE_NAME", dnickname, command.upper()))
return None
module_name = str(cmd[1]).lower() module_name = str(cmd[1]).lower()
self.ModuleUtils.unload_one_module(self, module_name, False) self.ModuleUtils.unload_one_module(self, module_name, False)
return None return None
@@ -1056,6 +1045,10 @@ class Irc:
# reload mod_defender # reload mod_defender
try: try:
# ==> mod_defender # ==> mod_defender
if len(cmd) < 2:
self.Protocol.send_notice(dnickname, fromuser, tr("Syntax. /msg %s %s MODULE_NAME", dnickname, command.upper()))
return None
module_name = str(cmd[1]).lower() module_name = str(cmd[1]).lower()
self.ModuleUtils.reload_one_module(self, module_name, fromuser) self.ModuleUtils.reload_one_module(self, module_name, fromuser)
return None return None
@@ -1098,8 +1091,12 @@ class Irc:
case 'restart': case 'restart':
final_reason = ' '.join(cmd[1:]) final_reason = ' '.join(cmd[1:])
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{dnickname.capitalize()} is going to restart!") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{dnickname.capitalize()} is going to restart!")
self.Config.DEFENDER_RESTART = 1 # Set restart status to 1 saying that the service will restart
self.Config.DEFENDER_INIT = 1 # set init to 1 saying that the service will be re initiated # Set restart status to 1 saying that the service will restart
self.Config.DEFENDER_RESTART = 1
# set init to 1 saying that the service will be re initiated
self.Config.DEFENDER_INIT = 1
case 'rehash': case 'rehash':
rehash.rehash_service(self, fromuser) rehash.rehash_service(self, fromuser)
@@ -1235,7 +1232,7 @@ class Irc:
self.Protocol.send_notice( self.Protocol.send_notice(
nick_from=dnickname, nick_from=dnickname,
nick_to=fromuser, nick_to=fromuser,
msg=f"{uptime}" msg=uptime
) )
return None return None
@@ -1256,5 +1253,16 @@ class Irc:
self.Protocol.send_raw(raw_command) self.Protocol.send_raw(raw_command)
return None return None
case 'print_users':
with open('users.txt', 'w') as fw:
i = 1
for u in self.User.UID_DB:
w = fw.write(u.to_dict().__str__() + "\n")
self.Logs.debug(f" {i} - chars written {w}")
i += 1
self.Protocol.send_priv_msg(dnickname, "Data written in users.txt file", dchanlog)
return None
case _: case _:
pass pass