From 25262d40495f8b1b624795fca9c6a631813df4d5 Mon Sep 17 00:00:00 2001 From: adator <85586985+adator85@users.noreply.github.com> Date: Sun, 17 Aug 2025 22:41:51 +0200 Subject: [PATCH] Moving some methods to utils.py, creating new logs class --- core/base.py | 111 +---------- core/classes/admin.py | 9 +- core/classes/channel.py | 10 +- core/classes/client.py | 8 +- core/classes/commands.py | 6 +- core/classes/protocols/inspircd.py | 111 +++++------ core/classes/protocols/unreal6.py | 189 ++++++++++--------- core/classes/reputation.py | 10 +- core/classes/user.py | 8 +- core/irc.py | 80 ++------ core/loader.py | 30 +-- core/logs.py | 108 +++++++++++ core/utils.py | 25 ++- mods/clone/mod_clone.py | 2 +- mods/clone/utils.py | 4 +- mods/command/mod_command.py | 32 ++-- mods/defender/mod_defender.py | 20 +- mods/defender/utils.py | 10 +- mods/jsonrpc/mod_jsonrpc.py | 2 +- mods/test/mod_test.py | 2 +- mods/votekick/mod_votekick.py | 294 ++++++++--------------------- mods/votekick/threads.py | 40 ++++ mods/votekick/utils.py | 74 ++++++++ mods/votekick/votekick_manager.py | 69 ++++++- 24 files changed, 653 insertions(+), 601 deletions(-) create mode 100644 core/logs.py create mode 100644 mods/votekick/threads.py create mode 100644 mods/votekick/utils.py diff --git a/core/base.py b/core/base.py index fe4d47d..e795b1c 100644 --- a/core/base.py +++ b/core/base.py @@ -32,8 +32,9 @@ class Base: self.Config = loader.Config self.Settings = loader.Settings self.Utils = loader.Utils + self.logs = loader.Logs - self.init_log_system() # Demarrer le systeme de log + # self.init_log_system() # Demarrer le systeme de log self.check_for_new_version(True) # Verifier si une nouvelle version est disponible # Liste des timers en cours @@ -140,18 +141,6 @@ class Base: except Exception as err: self.logs.error(f'General Error: {err}') - def get_unixtime(self) -> int: - """ - Cette fonction retourne un UNIXTIME de type 12365456 - Return: Current time in seconds since the Epoch (int) - """ - cet_offset = timezone(timedelta(hours=2)) - now_cet = datetime.now(cet_offset) - unixtime_cet = int(now_cet.timestamp()) - unixtime = int( time.time() ) - - return unixtime - def get_all_modules(self) -> list[str]: """Get list of all main modules using this pattern mod_*.py @@ -203,73 +192,6 @@ class Base: return None - def init_log_system(self) -> None: - # Create folder if not available - logs_directory = f'logs{self.Config.OS_SEP}' - if not os.path.exists(f'{logs_directory}'): - os.makedirs(logs_directory) - - # Init logs object - self.logs = logging.getLogger(self.Config.LOGGING_NAME) - self.logs.setLevel(self.Config.DEBUG_LEVEL) - - # Add Handlers - file_hanlder = logging.FileHandler(f'logs{self.Config.OS_SEP}defender.log',encoding='UTF-8') - file_hanlder.setLevel(self.Config.DEBUG_LEVEL) - - stdout_handler = logging.StreamHandler() - stdout_handler.setLevel(50) - - # Define log format - formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(funcName)s - %(message)s') - - # Apply log format - file_hanlder.setFormatter(formatter) - stdout_handler.setFormatter(formatter) - - # Add handler to logs - self.logs.addHandler(file_hanlder) - self.logs.addHandler(stdout_handler) - - # Apply the filter - self.logs.addFilter(self.replace_filter) - - # self.logs.Logger('defender').addFilter(self.replace_filter) - self.logs.info('#################### STARTING DEFENDER ####################') - - return None - - def replace_filter(self, record: logging.LogRecord) -> bool: - - response = True - filters: list[str] = ['PING', - f':{self.Config.SERVICE_PREFIX}auth'] - - # record.msg = record.getMessage().replace("PING", "[REDACTED]") - if self.Settings.CONSOLE: - print(record.getMessage()) - - for f in filters: - if f in record.getMessage(): - response = False - - return response # Retourne True pour permettre l'affichage du message - - def delete_logger(self, logger_name: str) -> None: - - # Récupérer le logger - logger = logging.getLogger(logger_name) - - # Retirer tous les gestionnaires du logger et les fermer - for handler in logger.handlers[:]: # Utiliser une copie de la liste - logger.removeHandler(handler) - handler.close() - - # Supprimer le logger du dictionnaire global - logging.Logger.manager.loggerDict.pop(logger_name, None) - - return None - def log_cmd(self, user_cmd: str, cmd: str) -> None: """Enregistre les commandes envoyées par les utilisateurs @@ -830,15 +752,6 @@ class Base: self.logs.critical(f'General Error: {err}') return None - def get_random(self, lenght:int) -> str: - """ - Retourn une chaîne aléatoire en fonction de la longueur spécifiée. - """ - caracteres = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' - randomize = ''.join(random.choice(caracteres) for _ in range(lenght)) - - return randomize - def execute_periodic_action(self) -> None: if not self.periodic_func: @@ -878,23 +791,3 @@ class Base: self.logs.debug(f'Method to execute : {str(self.periodic_func)}') return None - - def clean_uid(self, uid:str) -> Optional[str]: - """Clean UID by removing @ / % / + / ~ / * / : - - Args: - uid (str): The UID to clean - - Returns: - str: Clean UID without any sign - """ - try: - if uid is None: - return None - - pattern = fr'[:|@|%|\+|~|\*]*' - parsed_UID = re.sub(pattern, '', uid) - - return parsed_UID - except TypeError as te: - self.logs.error(f'Type Error: {te}') diff --git a/core/classes/admin.py b/core/classes/admin.py index 95dc76b..e7ce183 100644 --- a/core/classes/admin.py +++ b/core/classes/admin.py @@ -1,13 +1,16 @@ -from typing import Optional +from typing import TYPE_CHECKING, Optional from core.base import Base from core.definition import MAdmin +if TYPE_CHECKING: + from core.loader import Loader + class Admin: UID_ADMIN_DB: list[MAdmin] = [] - def __init__(self, base: Base) -> None: - self.Logs = base.logs + def __init__(self, loader: 'Loader') -> None: + self.Logs = loader.Logs def insert(self, new_admin: MAdmin) -> bool: """Insert a new admin object model diff --git a/core/classes/channel.py b/core/classes/channel.py index f628394..ff74229 100644 --- a/core/classes/channel.py +++ b/core/classes/channel.py @@ -13,7 +13,7 @@ class Channel: def __init__(self, loader: 'Loader') -> None: - self.Logs = loader.Base.logs + self.Logs = loader.Logs self.Base = loader.Base self.Utils = loader.Utils @@ -102,7 +102,7 @@ class Channel: return result for userid in chan_obj.uids: - if self.Base.clean_uid(userid) == self.Base.clean_uid(uid): + if self.Utils.clean_uid(userid) == self.Utils.clean_uid(uid): chan_obj.uids.remove(userid) result = True @@ -126,7 +126,7 @@ class Channel: for record in self.UID_CHANNEL_DB: for user_id in record.uids: - if self.Base.clean_uid(user_id) == self.Base.clean_uid(uid): + if self.Utils.clean_uid(user_id) == self.Utils.clean_uid(uid): record.uids.remove(user_id) result = True @@ -177,9 +177,9 @@ class Channel: if chan is None: return False - clean_uid = self.Base.clean_uid(uid=uid) + clean_uid = self.Utils.clean_uid(uid=uid) for chan_uid in chan.uids: - if self.Base.clean_uid(chan_uid) == clean_uid: + if self.Utils.clean_uid(chan_uid) == clean_uid: return True return False diff --git a/core/classes/client.py b/core/classes/client.py index f5e0172..f7afb41 100644 --- a/core/classes/client.py +++ b/core/classes/client.py @@ -2,17 +2,17 @@ from re import sub from typing import Any, Optional, Union, TYPE_CHECKING if TYPE_CHECKING: - from core.base import Base + from core.loader import Loader from core.definition import MClient class Client: CLIENT_DB: list['MClient'] = [] - def __init__(self, base: 'Base'): + def __init__(self, loader: 'Loader'): - self.Logs = base.logs - self.Base = base + self.Logs = loader.Logs + self.Base = loader.Base def insert(self, new_client: 'MClient') -> bool: """Insert a new User object diff --git a/core/classes/commands.py b/core/classes/commands.py index 6a47c75..64e9e93 100644 --- a/core/classes/commands.py +++ b/core/classes/commands.py @@ -2,14 +2,14 @@ from typing import TYPE_CHECKING, Optional from core.definition import MCommand if TYPE_CHECKING: - from core.base import Base + from core.loader import Loader class Command: DB_COMMANDS: list['MCommand'] = [] - def __init__(self, base: 'Base'): - self.Base = base + def __init__(self, loader: 'Loader'): + self.Base = loader.Base def build(self, new_command_obj: MCommand) -> bool: diff --git a/core/classes/protocols/inspircd.py b/core/classes/protocols/inspircd.py index 81bc9c8..67bc57e 100644 --- a/core/classes/protocols/inspircd.py +++ b/core/classes/protocols/inspircd.py @@ -15,8 +15,9 @@ class Inspircd: self.__Config = ircInstance.Config self.__Base = ircInstance.Base self.__Utils = ircInstance.Loader.Utils + self.__Logs = ircInstance.Loader.Logs - self.__Base.logs.info(f"** Loading protocol [{__name__}]") + self.__Logs.info(f"** Loading protocol [{__name__}]") def send2socket(self, message: str, print_log: bool = True) -> None: """Envoit les commandes à envoyer au serveur. @@ -28,24 +29,24 @@ class Inspircd: with self.__Base.lock: self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[0])) if print_log: - self.__Base.logs.debug(f'<< {message}') + self.__Logs.debug(f'<< {message}') except UnicodeDecodeError as ude: - self.__Base.logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}') + self.__Logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}') self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace')) except UnicodeEncodeError as uee: - self.__Base.logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}') + self.__Logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}') self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace')) except AssertionError as ae: - self.__Base.logs.warning(f'Assertion Error {ae} - message: {message}') + self.__Logs.warning(f'Assertion Error {ae} - message: {message}') except SSLEOFError as soe: - self.__Base.logs.error(f"SSLEOFError: {soe} - {message}") + self.__Logs.error(f"SSLEOFError: {soe} - {message}") except SSLError as se: - self.__Base.logs.error(f"SSLError: {se} - {message}") + self.__Logs.error(f"SSLError: {se} - {message}") except OSError as oe: - self.__Base.logs.error(f"OSError: {oe} - {message}") + self.__Logs.error(f"OSError: {oe} - {message}") except AttributeError as ae: - self.__Base.logs.critical(f"Attribute Error: {ae}") + self.__Logs.critical(f"Attribute Error: {ae}") def send_priv_msg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None): """Sending PRIVMSG to a channel or to a nickname by batches @@ -62,7 +63,7 @@ class Inspircd: User_to = self.__Irc.User.get_User(nick_to) if nick_to is None else None if User_from is None: - self.__Base.logs.error(f"The sender nickname [{nick_from}] do not exist") + self.__Logs.error(f"The sender nickname [{nick_from}] do not exist") return None if not channel is None: @@ -75,7 +76,7 @@ class Inspircd: batch = str(msg)[i:i+batch_size] self.send2socket(f":{nick_from} PRIVMSG {User_to.uid} :{batch}") except Exception as err: - self.__Base.logs.error(f"General Error: {err}") + self.__Logs.error(f"General Error: {err}") def send_notice(self, nick_from: str, nick_to: str, msg: str) -> None: """Sending NOTICE by batches @@ -91,7 +92,7 @@ class Inspircd: User_to = self.__Irc.User.get_User(nick_to) if User_from is None or User_to is None: - self.__Base.logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist") + self.__Logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist") return None for i in range(0, len(str(msg)), batch_size): @@ -99,9 +100,9 @@ class Inspircd: self.send2socket(f":{User_from.uid} NOTICE {User_to.uid} :{batch}") except Exception as err: - self.__Base.logs.error(f"General Error: {err}") + self.__Logs.error(f"General Error: {err}") - def link(self): + def send_link(self): """Créer le link et envoyer les informations nécessaires pour la connexion au serveur. """ @@ -123,7 +124,7 @@ class Inspircd: service_id = self.__Config.SERVICE_ID version = self.__Config.CURRENT_VERSION - unixtime = self.__Base.get_unixtime() + unixtime = self.__Utils.get_unixtime() self.send2socket(f"CAPAB START 1206") @@ -133,7 +134,7 @@ class Inspircd: self.send2socket(f"BURST {unixtime}") self.send2socket(f":{server_id} ENDBURST") - self.__Base.logs.debug(f'>> {__name__} Link information sent to the server') + self.__Logs.debug(f'>> {__name__} Link information sent to the server') def gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None: # TKL + G user host set_by expire_timestamp set_at_timestamp :reason @@ -142,12 +143,12 @@ class Inspircd: return None - def set_nick(self, newnickname: str) -> None: + def send_set_nick(self, newnickname: str) -> None: self.send2socket(f":{self.__Config.SERVICE_NICKNAME} NICK {newnickname}") return None - def squit(self, server_id: str, server_link: str, reason: str) -> None: + def send_squit(self, server_id: str, server_link: str, reason: str) -> None: if not reason: reason = 'Service Shutdown' @@ -155,26 +156,26 @@ class Inspircd: self.send2socket(f":{server_id} SQUIT {server_link} :{reason}") return None - def ungline(self, nickname:str, hostname: str) -> None: + def send_ungline(self, nickname:str, hostname: str) -> None: self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - G {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}") return None - def kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None: + def send_kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None: # TKL + k user host set_by expire_timestamp set_at_timestamp :reason self.send2socket(f":{self.__Config.SERVEUR_ID} TKL + k {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}") return None - def sjoin(self, channel: str) -> None: + def send_sjoin(self, channel: str) -> None: if not self.__Irc.Channel.is_valid_channel(channel): - self.__Base.logs.error(f"The channel [{channel}] is not valid") + self.__Logs.error(f"The channel [{channel}] is not valid") return None - self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Base.get_unixtime()} {channel} + :{self.__Config.SERVICE_ID}") + self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Utils.get_unixtime()} {channel} + :{self.__Config.SERVICE_ID}") # Add defender to the channel uids list self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[self.__Config.SERVICE_ID])) @@ -202,7 +203,7 @@ class Inspircd: self.__Irc.Reputation.delete(reputationObj.uid) if not self.__Irc.Channel.delete_user_from_all_channel(uid): - self.__Base.logs.error(f"The UID [{uid}] has not been deleted from all channels") + self.__Logs.error(f"The UID [{uid}] has not been deleted from all channels") return None @@ -221,9 +222,9 @@ class Inspircd: print_log (bool, optional): print logs if true. Defaults to True. """ # {self.Config.SERVEUR_ID} UID - # {clone.nickname} 1 {self.Base.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname} + # {clone.nickname} 1 {self.__Utils.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname} try: - unixtime = self.__Base.get_unixtime() + unixtime = self.__Utils.get_unixtime() encoded_ip = self.__Base.encode_ip(remote_ip) # Create the user @@ -242,7 +243,7 @@ class Inspircd: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def send_join_chan(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None: """Joining a channel @@ -261,7 +262,7 @@ class Inspircd: return None if not self.__Irc.Channel.is_valid_channel(channel): - self.__Base.logs.error(f"The channel [{channel}] is not valid") + self.__Logs.error(f"The channel [{channel}] is not valid") return None self.send2socket(f":{userObj.uid} JOIN {channel} {passwordChannel}", print_log=print_log) @@ -282,11 +283,11 @@ class Inspircd: userObj = self.__Irc.User.get_User(uidornickname) if userObj is None: - self.__Base.logs.error(f"The user [{uidornickname}] is not valid") + self.__Logs.error(f"The user [{uidornickname}] is not valid") return None if not self.__Irc.Channel.is_valid_channel(channel): - self.__Base.logs.error(f"The channel [{channel}] is not valid") + self.__Logs.error(f"The channel [{channel}] is not valid") return None self.send2socket(f":{userObj.uid} PART {channel}", print_log=print_log) @@ -295,7 +296,7 @@ class Inspircd: self.__Irc.Channel.delete_user_from_channel(channel, userObj.uid) return None - def unkline(self, nickname:str, hostname: str) -> None: + def send_unkline(self, nickname:str, hostname: str) -> None: self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - K {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}") @@ -322,14 +323,14 @@ class Inspircd: # TODO : User object should be able to update user modes if self.__Irc.User.update_mode(userObj.uid, userMode): return None - # self.__Base.logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]") + # self.__Logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]") return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_quit(self, serverMsg: list[str]) -> None: """Handle quit coming from a server @@ -350,9 +351,9 @@ class Inspircd: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_squit(self, serverMsg: list[str]) -> None: """Handle squit coming from a server @@ -409,9 +410,9 @@ class Inspircd: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_sjoin(self, serverMsg: list[str]) -> None: """Handle sjoin coming from a server @@ -444,7 +445,7 @@ class Inspircd: # Boucle qui va ajouter l'ensemble des users (UID) for i in range(start_boucle, len(serverMsg)): parsed_UID = str(serverMsg[i]) - clean_uid = self.__Irc.User.clean_uid(parsed_UID) + clean_uid = self.__Utils.clean_uid(parsed_UID) if not clean_uid is None and len(clean_uid) == 9: list_users.append(parsed_UID) @@ -458,9 +459,9 @@ class Inspircd: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_part(self, serverMsg: list[str]) -> None: """Handle part coming from a server @@ -479,9 +480,9 @@ class Inspircd: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_uid(self, serverMsg: list[str]) -> None: """Handle uid message coming from the server @@ -542,9 +543,9 @@ class Inspircd: ) return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_server_ping(self, serverMsg: list[str]) -> None: """Send a PONG message to the server @@ -562,7 +563,7 @@ class Inspircd: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_version(self, serverMsg: list[str]) -> None: """Sending Server Version to the server @@ -574,7 +575,7 @@ class Inspircd: # Réponse a un CTCP VERSION try: - nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1])) + nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1])) dnickname = self.__Config.SERVICE_NICKNAME arg = serverMsg[4].replace(':', '') @@ -586,7 +587,7 @@ class Inspircd: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_time(self, serverMsg: list[str]) -> None: """Sending TIME answer to a requestor @@ -598,7 +599,7 @@ class Inspircd: # Réponse a un CTCP VERSION try: - nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1])) + nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1])) dnickname = self.__Config.SERVICE_NICKNAME arg = serverMsg[4].replace(':', '') current_datetime = self.__Utils.get_sdatetime() @@ -611,7 +612,7 @@ class Inspircd: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_ping(self, serverMsg: list[str]) -> None: """Sending a PING answer to requestor @@ -623,7 +624,7 @@ class Inspircd: # Réponse a un CTCP VERSION try: - nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1])) + nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1])) dnickname = self.__Config.SERVICE_NICKNAME arg = serverMsg[4].replace(':', '') @@ -632,7 +633,7 @@ class Inspircd: if arg == '\x01PING': recieved_unixtime = int(serverMsg[5].replace('\x01','')) - current_unixtime = self.__Base.get_unixtime() + current_unixtime = self.__Utils.get_unixtime() ping_response = current_unixtime - recieved_unixtime # self.__Irc.send2socket(f':{dnickname} NOTICE {nickname} :\x01PING {ping_response} secs\x01') @@ -644,7 +645,7 @@ class Inspircd: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_version_msg(self, serverMsg: list[str]) -> None: """Handle version coming from the server @@ -654,7 +655,7 @@ class Inspircd: """ try: # ['@label=0073', ':0014E7P06', 'VERSION', 'PyDefender'] - getUser = self.__Irc.User.get_User(self.__Irc.User.clean_uid(serverMsg[1])) + getUser = self.__Irc.User.get_User(self.__Utils.clean_uid(serverMsg[1])) if getUser is None: return None @@ -669,4 +670,4 @@ class Inspircd: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") diff --git a/core/classes/protocols/unreal6.py b/core/classes/protocols/unreal6.py index f6166e5..c1c785f 100644 --- a/core/classes/protocols/unreal6.py +++ b/core/classes/protocols/unreal6.py @@ -1,6 +1,6 @@ from re import match, findall, search from datetime import datetime -from typing import TYPE_CHECKING, Optional, Union +from typing import TYPE_CHECKING, Optional from ssl import SSLEOFError, SSLError if TYPE_CHECKING: @@ -17,20 +17,31 @@ class Unrealircd6: self.__Base = ircInstance.Base self.__Settings = ircInstance.Base.Settings self.__Utils = ircInstance.Loader.Utils + self.__Logs = ircInstance.Loader.Logs self.known_protocol: set[str] = {'SJOIN', 'UID', 'MD', 'QUIT', 'SQUIT', 'EOS', 'PRIVMSG', 'MODE', 'UMODE2', 'VERSION', 'REPUTATION', 'SVS2MODE', - 'SLOG', 'NICK', 'PART', 'PONG'} + 'SLOG', 'NICK', 'PART', 'PONG', + 'PROTOCTL', 'SERVER', 'SMOD', 'TKL', 'NETINFO'} - self.__Base.logs.info(f"** Loading protocol [{__name__}]") + self.__Logs.info(f"** Loading protocol [{__name__}]") def get_ircd_protocol_poisition(self, cmd: list[str]) -> tuple[int, Optional[str]]: + """Get the position of known commands + Args: + cmd (list[str]): The server response + + Returns: + tuple[int, Optional[str]]: The position and the command. + """ for index, token in enumerate(cmd): if token.upper() in self.known_protocol: return index, token.upper() + self.__Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}") + return (-1, None) def send2socket(self, message: str, print_log: bool = True) -> None: @@ -43,24 +54,24 @@ class Unrealircd6: with self.__Base.lock: self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[0])) if print_log: - self.__Base.logs.debug(f'<< {message}') + self.__Logs.debug(f'<< {message}') except UnicodeDecodeError as ude: - self.__Base.logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}') + self.__Logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}') self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace')) except UnicodeEncodeError as uee: - self.__Base.logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}') + self.__Logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}') self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace')) except AssertionError as ae: - self.__Base.logs.warning(f'Assertion Error {ae} - message: {message}') + self.__Logs.warning(f'Assertion Error {ae} - message: {message}') except SSLEOFError as soe: - self.__Base.logs.error(f"SSLEOFError: {soe} - {message}") + self.__Logs.error(f"SSLEOFError: {soe} - {message}") except SSLError as se: - self.__Base.logs.error(f"SSLError: {se} - {message}") + self.__Logs.error(f"SSLError: {se} - {message}") except OSError as oe: - self.__Base.logs.error(f"OSError: {oe} - {message}") + self.__Logs.error(f"OSError: {oe} - {message}") except AttributeError as ae: - self.__Base.logs.critical(f"Attribute Error: {ae}") + self.__Logs.critical(f"Attribute Error: {ae}") def send_priv_msg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None): """Sending PRIVMSG to a channel or to a nickname by batches @@ -77,7 +88,7 @@ class Unrealircd6: User_to = self.__Irc.User.get_User(nick_to) if not nick_to is None else None if User_from is None: - self.__Base.logs.error(f"The sender nickname [{nick_from}] do not exist") + self.__Logs.error(f"The sender nickname [{nick_from}] do not exist") return None if not channel is None: @@ -91,8 +102,8 @@ class Unrealircd6: self.send2socket(f":{nick_from} PRIVMSG {User_to.uid} :{batch}") except Exception as err: - self.__Base.logs.error(f"General Error: {err}") - self.__Base.logs.error(f"General Error: {nick_from} - {channel} - {nick_to}") + self.__Logs.error(f"General Error: {err}") + self.__Logs.error(f"General Error: {nick_from} - {channel} - {nick_to}") def send_notice(self, nick_from: str, nick_to: str, msg: str) -> None: """Sending NOTICE by batches @@ -108,7 +119,7 @@ class Unrealircd6: User_to = self.__Irc.User.get_User(nick_to) if User_from is None or User_to is None: - self.__Base.logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist") + self.__Logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist") return None for i in range(0, len(str(msg)), batch_size): @@ -116,9 +127,9 @@ class Unrealircd6: self.send2socket(f":{User_from.uid} NOTICE {User_to.uid} :{batch}") except Exception as err: - self.__Base.logs.error(f"General Error: {err}") + self.__Logs.error(f"General Error: {err}") - def parse_server_msg(self, server_msg: list[str]) -> Union[str, None]: + def parse_server_msg(self, server_msg: list[str]) -> Optional[str]: """Parse the server message and return the command Args: @@ -152,7 +163,7 @@ class Unrealircd6: return None - def link(self): + def send_link(self): """Créer le link et envoyer les informations nécessaires pour la connexion au serveur. """ @@ -175,7 +186,7 @@ class Unrealircd6: service_id = self.__Config.SERVICE_ID version = self.__Config.CURRENT_VERSION - unixtime = self.__Base.get_unixtime() + unixtime = self.__Utils.get_unixtime() self.send2socket(f":{server_id} PASS :{password}", print_log=False) self.send2socket(f":{server_id} PROTOCTL SID NOQUIT NICKv2 SJOIN SJ3 NICKIP TKLEXT2 NEXTBANS CLK EXTSWHOIS MLOCK MTAGS") @@ -185,20 +196,20 @@ class Unrealircd6: self.send2socket(f":{server_id} SERVER {link} 1 :{info}") self.send2socket(f":{server_id} {nickname} :Reserved for services") self.send2socket(f":{server_id} UID {nickname} 1 {unixtime} {username} {host} {service_id} * {smodes} * * fwAAAQ== :{realname}") - self.sjoin(chan) + self.send_sjoin(chan) self.send2socket(f":{server_id} TKL + Q * {nickname} {host} 0 {unixtime} :Reserved for services") self.send2socket(f":{service_id} MODE {chan} {cmodes}") - self.__Base.logs.debug(f'>> {__name__} Link information sent to the server') + self.__Logs.debug(f'>> {__name__} Link information sent to the server') - def gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None: + def send_gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None: # TKL + G user host set_by expire_timestamp set_at_timestamp :reason self.send2socket(f":{self.__Config.SERVEUR_ID} TKL + G {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}") return None - def set_nick(self, newnickname: str) -> None: + def send_set_nick(self, newnickname: str) -> None: """Change nickname of the server \n This method will also update the User object Args: @@ -210,7 +221,7 @@ class Unrealircd6: self.__Irc.User.update_nickname(userObj.uid, newnickname) return None - def squit(self, server_id: str, server_link: str, reason: str) -> None: + def send_squit(self, server_id: str, server_link: str, reason: str) -> None: if not reason: reason = 'Service Shutdown' @@ -218,36 +229,36 @@ class Unrealircd6: self.send2socket(f":{server_id} SQUIT {server_link} :{reason}") return None - def ungline(self, nickname:str, hostname: str) -> None: + def send_ungline(self, nickname:str, hostname: str) -> None: self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - G {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}") return None - def kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None: + def send_kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None: # TKL + k user host set_by expire_timestamp set_at_timestamp :reason self.send2socket(f":{self.__Config.SERVEUR_ID} TKL + k {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}") return None - def unkline(self, nickname:str, hostname: str) -> None: + def send_unkline(self, nickname:str, hostname: str) -> None: self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - K {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}") return None - def sjoin(self, channel: str) -> None: + def send_sjoin(self, channel: str) -> None: """Server will join a channel with pre defined umodes Args: channel (str): Channel to join """ if not self.__Irc.Channel.is_valid_channel(channel): - self.__Base.logs.error(f"The channel [{channel}] is not valid") + self.__Logs.error(f"The channel [{channel}] is not valid") return None - self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Base.get_unixtime()} {channel} {self.__Config.SERVICE_UMODES} :{self.__Config.SERVICE_ID}") + self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Utils.get_unixtime()} {channel} {self.__Config.SERVICE_UMODES} :{self.__Config.SERVICE_ID}") self.send2socket(f":{self.__Config.SERVICE_ID} MODE {channel} {self.__Config.SERVICE_UMODES} {self.__Config.SERVICE_ID}") # Add defender to the channel uids list @@ -277,7 +288,7 @@ class Unrealircd6: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def send_sajoin(self, nick_to_sajoin: str, channel_name: str) -> None: """_summary_ @@ -314,7 +325,7 @@ class Unrealircd6: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def send_svs_mode(self, nickname: str, user_mode: str) -> None: try: @@ -333,7 +344,7 @@ class Unrealircd6: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def send_quit(self, uid: str, reason: str, print_log: True) -> None: """Send quit message @@ -355,7 +366,7 @@ class Unrealircd6: self.__Irc.Reputation.delete(reputationObj.uid) if not self.__Irc.Channel.delete_user_from_all_channel(uid): - self.__Base.logs.error(f"The UID [{uid}] has not been deleted from all channels") + self.__Logs.error(f"The UID [{uid}] has not been deleted from all channels") return None @@ -374,9 +385,9 @@ class Unrealircd6: print_log (bool, optional): print logs if true. Defaults to True. """ # {self.Config.SERVEUR_ID} UID - # {clone.nickname} 1 {self.Base.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname} + # {clone.nickname} 1 {self.__Utils.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname} try: - unixtime = self.__Base.get_unixtime() + unixtime = self.__Utils.get_unixtime() encoded_ip = self.__Base.encode_ip(remote_ip) # Create the user @@ -395,7 +406,7 @@ class Unrealircd6: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def send_join_chan(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None: """Joining a channel @@ -414,7 +425,7 @@ class Unrealircd6: return None if not self.__Irc.Channel.is_valid_channel(channel): - self.__Base.logs.error(f"The channel [{channel}] is not valid") + self.__Logs.error(f"The channel [{channel}] is not valid") return None self.send2socket(f":{userObj.uid} JOIN {channel} {passwordChannel}", print_log=print_log) @@ -450,11 +461,11 @@ class Unrealircd6: userObj = self.__Irc.User.get_User(uidornickname) if userObj is None: - self.__Base.logs.error(f"The user [{uidornickname}] is not valid") + self.__Logs.error(f"The user [{uidornickname}] is not valid") return None if not self.__Irc.Channel.is_valid_channel(channel): - self.__Base.logs.error(f"The channel [{channel}] is not valid") + self.__Logs.error(f"The channel [{channel}] is not valid") return None self.send2socket(f":{userObj.uid} PART {channel}", print_log=print_log) @@ -467,7 +478,7 @@ class Unrealircd6: channel = self.__Irc.Channel.is_valid_channel(channel_name) if not channel: - self.__Base.logs.error(f'The channel [{channel_name}] is not correct') + self.__Logs.error(f'The channel [{channel_name}] is not correct') return None self.send2socket(f":{self.__Config.SERVICE_NICKNAME} MODE {channel_name} {channel_mode}") @@ -505,9 +516,9 @@ class Unrealircd6: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_mode(self, serverMsg: list[str]) -> None: """Handle mode coming from a server @@ -541,14 +552,14 @@ class Unrealircd6: # TODO : User object should be able to update user modes if self.__Irc.User.update_mode(userObj.uid, userMode): return None - # self.__Base.logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]") + # self.__Logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]") return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_quit(self, serverMsg: list[str]) -> None: """Handle quit coming from a server @@ -569,9 +580,9 @@ class Unrealircd6: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_squit(self, serverMsg: list[str]) -> None: """Handle squit coming from a server @@ -651,9 +662,9 @@ class Unrealircd6: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_sjoin(self, serverMsg: list[str]) -> None: """Handle sjoin coming from a server @@ -690,7 +701,7 @@ class Unrealircd6: # Boucle qui va ajouter l'ensemble des users (UID) for i in range(start_boucle, len(serverMsg_copy)): parsed_UID = str(serverMsg_copy[i]) - clean_uid = self.__Irc.User.clean_uid(parsed_UID) + clean_uid = self.__Utils.clean_uid(parsed_UID) if not clean_uid is None and len(clean_uid) == 9: list_users.append(clean_uid) @@ -704,9 +715,9 @@ class Unrealircd6: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_part(self, serverMsg: list[str]) -> None: """Handle part coming from a server @@ -725,9 +736,9 @@ class Unrealircd6: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_eos(self, serverMsg: list[str]) -> None: """Handle EOS coming from a server @@ -759,16 +770,16 @@ class Unrealircd6: print(f"# VERSION : {version} ") print(f"################################################") - self.__Base.logs.info(f"################### DEFENDER ###################") - self.__Base.logs.info(f"# SERVICE CONNECTE ") - self.__Base.logs.info(f"# SERVEUR : {self.__Config.SERVEUR_IP} ") - self.__Base.logs.info(f"# PORT : {self.__Config.SERVEUR_PORT} ") - self.__Base.logs.info(f"# SSL : {self.__Config.SERVEUR_SSL} ") - self.__Base.logs.info(f"# SSL VER : {self.__Config.SSL_VERSION} ") - self.__Base.logs.info(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ") - self.__Base.logs.info(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ") - self.__Base.logs.info(f"# VERSION : {version} ") - self.__Base.logs.info(f"################################################") + self.__Logs.info(f"################### DEFENDER ###################") + self.__Logs.info(f"# SERVICE CONNECTE ") + self.__Logs.info(f"# SERVEUR : {self.__Config.SERVEUR_IP} ") + self.__Logs.info(f"# PORT : {self.__Config.SERVEUR_PORT} ") + self.__Logs.info(f"# SSL : {self.__Config.SERVEUR_SSL} ") + self.__Logs.info(f"# SSL VER : {self.__Config.SSL_VERSION} ") + self.__Logs.info(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ") + self.__Logs.info(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ") + self.__Logs.info(f"# VERSION : {version} ") + self.__Logs.info(f"################################################") if self.__Base.check_for_new_version(False): self.send_priv_msg( @@ -791,11 +802,11 @@ class Unrealircd6: return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Key Error: {ie}") + self.__Logs.error(f"{__name__} - Key Error: {ie}") except KeyError as ke: - self.__Base.logs.error(f"{__name__} - Key Error: {ke}") + self.__Logs.error(f"{__name__} - Key Error: {ke}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_reputation(self, serverMsg: list[str]) -> None: """Handle REPUTATION coming from a server @@ -826,7 +837,7 @@ class Unrealircd6: self.__Irc.first_score = 0 self.Logs.error(f'Value Error {__name__}: {ve}') except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_uid(self, serverMsg: list[str]) -> None: """Handle uid message coming from the server @@ -887,9 +898,9 @@ class Unrealircd6: ) return None except IndexError as ie: - self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + self.__Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_privmsg(self, serverMsg: list[str]) -> None: """Handle PRIVMSG message coming from the server @@ -910,11 +921,11 @@ class Unrealircd6: if cmd[2] == 'PRIVMSG' and cmd[4] == ':auth': data_copy = cmd.copy() data_copy[6] = '**********' - self.__Base.logs.debug(f">> {data_copy}") + self.__Logs.debug(f">> {data_copy}") else: - self.__Base.logs.debug(f">> {cmd}") + self.__Logs.debug(f">> {cmd}") else: - self.__Base.logs.debug(f">> {cmd}") + self.__Logs.debug(f">> {cmd}") get_uid_or_nickname = str(cmd[0].replace(':','')) user_trigger = self.__Irc.User.get_nickname(get_uid_or_nickname) @@ -929,7 +940,7 @@ class Unrealircd6: arg = convert_to_string.split() arg.remove(f':{self.__Config.SERVICE_PREFIX}') if not arg[0].lower() in self.__Irc.module_commands_list: - self.__Base.logs.debug(f"This command {arg[0]} is not available") + self.__Logs.debug(f"This command {arg[0]} is not available") self.send_notice( nick_from=self.__Config.SERVICE_NICKNAME, nick_to=user_trigger, @@ -968,7 +979,7 @@ class Unrealircd6: return False if not arg[0].lower() in self.__Irc.module_commands_list: - self.__Base.logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available") + self.__Logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available") return False cmd_to_send = convert_to_string.replace(':','') @@ -982,11 +993,11 @@ class Unrealircd6: return None except KeyError as ke: - self.__Base.logs.error(f"Key Error: {ke}") + self.__Logs.error(f"Key Error: {ke}") except AttributeError as ae: - self.__Base.logs.error(f"Attribute Error: {ae}") + self.__Logs.error(f"Attribute Error: {ae}") except Exception as err: - self.__Base.logs.error(f"General Error: {err} - {srv_msg}") + self.__Logs.error(f"General Error: {err} - {srv_msg}") def on_server_ping(self, serverMsg: list[str]) -> None: """Send a PONG message to the server @@ -1001,7 +1012,7 @@ class Unrealircd6: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_version(self, serverMsg: list[str]) -> None: """Sending Server Version to the server @@ -1013,7 +1024,7 @@ class Unrealircd6: # Réponse a un CTCP VERSION try: - nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1])) + nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1])) dnickname = self.__Config.SERVICE_NICKNAME arg = serverMsg[4].replace(':', '') @@ -1025,7 +1036,7 @@ class Unrealircd6: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_time(self, serverMsg: list[str]) -> None: """Sending TIME answer to a requestor @@ -1037,7 +1048,7 @@ class Unrealircd6: # Réponse a un CTCP VERSION try: - nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1])) + nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1])) dnickname = self.__Config.SERVICE_NICKNAME arg = serverMsg[4].replace(':', '') current_datetime = self.__Utils.get_sdatetime() @@ -1050,7 +1061,7 @@ class Unrealircd6: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_ping(self, serverMsg: list[str]) -> None: """Sending a PING answer to requestor @@ -1062,7 +1073,7 @@ class Unrealircd6: # Réponse a un CTCP VERSION try: - nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1])) + nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1])) dnickname = self.__Config.SERVICE_NICKNAME arg = serverMsg[4].replace(':', '') @@ -1071,7 +1082,7 @@ class Unrealircd6: if arg == '\x01PING': recieved_unixtime = int(serverMsg[5].replace('\x01','')) - current_unixtime = self.__Base.get_unixtime() + current_unixtime = self.__Utils.get_unixtime() ping_response = current_unixtime - recieved_unixtime # self.__Irc.send2socket(f':{dnickname} NOTICE {nickname} :\x01PING {ping_response} secs\x01') @@ -1083,7 +1094,7 @@ class Unrealircd6: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") def on_version_msg(self, serverMsg: list[str]) -> None: """Handle version coming from the server @@ -1097,7 +1108,7 @@ class Unrealircd6: if '@' in list(serverMsg_copy[0])[0]: serverMsg_copy.pop(0) - getUser = self.__Irc.User.get_User(self.__Irc.User.clean_uid(serverMsg_copy[0])) + getUser = self.__Irc.User.get_User(self.__Utils.clean_uid(serverMsg_copy[0])) if getUser is None: return None @@ -1115,4 +1126,4 @@ class Unrealircd6: return None except Exception as err: - self.__Base.logs.error(f"{__name__} - General Error: {err}") + self.__Logs.error(f"{__name__} - General Error: {err}") diff --git a/core/classes/reputation.py b/core/classes/reputation.py index 408f95a..859f62c 100644 --- a/core/classes/reputation.py +++ b/core/classes/reputation.py @@ -1,14 +1,16 @@ -from typing import Optional +from typing import TYPE_CHECKING, Optional from core.definition import MReputation -from core.base import Base + +if TYPE_CHECKING: + from core.loader import Loader class Reputation: UID_REPUTATION_DB: list[MReputation] = [] - def __init__(self, base: Base): + def __init__(self, loader: 'Loader'): - self.Logs = base.logs + self.Logs = loader.Logs self.MReputation: MReputation = MReputation def insert(self, new_reputation_user: MReputation) -> bool: diff --git a/core/classes/user.py b/core/classes/user.py index a582780..379a851 100644 --- a/core/classes/user.py +++ b/core/classes/user.py @@ -3,17 +3,17 @@ from typing import Any, Optional, TYPE_CHECKING from datetime import datetime if TYPE_CHECKING: - from core.base import Base + from core.loader import Loader from core.definition import MUser class User: UID_DB: list['MUser'] = [] - def __init__(self, base: 'Base'): + def __init__(self, loader: 'Loader'): - self.Logs = base.logs - self.Base = base + self.Logs = loader.Logs + self.Base = loader.Base def insert(self, new_user: 'MUser') -> bool: """Insert a new User object diff --git a/core/irc.py b/core/irc.py index 17bad7b..6995e25 100644 --- a/core/irc.py +++ b/core/irc.py @@ -54,7 +54,7 @@ class Irc: self.Base = self.Loader.Base # Logger - self.Logs = self.Loader.Base.logs + self.Logs = self.Loader.Logs # Get Settings. self.Settings = self.Base.Settings @@ -216,7 +216,7 @@ class Irc: protocol=self.Config.SERVEUR_PROTOCOL, ircInstance=self.ircObject ).Protocol - self.Protocol.link() # Etablir le link en fonction du protocol choisi + self.Protocol.send_link() # Etablir le link en fonction du protocol choisi self.signal = True # Une variable pour initier la boucle infinie self.__join_saved_channels() # Join existing channels self.load_existing_modules() # Charger les modules existant dans la base de données @@ -240,7 +240,7 @@ class Irc: self.init_service_user() self.__create_socket() - self.Protocol.link() + self.Protocol.send_link() self.__join_saved_channels() self.load_existing_modules() self.Config.DEFENDER_RESTART = 0 @@ -302,7 +302,7 @@ class Irc: if result_query: for chan_name in result_query: chan = chan_name[0] - self.Protocol.sjoin(channel=chan) + self.Protocol.send_sjoin(channel=chan) def send_response(self, responses:list[bytes]) -> None: try: @@ -359,13 +359,10 @@ class Irc: p = self.Protocol admin_obj = self.Admin.get_admin(nickname) dnickname = self.Config.SERVICE_NICKNAME - color_bold = self.Config.COLORS.bold color_nogc = self.Config.COLORS.nogc - color_blue = self.Config.COLORS.blue color_black = self.Config.COLORS.black - color_underline = self.Config.COLORS.underline current_level = 0 - count = 0 + if admin_obj is not None: current_level = admin_obj.level @@ -377,39 +374,11 @@ class Irc: for cmd in self.Commands.get_commands_by_level(current_level): if module is None or cmd.module_name.lower() == module.lower(): p.send_notice( - nick_from=dnickname, - nick_to=nickname, - msg=f" {color_black}{cmd.command_level:<8}{color_nogc}| {cmd.command_name:<25}| {cmd.module_name:<15}| {cmd.description:<35}" - ) - - return - - for level, modules in self.module_commands.items(): - if level > current_level: - break - - if count > 0: - p.send_notice(nick_from=dnickname, nick_to=nickname, msg=" ") - - p.send_notice( - nick_from=dnickname, - nick_to=nickname, - msg=f"{color_blue}{color_bold}Level {level}:{color_nogc}" - ) - - for module_name, commands in modules.items(): - if module is None or module.lower() == module_name.lower(): - p.send_notice( nick_from=dnickname, nick_to=nickname, - msg=f"{color_black} {color_underline}Module: {module_name}{color_nogc}" + msg=f" {color_black}{cmd.command_level:<8}{color_nogc}| {cmd.command_name:<25}| {cmd.module_name:<15}| {cmd.description:<35}" ) - for command, description in commands.items(): - p.send_notice(nick_from=dnickname, nick_to=nickname, msg=f" {command:<20}: {description}") - - count += 1 - - p.send_notice(nick_from=dnickname,nick_to=nickname,msg=f" ***************** FIN DES COMMANDES *****************") + return None def generate_help_menu_bakcup(self, nickname: str) -> None: @@ -888,16 +857,13 @@ class Irc: case 'PING': self.Protocol.on_server_ping(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") return None case 'SJOIN': self.Protocol.on_sjoin(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'EOS': self.Protocol.on_eos(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'UID': try: @@ -906,48 +872,37 @@ class Irc: for classe_name, classe_object in self.loaded_classes.items(): classe_object.cmd(original_response) - self.Logs.debug(f"** handle {parsed_protocol}") - except Exception as err: self.Logs.error(f'General Error: {err}') case 'QUIT': self.Protocol.on_quit(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'PROTOCTL': self.Protocol.on_protoctl(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'SVS2MODE': # >> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r'] self.Protocol.on_svs2mode(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'SQUIT': self.Protocol.on_squit(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'PART': self.Protocol.on_part(serverMsg=parsed_protocol) - self.Logs.debug(f"** handle {parsed_protocol}") case 'VERSION': self.Protocol.on_version_msg(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'UMODE2': # [':adator_', 'UMODE2', '-i'] self.Protocol.on_umode2(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'NICK': self.Protocol.on_nick(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'REPUTATION': self.Protocol.on_reputation(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'SLOG': # TODO self.Logs.debug(f"** handle {parsed_protocol}") @@ -957,7 +912,6 @@ class Irc: case 'PRIVMSG': self.Protocol.on_privmsg(serverMsg=original_response) - self.Logs.debug(f"** handle {parsed_protocol}") case 'PONG': # TODO self.Logs.debug(f"** handle {parsed_protocol}") @@ -985,10 +939,9 @@ class Irc: classe_object.cmd(original_response) except IndexError as ie: - self.Logs.error(f"{ie} / {original_response} / length {str(len(original_response))}") + self.Logs.error(f"IndexError: {ie}") except Exception as err: - self.Logs.error(f"General Error: {err}") - self.Logs.error(f"General Error: {traceback.format_exc()}") + self.Logs.error(f"General Error: {err}", exc_info=True) def hcmds(self, user: str, channel: Union[str, None], cmd: list, fullcmd: list = []) -> None: """Create @@ -1461,9 +1414,6 @@ class Irc: module_name = str(cmd[1]) if len(cmd) == 2 else None self.generate_help_menu(nickname=fromuser, module=module_name) - for com in self.Commands.get_ordered_commands(): - print(com) - case 'load': try: # Load a module ex: .load mod_defender @@ -1513,7 +1463,7 @@ class Irc: nick_to=fromuser, msg=f"Arrêt du service {dnickname}" ) - self.Protocol.squit(server_id=self.Config.SERVEUR_ID, server_link=self.Config.SERVEUR_LINK, reason=final_reason) + self.Protocol.send_squit(server_id=self.Config.SERVEUR_ID, server_link=self.Config.SERVEUR_LINK, reason=final_reason) self.Logs.info(f'Arrêt du server {dnickname}') self.Config.DEFENDER_RESTART = 0 self.signal = False @@ -1540,7 +1490,7 @@ class Irc: self.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object self.Base.delete_logger(self.Config.LOGGING_NAME) - self.Protocol.squit(server_id=self.Config.SERVEUR_ID, server_link=self.Config.SERVEUR_LINK, reason=final_reason) + self.Protocol.send_squit(server_id=self.Config.SERVEUR_ID, server_link=self.Config.SERVEUR_LINK, reason=final_reason) self.Logs.info(f'Redémarrage du server {dnickname}') self.loaded_classes.clear() self.Config.DEFENDER_RESTART = 1 # Set restart status to 1 saying that the service will restart @@ -1592,14 +1542,16 @@ class Irc: restart_flag = True if service_nickname != self.Config.SERVICE_NICKNAME: - self.Protocol.set_nick(self.Config.SERVICE_NICKNAME) + self.Protocol.send_set_nick(self.Config.SERVICE_NICKNAME) if restart_flag: self.Config.SERVEUR_ID = serveur_id self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME, msg='You need to restart defender !', channel=self.Config.SERVICE_CHANLOG) - self.Base.delete_logger(self.Config.LOGGING_NAME) - self.Base = self.Loader.BaseModule.Base(self.Config, self.Settings) + self.Loader.ServiceLogging.remove_logger() + self.Loader.ServiceLogging = self.Loader.LoggingModule.ServiceLogging() + self.Loader.Logs = self.Loader.ServiceLogging.get_logger() + self.Base = self.Loader.BaseModule.Base(self.Loader) importlib.reload(mod_unreal6) importlib.reload(mod_protocol) diff --git a/core/loader.py b/core/loader.py index b45183f..f502417 100644 --- a/core/loader.py +++ b/core/loader.py @@ -1,4 +1,6 @@ +from logging import Logger from core.classes import user, admin, client, channel, reputation, settings, commands +import core.logs as logs import core.definition as df import core.utils as utils import core.base as base_module @@ -9,29 +11,35 @@ class Loader: def __init__(self): # Load Main Modules - self.Definition: df = df + self.Definition: df = df - self.ConfModule: conf_module = conf_module + self.ConfModule: conf_module = conf_module - self.BaseModule: base_module = base_module + self.BaseModule: base_module = base_module - self.Utils: utils = utils + self.Utils: utils = utils + + self.LoggingModule: logs = logs # Load Classes + self.ServiceLogging: logs.ServiceLogging = logs.ServiceLogging() + + self.Logs: Logger = self.ServiceLogging.get_logger() + self.Settings: settings.Settings = settings.Settings() self.Config: df.MConfig = self.ConfModule.Configuration().ConfigObject - self.Base: base_module.Base = self.BaseModule.Base(self) + self.Base: base_module.Base = self.BaseModule.Base(self) - self.User: user.User = user.User(self.Base) + self.User: user.User = user.User(self) - self.Client: client.Client = client.Client(self.Base) + self.Client: client.Client = client.Client(self) - self.Admin: admin.Admin = admin.Admin(self.Base) + self.Admin: admin.Admin = admin.Admin(self) - self.Channel: channel.Channel = channel.Channel(self.Base) + self.Channel: channel.Channel = channel.Channel(self) - self.Reputation: reputation.Reputation = reputation.Reputation(self.Base) + self.Reputation: reputation.Reputation = reputation.Reputation(self) - self.Commands: commands.Command = commands.Command(self.Base) + self.Commands: commands.Command = commands.Command(self) diff --git a/core/logs.py b/core/logs.py new file mode 100644 index 0000000..b100bdc --- /dev/null +++ b/core/logs.py @@ -0,0 +1,108 @@ +import logging +from os import path, makedirs, sep + +class ServiceLogging: + + def __init__(self, loggin_name: str = "defender"): + """Create the Logging object + """ + self.OS_SEP = sep + self.LOGGING_NAME = loggin_name + self.DEBUG_LEVEL, self.DEBUG_FILE_LEVEL, self.DEBUG_STDOUT_LEVEL = (10, 10, 10) + self.SERVER_PREFIX = None + self.LOGGING_CONSOLE = True + + self.LOG_FILTERS: list[str] = ['PING', f":{self.SERVER_PREFIX}auth", "['PASS'"] + + self.file_handler = None + self.stdout_handler = None + + self.logs: logging.Logger = self.start_log_system() + + def get_logger(self) -> logging.Logger: + + logs_obj: logging.Logger = self.logs + + return logs_obj + + def remove_logger(self) -> None: + + # Récupérer le logger + logger = logging.getLogger(self.LOGGING_NAME) + + # Retirer tous les gestionnaires du logger et les fermer + for handler in logger.handlers[:]: # Utiliser une copie de la liste + # print(handler) + logger.removeHandler(handler) + handler.close() + + # Supprimer le logger du dictionnaire global + logging.Logger.manager.loggerDict.pop(self.LOGGING_NAME, None) + + return None + + def start_log_system(self) -> logging.Logger: + + os_sep = self.OS_SEP + logging_name = self.LOGGING_NAME + debug_level = self.DEBUG_LEVEL + debug_file_level = self.DEBUG_FILE_LEVEL + debug_stdout_level = self.DEBUG_STDOUT_LEVEL + + # Create folder if not available + logs_directory = f'logs{os_sep}' + if not path.exists(f'{logs_directory}'): + makedirs(logs_directory) + + # Init logs object + logs = logging.getLogger(logging_name) + logs.setLevel(debug_level) + + # Add Handlers + self.file_handler = logging.FileHandler(f'logs{os_sep}{logging_name}.log',encoding='UTF-8') + self.file_handler.setLevel(debug_file_level) + + self.stdout_handler = logging.StreamHandler() + self.stdout_handler.setLevel(debug_stdout_level) + + # Define log format + formatter = logging.Formatter( + fmt='%(asctime)s - %(levelname)s - %(message)s (%(filename)s:%(funcName)s:%(lineno)d)', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # Apply log format + self.file_handler.setFormatter(formatter) + self.stdout_handler.setFormatter(formatter) + + # Add handler to logs + logs.addHandler(self.file_handler) + logs.addHandler(self.stdout_handler) + + # Apply the filter + logs.addFilter(self.replace_filter) + + logs.info(f'#################### STARTING {self.LOGGING_NAME} ####################') + + return logs + + def set_stdout_handler_level(self, level: int) -> None: + self.stdout_handler.setLevel(level) + + def set_file_handler_level(self, level: int) -> None: + self.file_handler.setLevel(level) + + def replace_filter(self, record: logging.LogRecord) -> bool: + + response = True + filter: list[str] = ['PING', f":{self.SERVER_PREFIX}auth", "['PASS'"] + + # record.msg = record.getMessage().replace("PING", "[REDACTED]") + # if self.LOGGING_CONSOLE: + # print(record.getMessage()) + + for f in filter: + if f in record.getMessage(): + response = False + + return response # Retourne True pour permettre l'affichage du message diff --git a/core/utils.py b/core/utils.py index d5aec4b..b832335 100644 --- a/core/utils.py +++ b/core/utils.py @@ -2,8 +2,9 @@ Main utils library. ''' from pathlib import Path +from re import sub from typing import Literal, Optional, Any -from datetime import datetime +from datetime import datetime, timedelta, timezone from time import time from random import choice from hashlib import md5, sha3_512 @@ -29,6 +30,9 @@ def get_unixtime() -> int: Returns: int: Current time in seconds since the Epoch (int) """ + cet_offset = timezone(timedelta(hours=2)) + now_cet = datetime.now(cet_offset) + unixtime_cet = int(now_cet.timestamp()) return int(time()) def get_sdatetime() -> str: @@ -89,4 +93,21 @@ def get_all_modules() -> list[str]: list[str]: List of module names. """ base_path = Path('mods') - return [file.name.replace('.py', '') for file in base_path.rglob('mod_*.py')] \ No newline at end of file + return [file.name.replace('.py', '') for file in base_path.rglob('mod_*.py')] + +def clean_uid(uid: str) -> Optional[str]: + """Clean UID by removing @ / % / + / ~ / * / : + + Args: + uid (str): The UID to clean + + Returns: + str: Clean UID without any sign + """ + if uid is None: + return None + + pattern = fr'[:|@|%|\+|~|\*]*' + parsed_UID = sub(pattern, '', uid) + + return parsed_UID diff --git a/mods/clone/mod_clone.py b/mods/clone/mod_clone.py index 40e2ff2..78a4591 100644 --- a/mods/clone/mod_clone.py +++ b/mods/clone/mod_clone.py @@ -28,7 +28,7 @@ class Clone: self.Base = irc_instance.Base # Add logs object to the module (Mandatory) - self.Logs = irc_instance.Base.logs + self.Logs = irc_instance.Loader.Logs # Add User object to the module (Mandatory) self.User = irc_instance.User diff --git a/mods/clone/utils.py b/mods/clone/utils.py index a636b68..ce509a1 100644 --- a/mods/clone/utils.py +++ b/mods/clone/utils.py @@ -156,7 +156,7 @@ def create_new_clone(uplink: 'Clone', faker_instance: 'Faker', group: str = 'Def uid = generate_uid_for_clone(faker, uplink.Config.SERVEUR_ID) checkUid = uplink.Clone.uid_exists(uid=uid) - clone = uplink.Definition.MClone( + clone = uplink.Schemas.MClone( connected=False, nickname=nickname, username=username, @@ -176,7 +176,7 @@ def create_new_clone(uplink: 'Clone', faker_instance: 'Faker', group: str = 'Def def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]): - uid_sender = uplink.User.clean_uid(srvmsg[1]) + uid_sender = uplink.Irc.Utils.clean_uid(srvmsg[1]) senderObj = uplink.User.get_User(uid_sender) if senderObj.hostname in uplink.Config.CLONE_LOG_HOST_EXEMPT: diff --git a/mods/command/mod_command.py b/mods/command/mod_command.py index 3f5d9d3..daadbbc 100644 --- a/mods/command/mod_command.py +++ b/mods/command/mod_command.py @@ -34,8 +34,11 @@ class Command: # Add Base object to the module (Mandatory) self.Base = ircInstance.Base + # Add main Utils to the module + self.MainUtils = ircInstance.Utils + # Add logs object to the module (Mandatory) - self.Logs = ircInstance.Base.logs + self.Logs = ircInstance.Loader.Logs # Add User object to the module (Mandatory) self.User = ircInstance.User @@ -296,7 +299,7 @@ class Command: except Exception as err: self.Logs.error(f"General Error: {err}") - def hcmds(self, uidornickname: str, channel_name: Optional[str], cmd: list, fullcmd: list = []) -> None: + def hcmds(self, uidornickname: str, channel_name: Optional[str], cmd: list, fullcmd: list = []): command = str(cmd[0]).lower() dnickname = self.Config.SERVICE_NICKNAME @@ -307,7 +310,8 @@ class Command: fromchannel = channel_name match command: - case "automode": + + case 'automode': # automode set nickname [+/-mode] #channel # automode set adator +o #channel try: @@ -346,7 +350,7 @@ class Command: if db_result is not None: if sign == '+': - db_data = {"updated_on": self.Base.get_datetime(), "nickname": nickname, "channel": chan, "mode": mode} + db_data = {"updated_on": self.MainUtils.get_datetime(), "nickname": nickname, "channel": chan, "mode": mode} db_result = self.Base.db_execute_query(query="UPDATE command_automode SET mode = :mode, updated_on = :updated_on WHERE nickname = :nickname and channel = :channel", params=db_data) if db_result.rowcount > 0: @@ -432,7 +436,7 @@ class Command: for uid in uids_split: for i in range(0, len(uid)): mode += set_mode - users += f'{self.User.get_nickname(self.Base.clean_uid(uid[i]))} ' + users += f'{self.User.get_nickname(self.MainUtils.clean_uid(uid[i]))} ' if i == len(uid) - 1: self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +{mode} {users}") mode = '' @@ -454,7 +458,7 @@ class Command: for uid in uids_split: for i in range(0, len(uid)): mode += set_mode - users += f'{self.User.get_nickname(self.Base.clean_uid(uid[i]))} ' + users += f'{self.User.get_nickname(self.MainUtils.clean_uid(uid[i]))} ' if i == len(uid) - 1: self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +{mode} {users}") mode = '' @@ -1146,7 +1150,7 @@ class Command: # .svsnick nickname newnickname nickname = str(cmd[1]) newnickname = str(cmd[2]) - unixtime = self.Base.get_unixtime() + unixtime = self.MainUtils.get_unixtime() if self.User.get_nickname(nickname) is None: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" This nickname do not exist") @@ -1192,7 +1196,7 @@ class Command: nickname = str(cmd[1]) hostname = str(cmd[2]) - set_at_timestamp = self.Base.get_unixtime() + set_at_timestamp = self.MainUtils.get_unixtime() expire_time = (60 * 60 * 24) + set_at_timestamp gline_reason = ' '.join(cmd[3:]) @@ -1201,7 +1205,7 @@ class Command: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" /msg {dnickname} {command.upper()} nickname host reason") return None - self.Protocol.gline(nickname=nickname, hostname=hostname, set_by=dnickname, expire_timestamp=expire_time, set_at_timestamp=set_at_timestamp, reason=gline_reason) + self.Protocol.send_gline(nickname=nickname, hostname=hostname, set_by=dnickname, expire_timestamp=expire_time, set_at_timestamp=set_at_timestamp, reason=gline_reason) except KeyError as ke: self.Logs.error(ke) @@ -1222,7 +1226,7 @@ class Command: hostname = str(cmd[2]) # self.Protocol.send2socket(f":{self.Config.SERVEUR_ID} TKL - G {nickname} {hostname} {dnickname}") - self.Protocol.ungline(nickname=nickname, hostname=hostname) + self.Protocol.send_ungline(nickname=nickname, hostname=hostname) except KeyError as ke: self.Logs.error(ke) @@ -1240,7 +1244,7 @@ class Command: nickname = str(cmd[1]) hostname = str(cmd[2]) - set_at_timestamp = self.Base.get_unixtime() + set_at_timestamp = self.MainUtils.get_unixtime() expire_time = (60 * 60 * 24) + set_at_timestamp gline_reason = ' '.join(cmd[3:]) @@ -1249,7 +1253,7 @@ class Command: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" /msg {dnickname} {command.upper()} nickname host reason") return None - self.Protocol.kline(nickname=nickname, hostname=hostname, set_by=dnickname, expire_timestamp=expire_time, set_at_timestamp=set_at_timestamp, reason=gline_reason) + self.Protocol.send_kline(nickname=nickname, hostname=hostname, set_by=dnickname, expire_timestamp=expire_time, set_at_timestamp=set_at_timestamp, reason=gline_reason) except KeyError as ke: self.Logs.error(ke) @@ -1269,7 +1273,7 @@ class Command: nickname = str(cmd[1]) hostname = str(cmd[2]) - self.Protocol.unkline(nickname=nickname, hostname=hostname) + self.Protocol.send_unkline(nickname=nickname, hostname=hostname) except KeyError as ke: self.Logs.error(ke) @@ -1288,7 +1292,7 @@ class Command: nickname = str(cmd[1]) hostname = str(cmd[2]) - set_at_timestamp = self.Base.get_unixtime() + set_at_timestamp = self.MainUtils.get_unixtime() expire_time = (60 * 60 * 24) + set_at_timestamp shun_reason = ' '.join(cmd[3:]) diff --git a/mods/defender/mod_defender.py b/mods/defender/mod_defender.py index dae03a1..d12ea8a 100644 --- a/mods/defender/mod_defender.py +++ b/mods/defender/mod_defender.py @@ -30,7 +30,7 @@ class Defender: self.Base = irc_instance.Base # Add logs object to the module (Mandatory) - self.Logs = irc_instance.Base.logs + self.Logs = irc_instance.Loader.Logs # Add User object to the module (Mandatory) self.User = irc_instance.User @@ -118,7 +118,7 @@ class Defender: self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, )) if self.ModConfig.reputation == 1: - self.Protocol.sjoin(self.Config.SALON_JAIL) + self.Protocol.send_sjoin(self.Config.SALON_JAIL) self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.SALON_JAIL} +o {self.Config.SERVICE_NICKNAME}") return None @@ -176,7 +176,7 @@ class Defender: self.Schemas.DB_FREEIPAPI_USERS = freeipapi if cloudfilt: - self.Schemas.DB_CLOUD_FILT_USERS = cloudfilt + self.Schemas.DB_CLOUDFILT_USERS = cloudfilt if psutils: self.Schemas.DB_PSUTIL_USERS = psutils @@ -239,7 +239,7 @@ class Defender: for channel in channels: chan = channel[0] - self.Protocol.sjoin(chan) + self.Protocol.send_sjoin(chan) if chan == jail_chan: self.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}") self.Protocol.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}") @@ -931,15 +931,15 @@ class Defender: case 'info': try: + if len(cmd) < 2: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Syntax. /msg {dnickname} INFO [nickname]") + return None + nickoruid = cmd[1] UserObject = self.User.get_User(nickoruid) if UserObject is not None: - channels: list = [] - for chan in self.Channel.UID_CHANNEL_DB: - for uid_in_chan in chan.uids: - if self.Base.clean_uid(uid_in_chan) == UserObject.uid: - channels.append(chan.name) + channels: list = [chan.name for chan in self.Channel.UID_CHANNEL_DB for uid_in_chan in chan.uids if self.Loader.Utils.clean_uid(uid_in_chan) == UserObject.uid] self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' UID : {UserObject.uid}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' NICKNAME : {UserObject.nickname}') @@ -956,7 +956,7 @@ class Defender: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CHANNELS : {channels}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CONNECTION TIME : {UserObject.connexion_datetime}') else: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {nickoruid} doesn't exist") except KeyError as ke: self.Logs.warning(f"Key error info user : {ke}") diff --git a/mods/defender/utils.py b/mods/defender/utils.py index 08cc470..9855791 100644 --- a/mods/defender/utils.py +++ b/mods/defender/utils.py @@ -81,7 +81,7 @@ def handle_on_sjoin(uplink: 'Defender', srvmsg: list[str]): confmodel = uplink.ModConfig parsed_chan = srvmsg[4] if irc.Channel.is_valid_channel(srvmsg[4]) else None - parsed_UID = irc.User.clean_uid(srvmsg[5]) + parsed_UID = uplink.Loader.Utils.clean_uid(srvmsg[5]) if parsed_chan is None or parsed_UID is None: return @@ -145,7 +145,7 @@ def handle_on_nick(uplink: 'Defender', srvmsg: list[str]): srvmsg (list[str]): The Server MSG confmodel (ModConfModel): The Module Configuration """ - uid = uplink.User.clean_uid(str(srvmsg[1])) + uid = uplink.Loader.Utils.clean_uid(str(srvmsg[1])) p = uplink.Protocol confmodel = uplink.ModConfig @@ -180,7 +180,7 @@ def handle_on_quit(uplink: 'Defender', srvmsg: list[str]): confmodel = uplink.ModConfig ban_all_chan = uplink.Base.int_if_possible(confmodel.reputation_ban_all_chan) - final_UID = uplink.User.clean_uid(str(srvmsg[1])) + final_UID = uplink.Loader.Utils.clean_uid(str(srvmsg[1])) jail_salon = uplink.Config.SALON_JAIL service_id = uplink.Config.SERVICE_ID get_user_reputation = uplink.Reputation.get_Reputation(final_UID) @@ -238,7 +238,7 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]): irc.Reputation.insert( irc.Loader.Definition.MReputation( **_User.to_dict(), - secret_code=irc.Base.get_random(8) + secret_code=irc.Utils.generate_random_string(8) ) ) if irc.Reputation.is_exist(_User.uid): @@ -278,7 +278,7 @@ def action_on_flood(uplink: 'Defender', srvmsg: list[str]): get_detected_uid = User.uid get_detected_nickname = User.nickname - unixtime = irc.Base.get_unixtime() + unixtime = irc.Utils.get_unixtime() get_diff_secondes = 0 def get_flood_user(uid: str) -> Optional[FloodUser]: diff --git a/mods/jsonrpc/mod_jsonrpc.py b/mods/jsonrpc/mod_jsonrpc.py index 5ed288a..e7181cd 100644 --- a/mods/jsonrpc/mod_jsonrpc.py +++ b/mods/jsonrpc/mod_jsonrpc.py @@ -33,7 +33,7 @@ class Jsonrpc(): self.Base = ircInstance.Base # Add logs object to the module (Mandatory) - self.Logs = ircInstance.Base.logs + self.Logs = ircInstance.Loader.Logs # Add User object to the module (Mandatory) self.User = ircInstance.User diff --git a/mods/test/mod_test.py b/mods/test/mod_test.py index 275f845..8372e6f 100644 --- a/mods/test/mod_test.py +++ b/mods/test/mod_test.py @@ -34,7 +34,7 @@ class Test(): self.Base = ircInstance.Base # Add logs object to the module (Mandatory) - self.Logs = ircInstance.Base.logs + self.Logs = ircInstance.Loader.Logs # Add User object to the module (Mandatory) self.User = ircInstance.User diff --git a/mods/votekick/mod_votekick.py b/mods/votekick/mod_votekick.py index 5a9aa36..54acd2d 100644 --- a/mods/votekick/mod_votekick.py +++ b/mods/votekick/mod_votekick.py @@ -10,7 +10,9 @@ """ import re import mods.votekick.schemas as schemas +import mods.votekick.utils as utils from mods.votekick.votekick_manager import VotekickManager +import mods.votekick.threads as thds from typing import TYPE_CHECKING, Any, Optional if TYPE_CHECKING: @@ -40,7 +42,7 @@ class Votekick: self.Base = uplink.Base # Add logs object to the module - self.Logs = uplink.Base.logs + self.Logs = uplink.Logs # Add User object to the module self.User = uplink.User @@ -51,9 +53,15 @@ class Votekick: # Add Utils. self.Utils = uplink.Utils + # Add Utils module + self.ModUtils = utils + # Add Schemas module self.Schemas = schemas + # Add Threads module + self.Threads = thds + # Add VoteKick Manager self.VoteKickManager = VotekickManager(self) @@ -77,7 +85,7 @@ class Votekick: # Add admin object to retrieve admin users self.Admin = self.Irc.Admin self.__create_tables() - self.join_saved_channels() + self.ModUtils.join_saved_channels(self) return None @@ -126,139 +134,28 @@ class Votekick: except Exception as err: self.Logs.error(f'General Error: {err}') - def init_vote_system(self, channel: str) -> bool: - - response = False - for chan in self.VoteKickManager.VOTE_CHANNEL_DB: - if chan.channel_name == channel: - chan.target_user = '' - chan.voter_users = [] - chan.vote_against = 0 - chan.vote_for = 0 - response = True - - return response - - def insert_vote_channel(self, channel_obj: schemas.VoteChannelModel) -> bool: - result = False - found = False - for chan in self.VoteKickManager.VOTE_CHANNEL_DB: - if chan.channel_name == channel_obj.channel_name: - found = True - - if not found: - self.VoteKickManager.VOTE_CHANNEL_DB.append(channel_obj) - self.Logs.debug(f"The channel has been added {channel_obj}") - # self.db_add_vote_channel(ChannelObject.channel_name) - - return result - - def db_add_vote_channel(self, channel: str) -> bool: - """Cette fonction ajoute les salons ou seront autoriser les votes - - Args: - channel (str): le salon à enregistrer. - """ - current_datetime = self.Utils.get_sdatetime() - mes_donnees = {'channel': channel} - - response = self.Base.db_execute_query("SELECT id FROM votekick_channel WHERE channel = :channel", mes_donnees) - - is_channel_exist = response.fetchone() - - if is_channel_exist is None: - mes_donnees = {'datetime': current_datetime, 'channel': channel} - insert = self.Base.db_execute_query(f"INSERT INTO votekick_channel (datetime, channel) VALUES (:datetime, :channel)", mes_donnees) - if insert.rowcount > 0: - return True - else: - return False - else: - return False - - def db_delete_vote_channel(self, channel: str) -> bool: - """Cette fonction supprime les salons de join de Defender - - Args: - channel (str): le salon à enregistrer. - """ - mes_donnes = {'channel': channel} - response = self.Base.db_execute_query("DELETE FROM votekick_channel WHERE channel = :channel", mes_donnes) - - affected_row = response.rowcount - - if affected_row > 0: - return True - else: - return False - - def join_saved_channels(self) -> None: - - param = {'module_name': self.module_name} - result = self.Base.db_execute_query(f"SELECT id, channel_name FROM {self.Config.TABLE_CHANNEL} WHERE module_name = :module_name", param) - - channels = result.fetchall() - - for channel in channels: - id_, chan = channel - self.insert_vote_channel(self.Schemas.VoteChannelModel(channel_name=chan, target_user='', voter_users=[], vote_for=0, vote_against=0)) - self.Protocol.sjoin(channel=chan) - self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {chan} +o {self.Config.SERVICE_NICKNAME}") - - return None - - def is_vote_ongoing(self, channel: str) -> bool: - - response = False - for vote in self.VoteKickManager.VOTE_CHANNEL_DB: - if vote.channel_name == channel: - if vote.target_user: - response = True - - return response - - def timer_vote_verdict(self, channel: str) -> None: - - dnickname = self.Config.SERVICE_NICKNAME - - if not self.is_vote_ongoing(channel): - return None - - for chan in self.VoteKickManager.VOTE_CHANNEL_DB: - if chan.channel_name == channel: - target_user = self.User.get_nickname(chan.target_user) - if chan.vote_for > chan.vote_against: - self.Protocol.send_priv_msg( - nick_from=dnickname, - msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it'll be kicked from the channel", - channel=channel - ) - self.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}") - self.Channel.delete_user_from_channel(channel, self.User.get_uid(target_user)) - elif chan.vote_for <= chan.vote_against: - self.Protocol.send_priv_msg( - nick_from=dnickname, - msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll remain in the channel", - channel=channel - ) - - # Init the system - if self.init_vote_system(channel): - self.Protocol.send_priv_msg( - nick_from=dnickname, - msg="System vote re initiated", - channel=channel - ) - - return None - def cmd(self, data: list) -> None: - cmd = list(data).copy() + if not data or len(data) < 2: + return None + + cmd = data.copy() if isinstance(data, list) else list(data).copy() + index, command = self.Irc.Protocol.get_ircd_protocol_poisition(cmd) + if index == -1: + return None try: - return None + match command: + + case 'PRIVMSG': + return None + + case 'QUIT': + return None + + case _: + return None except KeyError as ke: self.Logs.error(f"Key Error: {ke}") @@ -307,24 +204,17 @@ class Votekick: if sentchannel is None: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f" The correct command is {self.Config.SERVICE_PREFIX}{command} {option} #CHANNEL") - self.insert_vote_channel( - self.Schemas.VoteChannelModel( - channel_name=sentchannel, - target_user='', - voter_users=[], - vote_for=0, - vote_against=0 - ) - ) + if self.VoteKickManager.activate_new_channel(sentchannel): + self.Channel.db_query_channel('add', self.module_name, sentchannel) + self.Protocol.send_join_chan(uidornickname=dnickname, channel=sentchannel) + self.Protocol.send2socket(f":{dnickname} SAMODE {sentchannel} +o {dnickname}") + self.Protocol.send_priv_msg(nick_from=dnickname, + msg="You can now use !submit to decide if he will stay or not on this channel ", + channel=sentchannel + ) - self.Channel.db_query_channel('add', self.module_name, sentchannel) + return None - self.Protocol.send_join_chan(uidornickname=dnickname, channel=sentchannel) - self.Protocol.send2socket(f":{dnickname} SAMODE {sentchannel} +o {dnickname}") - self.Protocol.send_priv_msg(nick_from=dnickname, - msg="You can now use !submit to decide if he will stay or not on this channel ", - channel=sentchannel - ) except Exception as err: self.Logs.error(f'{err}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option} #channel') @@ -344,12 +234,10 @@ class Votekick: self.Protocol.send2socket(f":{dnickname} SAMODE {sentchannel} -o {dnickname}") self.Protocol.send_part_chan(uidornickname=dnickname, channel=sentchannel) - for chan in self.VoteKickManager.VOTE_CHANNEL_DB: - if chan.channel_name == sentchannel: - self.VoteKickManager.VOTE_CHANNEL_DB.remove(chan) - self.Channel.db_query_channel('del', self.module_name, chan.channel_name) + if self.VoteKickManager.drop_vote_channel_model(sentchannel): + self.Channel.db_query_channel('del', self.module_name, sentchannel) + return None - self.Logs.debug(f"The Channel {sentchannel} has been deactivated from the vote system") except Exception as err: self.Logs.error(f'{err}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f" /msg {dnickname} {command} {option} #channel") @@ -359,20 +247,11 @@ class Votekick: try: # vote + channel = fromchannel - for chan in self.VoteKickManager.VOTE_CHANNEL_DB: - if chan.channel_name == channel: - if fromuser in chan.voter_users: - self.Protocol.send_priv_msg(nick_from=dnickname, - msg="You already submitted a vote", - channel=channel - ) - else: - chan.vote_for += 1 - chan.voter_users.append(fromuser) - self.Protocol.send_priv_msg(nick_from=dnickname, - msg="Vote recorded, thank you", - channel=channel - ) + if self.VoteKickManager.action_vote(channel, fromuser, '+'): + self.Protocol.send_priv_msg(nick_from=dnickname, msg="Vote recorded, thank you",channel=channel) + else: + self.Protocol.send_priv_msg(nick_from=dnickname, msg="You already submitted a vote", channel=channel) + except Exception as err: self.Logs.error(f'{err}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}') @@ -382,20 +261,11 @@ class Votekick: try: # vote - channel = fromchannel - for chan in self.VoteKickManager.VOTE_CHANNEL_DB: - if chan.channel_name == channel: - if fromuser in chan.voter_users: - self.Protocol.send_priv_msg(nick_from=dnickname, - msg="You already submitted a vote", - channel=channel - ) - else: - chan.vote_against += 1 - chan.voter_users.append(fromuser) - self.Protocol.send_priv_msg(nick_from=dnickname, - msg="Vote recorded, thank you", - channel=channel - ) + if self.VoteKickManager.action_vote(channel, fromuser, '-'): + self.Protocol.send_priv_msg(nick_from=dnickname, msg="Vote recorded, thank you",channel=channel) + else: + self.Protocol.send_priv_msg(nick_from=dnickname, msg="You already submitted a vote", channel=channel) + except Exception as err: self.Logs.error(f'{err}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}') @@ -414,11 +284,11 @@ class Votekick: for vote in self.VoteKickManager.VOTE_CHANNEL_DB: if vote.channel_name == channel: - self.init_vote_system(channel) - self.Protocol.send_priv_msg(nick_from=dnickname, - msg="Vote system re-initiated", - channel=channel - ) + if self.VoteKickManager.init_vote_system(channel): + self.Protocol.send_priv_msg(nick_from=dnickname, + msg="Vote system re-initiated", + channel=channel + ) except Exception as err: self.Logs.error(f'{err}') @@ -452,16 +322,15 @@ class Votekick: ongoing_user = None # check if there is an ongoing vote - if self.is_vote_ongoing(channel): - for vote in self.VoteKickManager.VOTE_CHANNEL_DB: - if vote.channel_name == channel: - ongoing_user = self.User.get_nickname(vote.target_user) - - self.Protocol.send_priv_msg(nick_from=dnickname, - msg=f"There is an ongoing vote on {ongoing_user}", - channel=channel - ) - return None + if self.VoteKickManager.is_vote_ongoing(channel): + votec = self.VoteKickManager.get_vote_channel_model(channel) + if votec: + ongoing_user = self.User.get_nickname(votec.target_user) + self.Protocol.send_priv_msg(nick_from=dnickname, + msg=f"There is an ongoing vote on {ongoing_user}", + channel=channel + ) + return None # check if the user exist if user_submitted is None: @@ -471,7 +340,7 @@ class Votekick: ) return None - uid_cleaned = self.Base.clean_uid(uid_submitted) + uid_cleaned = self.Loader.Utils.clean_uid(uid_submitted) channel_obj = self.Channel.get_channel(channel) if channel_obj is None: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' This channel [{channel}] do not exist in the Channel Object') @@ -479,7 +348,7 @@ class Votekick: clean_uids_in_channel: list = [] for uid in channel_obj.uids: - clean_uids_in_channel.append(self.Base.clean_uid(uid)) + clean_uids_in_channel.append(self.Loader.Utils.clean_uid(uid)) if not uid_cleaned in clean_uids_in_channel: self.Protocol.send_priv_msg(nick_from=dnickname, @@ -507,7 +376,7 @@ class Votekick: channel=channel ) - self.Base.create_timer(60, self.timer_vote_verdict, (channel, )) + self.Base.create_timer(60, self.Threads.timer_vote_verdict, (self, channel)) self.Protocol.send_priv_msg(nick_from=dnickname, msg="This vote will end after 60 secondes", channel=channel @@ -524,30 +393,31 @@ class Votekick: if self.Admin.get_admin(fromuser) is None: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f'Your are not allowed to execute this command') return None - - for chan in self.VoteKickManager.VOTE_CHANNEL_DB: - if chan.channel_name == channel: - target_user = self.User.get_nickname(chan.target_user) - if chan.vote_for > chan.vote_against: - self.Protocol.send_priv_msg(nick_from=dnickname, - msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll be kicked from the channel", + + votec = self.VoteKickManager.get_vote_channel_model(channel) + if votec: + target_user = self.User.get_nickname(votec.target_user) + if votec.vote_for >= votec.vote_against: + self.Protocol.send_priv_msg(nick_from=dnickname, + msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {votec.vote_against} votes against and {votec.vote_for} votes for. For this reason, it\'ll be kicked from the channel", channel=channel ) - self.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}") - elif chan.vote_for <= chan.vote_against: - self.Protocol.send_priv_msg( + self.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}") + else: + self.Protocol.send_priv_msg( nick_from=dnickname, - msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll remain in the channel", + msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {votec.vote_against} votes against and {votec.vote_for} votes for. For this reason, it\'ll remain in the channel", channel=channel ) - - # Init the system - if self.init_vote_system(channel): - self.Protocol.send_priv_msg( + + if self.VoteKickManager.init_vote_system(channel): + self.Protocol.send_priv_msg( nick_from=dnickname, msg="System vote re initiated", channel=channel ) + return None + except Exception as err: self.Logs.error(f'{err}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}') diff --git a/mods/votekick/threads.py b/mods/votekick/threads.py new file mode 100644 index 0000000..66d4edc --- /dev/null +++ b/mods/votekick/threads.py @@ -0,0 +1,40 @@ +from typing import TYPE_CHECKING + + +if TYPE_CHECKING: + from mods.votekick.mod_votekick import Votekick + +def timer_vote_verdict(uplink: 'Votekick', channel: str) -> None: + + dnickname = uplink.Config.SERVICE_NICKNAME + + if not uplink.VoteKickManager.is_vote_ongoing(channel): + return None + + votec = uplink.VoteKickManager.get_vote_channel_model(channel) + if votec: + target_user = uplink.User.get_nickname(votec.target_user) + + if votec.vote_for >= votec.vote_against and votec.vote_for != 0: + uplink.Protocol.send_priv_msg(nick_from=dnickname, + msg=f"User {uplink.Config.COLORS.bold}{target_user}{uplink.Config.COLORS.nogc} has {votec.vote_against} votes against and {votec.vote_for} votes for. For this reason, it\'ll be kicked from the channel", + channel=channel + ) + uplink.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}") + else: + uplink.Protocol.send_priv_msg( + nick_from=dnickname, + msg=f"User {uplink.Config.COLORS.bold}{target_user}{uplink.Config.COLORS.nogc} has {votec.vote_against} votes against and {votec.vote_for} votes for. For this reason, it\'ll remain in the channel", + channel=channel + ) + + if uplink.VoteKickManager.init_vote_system(channel): + uplink.Protocol.send_priv_msg( + nick_from=dnickname, + msg="System vote re initiated", + channel=channel + ) + + return None + + return None \ No newline at end of file diff --git a/mods/votekick/utils.py b/mods/votekick/utils.py new file mode 100644 index 0000000..7f28bc9 --- /dev/null +++ b/mods/votekick/utils.py @@ -0,0 +1,74 @@ +from typing import TYPE_CHECKING + + +if TYPE_CHECKING: + from mods.votekick.mod_votekick import Votekick + +def add_vote_channel_to_database(uplink: 'Votekick', channel: str) -> bool: + """Adds a new channel to the votekick database if it doesn't already exist. + + This function checks if the specified channel is already registered in the + `votekick_channel` table. If not, it inserts a new entry with the current timestamp. + + Args: + uplink (Votekick): The main votekick system instance that provides access to utilities and database operations. + channel (str): The name of the channel to be added to the database. + + Returns: + bool: True if the channel was successfully inserted into the database. + False if the channel already exists or the insertion failed. + """ + current_datetime = uplink.Utils.get_sdatetime() + mes_donnees = {'channel': channel} + + response = uplink.Base.db_execute_query("SELECT id FROM votekick_channel WHERE channel = :channel", mes_donnees) + + is_channel_exist = response.fetchone() + + if is_channel_exist is None: + mes_donnees = {'datetime': current_datetime, 'channel': channel} + insert = uplink.Base.db_execute_query(f"INSERT INTO votekick_channel (datetime, channel) VALUES (:datetime, :channel)", mes_donnees) + if insert.rowcount > 0: + return True + else: + return False + else: + return False + +def delete_vote_channel_from_database(uplink: 'Votekick', channel: str) -> bool: + """Deletes a channel entry from the votekick database. + + This function removes the specified channel from the `votekick_channel` table + if it exists. It returns True if the deletion was successful. + + Args: + uplink (Votekick): The main votekick system instance used to execute the database operation. + channel (str): The name of the channel to be removed from the database. + + Returns: + bool: True if the channel was successfully deleted, False if no rows were affected. + """ + mes_donnes = {'channel': channel} + response = uplink.Base.db_execute_query("DELETE FROM votekick_channel WHERE channel = :channel", mes_donnes) + + affected_row = response.rowcount + + if affected_row > 0: + return True + else: + return False + +def join_saved_channels(uplink: 'Votekick') -> None: + + param = {'module_name': uplink.module_name} + result = uplink.Base.db_execute_query(f"SELECT id, channel_name FROM {uplink.Config.TABLE_CHANNEL} WHERE module_name = :module_name", param) + + channels = result.fetchall() + + for channel in channels: + id_, chan = channel + uplink.VoteKickManager.activate_new_channel(chan) + uplink.Protocol.send_sjoin(channel=chan) + uplink.Protocol.send2socket(f":{uplink.Config.SERVICE_NICKNAME} SAMODE {chan} +o {uplink.Config.SERVICE_NICKNAME}") + + return None \ No newline at end of file diff --git a/mods/votekick/votekick_manager.py b/mods/votekick/votekick_manager.py index 8694ffe..1eb7098 100644 --- a/mods/votekick/votekick_manager.py +++ b/mods/votekick/votekick_manager.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Literal, Optional from mods.votekick.schemas import VoteChannelModel if TYPE_CHECKING: @@ -38,7 +38,32 @@ class VotekickManager: return True return False - + + def init_vote_system(self, channel_name: str) -> bool: + """Initializes or resets the votekick system for a given channel. + + This method clears the current target, voter list, and vote counts + in preparation for a new votekick session. + + Args: + channel_name (str): The name of the channel for which the votekick system should be initialized. + + Returns: + bool: True if the votekick system was successfully initialized, False if the channel is not found. + """ + votec = self.get_vote_channel_model(channel_name) + + if votec is None: + self.Logs.debug(f"[VOTEKICK MANAGER] The channel ({channel_name}) is not active!") + return False + + votec.target_user = '' + votec.voter_users = [] + votec.vote_for = 0 + votec.vote_against = 0 + self.Logs.debug(f"[VOTEKICK MANAGER] The channel ({channel_name}) has been successfully initialized!") + return True + def get_vote_channel_model(self, channel_name: str) -> Optional[VoteChannelModel]: """Get Vote Channel Object model @@ -96,3 +121,43 @@ class VotekickManager: self.Logs.debug(f'[VOTEKICK MANAGER] {channel_name} is activated but there is no ongoing vote!') return False + + def action_vote(self, channel_name: str, nickname: str, action: Literal['+', '-']) -> bool: + """ + Registers a vote (for or against) in an active votekick session on a channel. + + Args: + channel_name (str): The name of the channel where the votekick session is active. + nickname (str): The nickname of the user casting the vote. + action (Literal['+', '-']): The vote action. Use '+' to vote for kicking, '-' to vote against. + + Returns: + bool: True if the vote was successfully registered, False otherwise. + This can fail if: + - The action is invalid (not '+' or '-') + - The user has already voted + - The channel has no active votekick session + """ + if action not in ['+', '-']: + self.Logs.debug(f"[VOTEKICK MANAGER] The action must be + or - while you have provided ({action})") + return False + votec = self.get_vote_channel_model(channel_name) + + if votec: + client_obj = self.uplink.User.get_User(votec.target_user) + client_to_punish = votec.target_user if client_obj is None else client_obj.nickname + if nickname in votec.voter_users: + self.Logs.debug(f"[VOTEKICK MANAGER] This nickname ({nickname}) has already voted for ({client_to_punish})") + return False + else: + if action == '+': + votec.vote_for += 1 + elif action == '-': + votec.vote_against += 1 + + votec.voter_users.append(nickname) + self.Logs.debug(f"[VOTEKICK MANAGER] The ({nickname}) has voted to ban ({client_to_punish})") + return True + else: + self.Logs.debug(f"[VOTEKICK MANAGER] This channel {channel_name} is not active!") + return False