diff --git a/core/classes/protocols/inspircd.py b/core/classes/protocols/inspircd.py index a4957b2..e628a14 100644 --- a/core/classes/protocols/inspircd.py +++ b/core/classes/protocols/inspircd.py @@ -4,32 +4,18 @@ from re import match, findall, search from datetime import datetime from typing import TYPE_CHECKING, Any, Optional from ssl import SSLEOFError, SSLError -from core.classes.protocols.command_handler import CommandHandler from core.classes.protocols.interface import IProtocol from core.utils import tr if TYPE_CHECKING: - from core.irc import Irc from core.definition import MSasl, MClient class Inspircd(IProtocol): - def __init__(self, uplink: 'Irc'): - """ - - Args: - uplink (Irc): The Irc object - """ + def init_protocol(self): self.name = 'InspIRCd-4' self.protocol_version = 1206 - self.__Irc = uplink - self.__Config = uplink.Config - self.__Base = uplink.Base - self.__Utils = uplink.Loader.Utils - self.__Settings = uplink.Settings - self.__Logs = uplink.Loader.Logs - self.known_protocol: set[str] = {'UID', 'ERROR', 'PRIVMSG', 'SINFO', 'FJOIN', 'PING', 'PONG', 'SASL', 'PART', 'CAPAB', 'ENDBURST', @@ -37,10 +23,6 @@ class Inspircd(IProtocol): 'MODE', 'QUIT', 'SQUIT', 'VERSION'} - self.Handler = CommandHandler(uplink.Loader) - - self.__Logs.info(f"[PROTOCOL] Protocol [{__name__}] loaded!") - def get_ircd_protocol_poisition(self, cmd: list[str], log: bool = False) -> tuple[int, Optional[str]]: """Get the position of known commands @@ -56,12 +38,12 @@ class Inspircd(IProtocol): return index, token.upper() if log: - self.__Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}") + self._Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}") return -1, None def register_command(self): - m = self.__Irc.Loader.Definition.MIrcdCommand + m = self._Irc.Loader.Definition.MIrcdCommand self.Handler.register(m('PING', self.on_server_ping)) self.Handler.register(m('NICK', self.on_nick)) self.Handler.register(m('SASL', self.on_sasl)) @@ -84,29 +66,29 @@ class Inspircd(IProtocol): print_log (bool): if True print the log. """ try: - with self.__Base.lock: - self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[0])) + with self._Base.lock: + self._Irc.IrcSocket.send(f"{message}\r\n".encode(self._Config.SERVEUR_CHARSET[0])) if print_log: - self.__Logs.debug(f'<< {message}') + self._Logs.debug(f'<< {message}') except UnicodeDecodeError as ude: - self.__Logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}') - self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace')) + self._Logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}') + self._Irc.IrcSocket.send(f"{message}\r\n".encode(self._Config.SERVEUR_CHARSET[1],'replace')) except UnicodeEncodeError as uee: - self.__Logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}') - self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace')) + self._Logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}') + self._Irc.IrcSocket.send(f"{message}\r\n".encode(self._Config.SERVEUR_CHARSET[1],'replace')) except AssertionError as ae: - self.__Logs.warning(f'Assertion Error {ae} - message: {message}') + self._Logs.warning(f'Assertion Error {ae} - message: {message}') except SSLEOFError as soe: - self.__Logs.error(f"SSLEOFError: {soe} - {message}") + self._Logs.error(f"SSLEOFError: {soe} - {message}") except SSLError as se: - self.__Logs.error(f"SSLError: {se} - {message}") + self._Logs.error(f"SSLError: {se} - {message}") except OSError as oe: - self.__Logs.error(f"OSError: {oe} - {message}") + self._Logs.error(f"OSError: {oe} - {message}") if oe.errno == 10053: sys.exit(oe.__str__()) except AttributeError as ae: - self.__Logs.critical(f"Attribute Error: {ae}") + self._Logs.critical(f"Attribute Error: {ae}") def send_priv_msg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None): """Sending PRIVMSG to a channel or to a nickname by batches @@ -118,12 +100,12 @@ class Inspircd(IProtocol): nick_to (str, optional): The reciever nickname. Defaults to None. """ try: - batch_size = self.__Config.BATCH_SIZE - user_from = self.__Irc.User.get_user(nick_from) - user_to = self.__Irc.User.get_user(nick_to) if nick_to is not None else None + batch_size = self._Config.BATCH_SIZE + user_from = self._Irc.User.get_user(nick_from) + user_to = self._Irc.User.get_user(nick_to) if nick_to is not None else None if user_from is None: - self.__Logs.error(f"The sender nickname [{nick_from}] do not exist") + self._Logs.error(f"The sender nickname [{nick_from}] do not exist") return None if not channel is None: @@ -136,7 +118,7 @@ class Inspircd(IProtocol): batch = str(msg)[i:i+batch_size] self.send2socket(f":{nick_from} PRIVMSG {user_to.uid} :{batch}") except Exception as err: - self.__Logs.error(f"General Error: {err}") + self._Logs.error(f"General Error: {err}") def send_notice(self, nick_from: str, nick_to: str, msg: str) -> None: """Sending NOTICE by batches @@ -147,12 +129,12 @@ class Inspircd(IProtocol): nick_to (str): The reciever nickname """ try: - batch_size = self.__Config.BATCH_SIZE - user_from = self.__Irc.User.get_user(nick_from) - user_to = self.__Irc.User.get_user(nick_to) + batch_size = self._Config.BATCH_SIZE + user_from = self._Irc.User.get_user(nick_from) + user_to = self._Irc.User.get_user(nick_to) if user_from is None or user_to is None: - self.__Logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist") + self._Logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist") return None for i in range(0, len(str(msg)), batch_size): @@ -160,28 +142,28 @@ class Inspircd(IProtocol): self.send2socket(f":{user_from.uid} NOTICE {user_to.uid} :{batch}") except Exception as err: - self.__Logs.error(f"General Error: {err}") + self._Logs.error(f"General Error: {err}") def send_link(self): """Créer le link et envoyer les informations nécessaires pour la connexion au serveur. """ - service_id = self.__Config.SERVICE_ID - service_nickname = self.__Config.SERVICE_NICKNAME - service_username = self.__Config.SERVICE_USERNAME - service_realname = self.__Config.SERVICE_REALNAME - service_info = self.__Config.SERVICE_INFO - service_smodes = self.__Config.SERVICE_SMODES - service_hostname = self.__Config.SERVICE_HOST - service_name = self.__Config.SERVICE_NAME + service_id = self._Config.SERVICE_ID + service_nickname = self._Config.SERVICE_NICKNAME + service_username = self._Config.SERVICE_USERNAME + service_realname = self._Config.SERVICE_REALNAME + service_info = self._Config.SERVICE_INFO + service_smodes = self._Config.SERVICE_SMODES + service_hostname = self._Config.SERVICE_HOST + service_name = self._Config.SERVICE_NAME - server_password = self.__Config.SERVEUR_PASSWORD - server_link = self.__Config.SERVEUR_LINK - server_id = self.__Config.SERVEUR_ID - server_hostname = self.__Settings.MAIN_SERVER_HOSTNAME = self.__Config.SERVEUR_HOSTNAME + server_password = self._Config.SERVEUR_PASSWORD + server_link = self._Config.SERVEUR_LINK + server_id = self._Config.SERVEUR_ID + server_hostname = self._Settings.MAIN_SERVER_HOSTNAME = self._Config.SERVEUR_HOSTNAME - version = self.__Config.CURRENT_VERSION - unixtime = self.__Utils.get_unixtime() + version = self._Config.CURRENT_VERSION + unixtime = self._Utils.get_unixtime() self.send2socket(f"CAPAB START {self.protocol_version}") self.send2socket(f"CAPAB MODULES :services") @@ -197,18 +179,18 @@ class Inspircd(IProtocol): self.send2socket(f":{server_id} ENDBURST") # self.send_sjoin(chan) - self.__Logs.debug(f'>> {__name__} Link information sent to the server') + self._Logs.debug(f'>> {__name__} Link information sent to the server') def gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None: # TKL + G user host set_by expire_timestamp set_at_timestamp :reason - self.send2socket(f":{self.__Config.SERVEUR_ID} TKL + G {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}") + self.send2socket(f":{self._Config.SERVEUR_ID} TKL + G {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}") return None def send_set_nick(self, newnickname: str) -> None: - self.send2socket(f":{self.__Config.SERVICE_NICKNAME} NICK {newnickname}") + self.send2socket(f":{self._Config.SERVICE_NICKNAME} NICK {newnickname}") return None def send_set_mode(self, modes: str, *, nickname: Optional[str] = None, channel_name: Optional[str] = None, params: Optional[str] = None) -> None: @@ -220,21 +202,21 @@ class Inspircd(IProtocol): channel_name (Optional[str]): The channel name params (Optional[str]): Params to pass to the mode """ - service_id = self.__Config.SERVICE_ID + service_id = self._Config.SERVICE_ID params = '' if params is None else params if modes[0] not in ['+', '-']: - self.__Logs.error(f"[MODE ERROR] The mode you have provided is missing the sign: {modes}") + self._Logs.error(f"[MODE ERROR] The mode you have provided is missing the sign: {modes}") return None if nickname and channel_name: # :98KAAAAAB MODE #services +o defenderdev - if not self.__Irc.Channel.is_valid_channel(channel_name): - self.__Logs.error(f"[MODE ERROR] The channel is not valid: {channel_name}") + if not self._Irc.Channel.is_valid_channel(channel_name): + self._Logs.error(f"[MODE ERROR] The channel is not valid: {channel_name}") return None - if not all(mode in self.__Settings.PROTOCTL_PREFIX_MODES_SIGNES for mode in list(modes.replace('+','').replace('-',''))): - self.__Logs.debug(f'[USERMODE UNVAILABLE] This mode {modes} is not available!') + if not all(mode in self._Settings.PROTOCTL_PREFIX_MODES_SIGNES for mode in list(modes.replace('+','').replace('-',''))): + self._Logs.debug(f'[USERMODE UNVAILABLE] This mode {modes} is not available!') return None self.send2socket(f":{service_id} MODE {channel_name} {modes} {nickname}") @@ -242,8 +224,8 @@ class Inspircd(IProtocol): if nickname and channel_name is None: # :98KAAAAAB MODE nickname +o - if not all(mode in self.__Settings.PROTOCTL_USER_MODES for mode in list(modes.replace('+','').replace('-',''))): - self.__Logs.debug(f'[USERMODE UNVAILABLE] This mode {modes} is not available!') + if not all(mode in self._Settings.PROTOCTL_USER_MODES for mode in list(modes.replace('+','').replace('-',''))): + self._Logs.debug(f'[USERMODE UNVAILABLE] This mode {modes} is not available!') return None self.send2socket(f":{service_id} MODE {nickname} {modes}") @@ -251,12 +233,12 @@ class Inspircd(IProtocol): if nickname is None and channel_name: # :98KAAAAAB MODE #channel +o - if not all(mode in self.__Settings.PROTOCTL_CHANNEL_MODES for mode in list(modes.replace('+','').replace('-',''))): - self.__Logs.debug(f'[USERMODE UNVAILABLE] This mode {modes} is not available!') + if not all(mode in self._Settings.PROTOCTL_CHANNEL_MODES for mode in list(modes.replace('+','').replace('-',''))): + self._Logs.debug(f'[USERMODE UNVAILABLE] This mode {modes} is not available!') return None - if not self.__Irc.Channel.is_valid_channel(channel_name): - self.__Logs.error(f"[MODE ERROR] The channel is not valid: {channel_name}") + if not self._Irc.Channel.is_valid_channel(channel_name): + self._Logs.error(f"[MODE ERROR] The channel is not valid: {channel_name}") return None self.send2socket(f":{service_id} MODE {channel_name} {modes} {params}") @@ -274,14 +256,14 @@ class Inspircd(IProtocol): def send_ungline(self, nickname:str, hostname: str) -> None: - self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - G {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}") + self.send2socket(f":{self._Config.SERVEUR_ID} TKL - G {nickname} {hostname} {self._Config.SERVICE_NICKNAME}") return None def send_kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None: # TKL + k user host set_by expire_timestamp set_at_timestamp :reason - self.send2socket(f":{self.__Config.SERVEUR_ID} TKL + k {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}") + self.send2socket(f":{self._Config.SERVEUR_ID} TKL + k {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}") return None @@ -291,20 +273,20 @@ class Inspircd(IProtocol): Args: channel (str): The channel name. """ - server_id = self.__Config.SERVEUR_ID - service_nickname = self.__Config.SERVICE_NICKNAME - service_modes = self.__Config.SERVICE_UMODES - service_id = self.__Config.SERVICE_ID + server_id = self._Config.SERVEUR_ID + service_nickname = self._Config.SERVICE_NICKNAME + service_modes = self._Config.SERVICE_UMODES + service_id = self._Config.SERVICE_ID - if not self.__Irc.Channel.is_valid_channel(channel): - self.__Logs.error(f"The channel [{channel}] is not valid") + if not self._Irc.Channel.is_valid_channel(channel): + self._Logs.error(f"The channel [{channel}] is not valid") return None - self.send2socket(f":{server_id} FJOIN {channel} {self.__Utils.get_unixtime()} :o, {service_id}") + self.send2socket(f":{server_id} FJOIN {channel} {self._Utils.get_unixtime()} :o, {service_id}") self.send_set_mode(service_modes, nickname=service_nickname, channel_name=channel) # Add defender to the channel uids list - self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[service_id])) + self._Irc.Channel.insert(self._Irc.Loader.Definition.MChannel(name=channel, uids=[service_id])) return None def send_quit(self, uid: str, reason: str, print_log: bool = True) -> None: @@ -315,18 +297,18 @@ class Inspircd(IProtocol): reason (str): The reason for the quit print_log (bool): If True then print logs """ - user_obj = self.__Irc.User.get_user(uidornickname=uid) - reputation_obj = self.__Irc.Reputation.get_reputation(uidornickname=uid) + user_obj = self._Irc.User.get_user(uidornickname=uid) + reputation_obj = self._Irc.Reputation.get_reputation(uidornickname=uid) if not user_obj is None: self.send2socket(f":{user_obj.uid} QUIT :{reason}", print_log=print_log) - self.__Irc.User.delete(user_obj.uid) + self._Irc.User.delete(user_obj.uid) if not reputation_obj is None: - self.__Irc.Reputation.delete(reputation_obj.uid) + self._Irc.Reputation.delete(reputation_obj.uid) - if not self.__Irc.Channel.delete_user_from_all_channel(uid): - self.__Logs.error(f"The UID [{uid}] has not been deleted from all channels") + if not self._Irc.Channel.delete_user_from_all_channel(uid): + self._Logs.error(f"The UID [{uid}] has not been deleted from all channels") return None @@ -345,20 +327,20 @@ class Inspircd(IProtocol): print_log (bool, optional): print logs if true. Defaults to True. """ # {self.Config.SERVEUR_ID} UID - # {clone.nickname} 1 {self.__Utils.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname} + # {clone.nickname} 1 {self._Utils.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname} try: - unixtime = self.__Utils.get_unixtime() - # encoded_ip = self.__Base.encode_ip(remote_ip) + unixtime = self._Utils.get_unixtime() + # encoded_ip = self._Base.encode_ip(remote_ip) new_umodes = [] for mode in list(umodes.replace('+', '').replace('-', '')): - if mode in self.__Settings.PROTOCTL_USER_MODES: + if mode in self._Settings.PROTOCTL_USER_MODES: new_umodes.append(mode) final_umodes = '+' + ''.join(new_umodes) # Create the user - self.__Irc.User.insert( - self.__Irc.Loader.Definition.MUser( + self._Irc.User.insert( + self._Irc.Loader.Definition.MUser( uid=uid, nickname=nickname, username=username, realname=realname,hostname=hostname, umodes=final_umodes, vhost=vhost, remote_ip=remote_ip @@ -368,13 +350,13 @@ class Inspircd(IProtocol): # [:] UID []+ : # :98K UID 98KAAAAAB 1756932359 defenderdev defenderdev.deb.biz.st defenderdev.deb.biz.st Dev-PyDefender 127.0.0.1 1756932359 + :Dev Python Security # [':97K', 'UID', '97KAAAAAA', '1756926679', 'adator', '172.18.128.1', 'attila.example.org', '...', '...', '172.18.128.1', '1756926678', '+o', ':...'] - uid_msg = f":{self.__Config.SERVEUR_ID} UID {uid} {unixtime} {nickname} {hostname} {vhost} {username} {username} {remote_ip} {unixtime} {final_umodes} :{realname}" + uid_msg = f":{self._Config.SERVEUR_ID} UID {uid} {unixtime} {nickname} {hostname} {vhost} {username} {username} {remote_ip} {unixtime} {final_umodes} :{realname}" self.send2socket(uid_msg, print_log=print_log) return None except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def send_join_chan(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None: """Joining a channel @@ -386,20 +368,20 @@ class Inspircd(IProtocol): print_log (bool, optional): Write logs. Defaults to True. """ - user_obj = self.__Irc.User.get_user(uidornickname) + user_obj = self._Irc.User.get_user(uidornickname) password_channel = password if not password is None else '' if user_obj is None: return None - if not self.__Irc.Channel.is_valid_channel(channel): - self.__Logs.error(f"The channel [{channel}] is not valid") + if not self._Irc.Channel.is_valid_channel(channel): + self._Logs.error(f"The channel [{channel}] is not valid") return None - self.send2socket(f":{user_obj.uid} FJOIN {channel} {self.__Utils.get_unixtime()} :,{user_obj.uid} {password_channel}", print_log=print_log) + self.send2socket(f":{user_obj.uid} FJOIN {channel} {self._Utils.get_unixtime()} :,{user_obj.uid} {password_channel}", print_log=print_log) # Add defender to the channel uids list - self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[user_obj.uid])) + self._Irc.Channel.insert(self._Irc.Loader.Definition.MChannel(name=channel, uids=[user_obj.uid])) return None def send_part_chan(self, uidornickname:str, channel: str, print_log: bool = True) -> None: @@ -411,31 +393,31 @@ class Inspircd(IProtocol): print_log (bool, optional): Write logs. Defaults to True. """ - user_obj = self.__Irc.User.get_user(uidornickname) + user_obj = self._Irc.User.get_user(uidornickname) if user_obj is None: - self.__Logs.error(f"The user [{uidornickname}] is not valid") + self._Logs.error(f"The user [{uidornickname}] is not valid") return None - if not self.__Irc.Channel.is_valid_channel(channel): - self.__Logs.error(f"The channel [{channel}] is not valid") + if not self._Irc.Channel.is_valid_channel(channel): + self._Logs.error(f"The channel [{channel}] is not valid") return None self.send2socket(f":{user_obj.uid} PART {channel}", print_log=print_log) # Add defender to the channel uids list - self.__Irc.Channel.delete_user_from_channel(channel, user_obj.uid) + self._Irc.Channel.delete_user_from_channel(channel, user_obj.uid) return None def send_unkline(self, nickname:str, hostname: str) -> None: - self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - K {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}") + self.send2socket(f":{self._Config.SERVEUR_ID} TKL - K {nickname} {hostname} {self._Config.SERVICE_NICKNAME}") return None def send_raw(self, raw_command: str) -> None: - self.send2socket(f":{self.__Config.SERVEUR_ID} {raw_command}") + self.send2socket(f":{self._Config.SERVEUR_ID} {raw_command}") return None # ------------------------------------------------------------------------ @@ -451,23 +433,23 @@ class Inspircd(IProtocol): try: # [':adator_', 'UMODE2', '-iwx'] - user_obj = self.__Irc.User.get_user(str(server_msg[0]).lstrip(':')) + user_obj = self._Irc.User.get_user(str(server_msg[0]).lstrip(':')) user_mode = server_msg[2] if user_obj is None: # If user is not created return None # TODO : User object should be able to update user modes - if self.__Irc.User.update_mode(user_obj.uid, user_mode): + if self._Irc.User.update_mode(user_obj.uid, user_mode): return None - # self.__Logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]") + # self._Logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]") return None except IndexError as ie: - self.__Logs.error(f"{__name__} - Index Error: {ie}") + self._Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_quit(self, server_msg: list[str]) -> None: """Handle quit coming from a server @@ -479,16 +461,16 @@ class Inspircd(IProtocol): uid_who_quit = str(server_msg[0]).lstrip(':') - self.__Irc.Channel.delete_user_from_all_channel(uid_who_quit) - self.__Irc.User.delete(uid_who_quit) - self.__Irc.Reputation.delete(uid_who_quit) + self._Irc.Channel.delete_user_from_all_channel(uid_who_quit) + self._Irc.User.delete(uid_who_quit) + self._Irc.Reputation.delete(uid_who_quit) return None except IndexError as ie: - self.__Logs.error(f"{__name__} - Index Error: {ie}") + self._Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_squit(self, server_msg: list[str]) -> None: """Handle squit coming from a server @@ -500,15 +482,15 @@ class Inspircd(IProtocol): server_hostname = server_msg[2] uid_to_delete = None - for s_user in self.__Irc.User.UID_DB: + for s_user in self._Irc.User.UID_DB: if s_user.hostname == server_hostname and 'S' in s_user.umodes: uid_to_delete = s_user.uid if uid_to_delete is None: return None - self.__Irc.User.delete(uid_to_delete) - self.__Irc.Channel.delete_user_from_all_channel(uid_to_delete) + self._Irc.User.delete(uid_to_delete) + self._Irc.Channel.delete_user_from_all_channel(uid_to_delete) return None @@ -541,9 +523,9 @@ class Inspircd(IProtocol): channel_modes.append(cmode) - self.__Settings.PROTOCTL_PREFIX_SIGNES_MODES = sign_mode - self.__Settings.PROTOCTL_PREFIX_MODES_SIGNES = mode_sign - self.__Settings.PROTOCTL_CHANNEL_MODES = list(set(channel_modes)) + self._Settings.PROTOCTL_PREFIX_SIGNES_MODES = sign_mode + self._Settings.PROTOCTL_PREFIX_MODES_SIGNES = mode_sign + self._Settings.PROTOCTL_CHANNEL_MODES = list(set(channel_modes)) # ['CAPAB', 'USERMODES', ':param-set:snomask=s', 'simple:bot=B', 'simple:invisible=i', 'simple:oper=o', 'simple:servprotect=k', # 'simple:sslqueries=z', 'simple:u_registered=r', 'simple:wallops=w'] @@ -555,7 +537,7 @@ class Inspircd(IProtocol): umode = prefix.split('=')[1] if len(prefix.split('=')) > 1 else None user_modes.append(umode) - self.__Settings.PROTOCTL_USER_MODES = list(set(user_modes)) + self._Settings.PROTOCTL_USER_MODES = list(set(user_modes)) return None @@ -576,16 +558,16 @@ class Inspircd(IProtocol): uid = str(scopy[0]).replace(':','') newnickname = scopy[2] - self.__Irc.User.update_nickname(uid, newnickname) - self.__Irc.Client.update_nickname(uid, newnickname) - self.__Irc.Admin.update_nickname(uid, newnickname) + self._Irc.User.update_nickname(uid, newnickname) + self._Irc.Client.update_nickname(uid, newnickname) + self._Irc.Admin.update_nickname(uid, newnickname) return None except IndexError as ie: - self.__Logs.error(f"{__name__} - Index Error: {ie}") + self._Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_sjoin(self, server_msg: list[str]) -> None: """Handle sjoin coming from a server @@ -607,8 +589,8 @@ class Inspircd(IProtocol): list_users = list(set(list_users)) if list_users: - self.__Irc.Channel.insert( - self.__Irc.Loader.Definition.MChannel( + self._Irc.Channel.insert( + self._Irc.Loader.Definition.MChannel( name=channel, uids=list_users ) @@ -616,9 +598,9 @@ class Inspircd(IProtocol): return None except IndexError as ie: - self.__Logs.error(f"{__name__} - Index Error: {ie}") + self._Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_endburst(self, server_msg: list[str]) -> None: """Handle EOS coming from a server @@ -630,69 +612,69 @@ class Inspircd(IProtocol): # [':97K', 'ENDBURST'] scopy = server_msg.copy() hsid = str(scopy[0]).replace(':','') - if hsid == self.__Config.HSID: - if self.__Config.DEFENDER_INIT == 1: - current_version = self.__Config.CURRENT_VERSION - latest_version = self.__Config.LATEST_VERSION - if self.__Base.check_for_new_version(False): + if hsid == self._Config.HSID: + if self._Config.DEFENDER_INIT == 1: + current_version = self._Config.CURRENT_VERSION + latest_version = self._Config.LATEST_VERSION + if self._Base.check_for_new_version(False): version = f'{current_version} >>> {latest_version}' else: version = f'{current_version}' print(f"################### DEFENDER ###################") print(f"# SERVICE CONNECTE ") - print(f"# SERVEUR : {self.__Config.SERVEUR_IP} ") - print(f"# PORT : {self.__Config.SERVEUR_PORT} ") - print(f"# SSL : {self.__Config.SERVEUR_SSL} ") - print(f"# SSL VER : {self.__Config.SSL_VERSION} ") - print(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ") - print(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ") + print(f"# SERVEUR : {self._Config.SERVEUR_IP} ") + print(f"# PORT : {self._Config.SERVEUR_PORT} ") + print(f"# SSL : {self._Config.SERVEUR_SSL} ") + print(f"# SSL VER : {self._Config.SSL_VERSION} ") + print(f"# NICKNAME : {self._Config.SERVICE_NICKNAME} ") + print(f"# CHANNEL : {self._Config.SERVICE_CHANLOG} ") print(f"# VERSION : {version} ") print(f"################################################") - self.__Logs.info(f"################### DEFENDER ###################") - self.__Logs.info(f"# SERVICE CONNECTE ") - self.__Logs.info(f"# SERVEUR : {self.__Config.SERVEUR_IP} ") - self.__Logs.info(f"# PORT : {self.__Config.SERVEUR_PORT} ") - self.__Logs.info(f"# SSL : {self.__Config.SERVEUR_SSL} ") - self.__Logs.info(f"# SSL VER : {self.__Config.SSL_VERSION} ") - self.__Logs.info(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ") - self.__Logs.info(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ") - self.__Logs.info(f"# VERSION : {version} ") - self.__Logs.info(f"################################################") + self._Logs.info(f"################### DEFENDER ###################") + self._Logs.info(f"# SERVICE CONNECTE ") + self._Logs.info(f"# SERVEUR : {self._Config.SERVEUR_IP} ") + self._Logs.info(f"# PORT : {self._Config.SERVEUR_PORT} ") + self._Logs.info(f"# SSL : {self._Config.SERVEUR_SSL} ") + self._Logs.info(f"# SSL VER : {self._Config.SSL_VERSION} ") + self._Logs.info(f"# NICKNAME : {self._Config.SERVICE_NICKNAME} ") + self._Logs.info(f"# CHANNEL : {self._Config.SERVICE_CHANLOG} ") + self._Logs.info(f"# VERSION : {version} ") + self._Logs.info(f"################################################") - self.send_sjoin(self.__Config.SERVICE_CHANLOG) + self.send_sjoin(self._Config.SERVICE_CHANLOG) - if self.__Base.check_for_new_version(False): + if self._Base.check_for_new_version(False): self.send_priv_msg( - nick_from=self.__Config.SERVICE_NICKNAME, + nick_from=self._Config.SERVICE_NICKNAME, msg=f" New Version available {version}", - channel=self.__Config.SERVICE_CHANLOG + channel=self._Config.SERVICE_CHANLOG ) # Initialisation terminé aprés le premier PING self.send_priv_msg( - nick_from=self.__Config.SERVICE_NICKNAME, - msg=tr("[ %sINFORMATION%s ] >> %s is ready!", self.__Config.COLORS.green, self.__Config.COLORS.nogc, self.__Config.SERVICE_NICKNAME), - channel=self.__Config.SERVICE_CHANLOG + nick_from=self._Config.SERVICE_NICKNAME, + msg=tr("[ %sINFORMATION%s ] >> %s is ready!", self._Config.COLORS.green, self._Config.COLORS.nogc, self._Config.SERVICE_NICKNAME), + channel=self._Config.SERVICE_CHANLOG ) - self.__Config.DEFENDER_INIT = 0 + self._Config.DEFENDER_INIT = 0 # Send EOF to other modules - for module in self.__Irc.ModuleUtils.model_get_loaded_modules().copy(): + for module in self._Irc.ModuleUtils.model_get_loaded_modules().copy(): module.class_instance.cmd(scopy) # Join saved channels & load existing modules - self.__Irc.join_saved_channels() - self.__Irc.ModuleUtils.db_load_all_existing_modules(self.__Irc) + self._Irc.join_saved_channels() + self._Irc.ModuleUtils.db_load_all_existing_modules(self._Irc) return None except IndexError as ie: - self.__Logs.error(f"{__name__} - Key Error: {ie}") + self._Logs.error(f"{__name__} - Key Error: {ie}") except KeyError as ke: - self.__Logs.error(f"{__name__} - Key Error: {ke}") + self._Logs.error(f"{__name__} - Key Error: {ke}") except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_part(self, server_msg: list[str]) -> None: """Handle part coming from a server @@ -706,14 +688,14 @@ class Inspircd(IProtocol): uid = str(server_msg[0]).lstrip(':') channel = str(server_msg[2]).lower() # reason = str(' '.join(server_msg[3:])) - self.__Irc.Channel.delete_user_from_channel(channel, uid) + self._Irc.Channel.delete_user_from_channel(channel, uid) return None except IndexError as ie: - self.__Logs.error(f"{__name__} - Index Error: {ie}") + self._Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_uid(self, server_msg: list[str]) -> None: """Handle uid message coming from the server @@ -723,9 +705,9 @@ class Inspircd(IProtocol): server_msg (list[str]): Original server message """ try: - red = self.__Config.COLORS.red - green = self.__Config.COLORS.green - nogc = self.__Config.COLORS.nogc + red = self._Config.COLORS.red + green = self._Config.COLORS.green + nogc = self._Config.COLORS.nogc is_webirc = True if 'webirc' in server_msg[0] else False is_websocket = True if 'websocket' in server_msg[0] else False @@ -737,7 +719,7 @@ class Inspircd(IProtocol): vhost = str(server_msg[6]) if not 'S' in umodes: - # remote_ip = self.__Base.decode_ip(str(serverMsg[9])) + # remote_ip = self._Base.decode_ip(str(serverMsg[9])) remote_ip = str(server_msg[9]) else: remote_ip = '127.0.0.1' @@ -756,8 +738,8 @@ class Inspircd(IProtocol): score_connexion = 0 - self.__Irc.User.insert( - self.__Irc.Loader.Definition.MUser( + self._Irc.User.insert( + self._Irc.Loader.Definition.MUser( uid=uid, nickname=nickname, username=username, @@ -774,18 +756,18 @@ class Inspircd(IProtocol): ) ) - for module in self.__Irc.ModuleUtils.model_get_loaded_modules().copy(): + for module in self._Irc.ModuleUtils.model_get_loaded_modules().copy(): module.class_instance.cmd(server_msg) # SASL authentication - dnickname = self.__Config.SERVICE_NICKNAME - dchanlog = self.__Config.SERVICE_CHANLOG + dnickname = self._Config.SERVICE_NICKNAME + dchanlog = self._Config.SERVICE_CHANLOG # uid = serverMsg[8] # nickname = serverMsg[3] - sasl_obj = self.__Irc.Sasl.get_sasl_obj(uid) + sasl_obj = self._Irc.Sasl.get_sasl_obj(uid) if sasl_obj: if sasl_obj.auth_success: - self.__Irc.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language) + self._Irc.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language) self.send_priv_msg(nick_from=dnickname, msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", green, nogc, nickname, sasl_obj.username, dnickname), channel=dchanlog) @@ -797,14 +779,14 @@ class Inspircd(IProtocol): self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!")) # Delete sasl object! - self.__Irc.Sasl.delete_sasl_client(uid) + self._Irc.Sasl.delete_sasl_client(uid) return None return None except IndexError as ie: - self.__Logs.error(f"{__name__} - Index Error: {ie}") + self._Logs.error(f"{__name__} - Index Error: {ie}") except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}", exc_info=True) + self._Logs.error(f"{__name__} - General Error: {err}", exc_info=True) def on_privmsg(self, server_msg: list[str]) -> None: """Handle PRIVMSG message coming from the server @@ -820,32 +802,32 @@ class Inspircd(IProtocol): cmd.pop(0) get_uid_or_nickname = str(cmd[0].replace(':','')) - user_trigger = self.__Irc.User.get_nickname(get_uid_or_nickname) - # dnickname = self.__Config.SERVICE_NICKNAME - pattern = fr'(:\{self.__Config.SERVICE_PREFIX})(.*)$' + user_trigger = self._Irc.User.get_nickname(get_uid_or_nickname) + # dnickname = self._Config.SERVICE_NICKNAME + pattern = fr'(:\{self._Config.SERVICE_PREFIX})(.*)$' hcmds = search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le . if hcmds: # Commande qui commencent par le point liste_des_commandes = list(hcmds.groups()) convert_to_string = ' '.join(liste_des_commandes) arg = convert_to_string.split() - arg.remove(f":{self.__Config.SERVICE_PREFIX}") - if not self.__Irc.Commands.is_command_exist(arg[0]): - self.__Logs.debug(f"This command {arg[0]} is not available") + arg.remove(f":{self._Config.SERVICE_PREFIX}") + if not self._Irc.Commands.is_command_exist(arg[0]): + self._Logs.debug(f"This command {arg[0]} is not available") self.send_notice( - nick_from=self.__Config.SERVICE_NICKNAME, + nick_from=self._Config.SERVICE_NICKNAME, nick_to=user_trigger, - msg=f"This command [{self.__Config.COLORS.bold}{arg[0]}{self.__Config.COLORS.bold}] is not available" + msg=f"This command [{self._Config.COLORS.bold}{arg[0]}{self._Config.COLORS.bold}] is not available" ) return None cmd_to_send = convert_to_string.replace(':','') - self.__Base.log_cmd(user_trigger, cmd_to_send) + self._Base.log_cmd(user_trigger, cmd_to_send) - fromchannel = str(cmd[2]).lower() if self.__Irc.Channel.is_valid_channel(cmd[2]) else None - self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd) + fromchannel = str(cmd[2]).lower() if self._Irc.Channel.is_valid_channel(cmd[2]) else None + self._Irc.hcmds(user_trigger, fromchannel, arg, cmd) - if cmd[2] == self.__Config.SERVICE_ID: + if cmd[2] == self._Config.SERVICE_ID: pattern = fr'^:.*?:(.*)$' hcmds = search(pattern, ' '.join(cmd)) @@ -869,30 +851,30 @@ class Inspircd(IProtocol): self.on_ping(srv_msg) return None - if not self.__Irc.Commands.is_command_exist(arg[0]): - self.__Logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available") + if not self._Irc.Commands.is_command_exist(arg[0]): + self._Logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available") return None - # if not arg[0].lower() in self.__Irc.module_commands_list: - # self.__Logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available") + # if not arg[0].lower() in self._Irc.module_commands_list: + # self._Logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available") # return False cmd_to_send = convert_to_string.replace(':','') - self.__Base.log_cmd(user_trigger, cmd_to_send) + self._Base.log_cmd(user_trigger, cmd_to_send) fromchannel = None if len(arg) >= 2: - fromchannel = str(arg[1]).lower() if self.__Irc.Channel.is_valid_channel(arg[1]) else None + fromchannel = str(arg[1]).lower() if self._Irc.Channel.is_valid_channel(arg[1]) else None - self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd) + self._Irc.hcmds(user_trigger, fromchannel, arg, cmd) return None except KeyError as ke: - self.__Logs.error(f"Key Error: {ke}") + self._Logs.error(f"Key Error: {ke}") except AttributeError as ae: - self.__Logs.error(f"Attribute Error: {ae}") + self._Logs.error(f"Attribute Error: {ae}") except Exception as err: - self.__Logs.error(f"General Error: {err}", exc_info=True) + self._Logs.error(f"General Error: {err}", exc_info=True) def on_server_ping(self, server_msg: list[str]) -> None: """Send a PONG message to the server @@ -906,11 +888,11 @@ class Inspircd(IProtocol): # -> :808 PONG 3IN hsid = str(server_msg[0]).replace(':','') - self.send2socket(f":{self.__Config.SERVEUR_ID} PONG {hsid}", print_log=False) + self.send2socket(f":{self._Config.SERVEUR_ID} PONG {hsid}", print_log=False) return None except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_server(self, server_msg: list[str]) -> None: """_summary_ @@ -922,14 +904,14 @@ class Inspircd(IProtocol): """ try: param = str(server_msg[2]) - self.__Config.HSID = self.__Settings.MAIN_SERVER_ID = str(server_msg[0]).replace(':', '') + self._Config.HSID = self._Settings.MAIN_SERVER_ID = str(server_msg[0]).replace(':', '') if param == 'rawversion': - self.__Logs.debug(f">> Server Version: {server_msg[3].replace(':', '')}") + self._Logs.debug(f">> Server Version: {server_msg[3].replace(':', '')}") elif param == 'rawbranch': - self.__Logs.debug(f">> Branch Version: {server_msg[3].replace(':', '')}") + self._Logs.debug(f">> Branch Version: {server_msg[3].replace(':', '')}") except Exception as err: - self.__Logs.error(f'General Error: {err}') + self._Logs.error(f'General Error: {err}') def on_version(self, server_msg: list[str]) -> None: """Sending Server Version to the server @@ -941,19 +923,19 @@ class Inspircd(IProtocol): # Réponse a un CTCP VERSION try: - nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(server_msg[1])) - dnickname = self.__Config.SERVICE_NICKNAME + nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(server_msg[1])) + dnickname = self._Config.SERVICE_NICKNAME arg = server_msg[4].replace(':', '') if nickname is None: return None if arg == '\x01VERSION\x01': - self.send2socket(f':{dnickname} NOTICE {nickname} :\x01VERSION Service {self.__Config.SERVICE_NICKNAME} V{self.__Config.CURRENT_VERSION}\x01') + self.send2socket(f':{dnickname} NOTICE {nickname} :\x01VERSION Service {self._Config.SERVICE_NICKNAME} V{self._Config.CURRENT_VERSION}\x01') return None except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_time(self, server_msg: list[str]) -> None: """Sending TIME answer to a requestor @@ -965,10 +947,10 @@ class Inspircd(IProtocol): # Réponse a un CTCP VERSION try: - nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(server_msg[1])) - dnickname = self.__Config.SERVICE_NICKNAME + nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(server_msg[1])) + dnickname = self._Config.SERVICE_NICKNAME arg = server_msg[4].replace(':', '') - current_datetime = self.__Utils.get_sdatetime() + current_datetime = self._Utils.get_sdatetime() if nickname is None: return None @@ -978,7 +960,7 @@ class Inspircd(IProtocol): return None except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_ping(self, server_msg: list[str]) -> None: """Sending a PING answer to requestor @@ -990,8 +972,8 @@ class Inspircd(IProtocol): # Réponse a un CTCP VERSION try: - nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(server_msg[1])) - dnickname = self.__Config.SERVICE_NICKNAME + nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(server_msg[1])) + dnickname = self._Config.SERVICE_NICKNAME arg = server_msg[4].replace(':', '') if nickname is None: @@ -999,10 +981,10 @@ class Inspircd(IProtocol): if arg == '\x01PING': recieved_unixtime = int(server_msg[5].replace('\x01','')) - current_unixtime = self.__Utils.get_unixtime() + current_unixtime = self._Utils.get_unixtime() ping_response = current_unixtime - recieved_unixtime - # self.__Irc.send2socket(f':{dnickname} NOTICE {nickname} :\x01PING {ping_response} secs\x01') + # self._Irc.send2socket(f':{dnickname} NOTICE {nickname} :\x01PING {ping_response} secs\x01') self.send_notice( nick_from=dnickname, nick_to=nickname, @@ -1011,7 +993,7 @@ class Inspircd(IProtocol): return None except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_version_msg(self, server_msg: list[str]) -> None: """Handle version coming from the server @@ -1021,22 +1003,22 @@ class Inspircd(IProtocol): """ try: # ['@label=0073', ':0014E7P06', 'VERSION', 'PyDefender'] - user_obj = self.__Irc.User.get_user(self.__Utils.clean_uid(server_msg[1])) + user_obj = self._Irc.User.get_user(self._Utils.clean_uid(server_msg[1])) if user_obj is None: return None - response_351 = f"{self.__Config.SERVICE_NAME.capitalize()}-{self.__Config.CURRENT_VERSION} {self.__Config.SERVICE_HOST} {self.name}" - self.send2socket(f':{self.__Config.SERVICE_HOST} 351 {user_obj.nickname} {response_351}') + response_351 = f"{self._Config.SERVICE_NAME.capitalize()}-{self._Config.CURRENT_VERSION} {self._Config.SERVICE_HOST} {self.name}" + self.send2socket(f':{self._Config.SERVICE_HOST} 351 {user_obj.nickname} {response_351}') - modules = self.__Irc.ModuleUtils.get_all_available_modules() + modules = self._Irc.ModuleUtils.get_all_available_modules() response_005 = ' | '.join(modules) - self.send2socket(f':{self.__Config.SERVICE_HOST} 005 {user_obj.nickname} {response_005} are supported by this server') + self.send2socket(f':{self._Config.SERVICE_HOST} 005 {user_obj.nickname} {response_005} are supported by this server') return None except Exception as err: - self.__Logs.error(f"{__name__} - General Error: {err}") + self._Logs.error(f"{__name__} - General Error: {err}") def on_sasl(self, server_msg: list[str]) -> Optional['MSasl']: """Handle SASL coming from a server @@ -1053,9 +1035,9 @@ class Inspircd(IProtocol): # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '0014ZZH1F', 'S', 'EXTERNAL', 'zzzzzzzkey'] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'C', 'sasakey=='] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'D', 'A'] - psasl = self.__Irc.Sasl + psasl = self._Irc.Sasl sasl_enabled = True # Should be False - for smod in self.__Settings.SMOD_MODULES: + for smod in self._Settings.SMOD_MODULES: if smod.name == 'sasl': sasl_enabled = True break @@ -1067,7 +1049,7 @@ class Inspircd(IProtocol): client_uid = scopy[4] if len(scopy) >= 6 else None # sasl_obj = None sasl_message_type = scopy[6] if len(scopy) >= 6 else None - psasl.insert_sasl_client(self.__Irc.Loader.Definition.MSasl(client_uid=client_uid)) + psasl.insert_sasl_client(self._Irc.Loader.Definition.MSasl(client_uid=client_uid)) sasl_obj = psasl.get_sasl_obj(client_uid) if sasl_obj is None: @@ -1085,13 +1067,13 @@ class Inspircd(IProtocol): sasl_obj.mechanisme = str(scopy[7]) if sasl_obj.mechanisme == "PLAIN": - self.send2socket(f":{self.__Config.SERVEUR_ID} SASL {self.__Config.SERVEUR_HOSTNAME} {sasl_obj.client_uid} C +") + self.send2socket(f":{self._Config.SERVEUR_ID} SASL {self._Config.SERVEUR_HOSTNAME} {sasl_obj.client_uid} C +") elif sasl_obj.mechanisme == "EXTERNAL": if str(scopy[7]) == "+": return None sasl_obj.fingerprint = str(scopy[8]) - self.send2socket(f":{self.__Config.SERVEUR_ID} SASL {self.__Config.SERVEUR_HOSTNAME} {sasl_obj.client_uid} C +") + self.send2socket(f":{self._Config.SERVEUR_ID} SASL {self._Config.SERVEUR_HOSTNAME} {sasl_obj.client_uid} C +") self.on_sasl_authentication_process(sasl_obj) return sasl_obj @@ -1114,23 +1096,23 @@ class Inspircd(IProtocol): return sasl_obj except Exception as err: - self.__Logs.error(f'General Error: {err}', exc_info=True) + self._Logs.error(f'General Error: {err}', exc_info=True) def on_sasl_authentication_process(self, sasl_model: 'MSasl'): s = sasl_model - server_id = self.__Config.SERVEUR_ID - main_server_hostname = self.__Settings.MAIN_SERVER_HOSTNAME - db_admin_table = self.__Config.TABLE_ADMIN + server_id = self._Config.SERVEUR_ID + main_server_hostname = self._Settings.MAIN_SERVER_HOSTNAME + db_admin_table = self._Config.TABLE_ADMIN if sasl_model: def db_get_admin_info(*, username: Optional[str] = None, password: Optional[str] = None, fingerprint: Optional[str] = None) -> Optional[dict[str, Any]]: if fingerprint: mes_donnees = {'fingerprint': fingerprint} query = f"SELECT user, level, language FROM {db_admin_table} WHERE fingerprint = :fingerprint" else: - mes_donnees = {'user': username, 'password': self.__Utils.hash_password(password)} + mes_donnees = {'user': username, 'password': self._Utils.hash_password(password)} query = f"SELECT user, level, language FROM {db_admin_table} WHERE user = :user AND password = :password" - result = self.__Base.db_execute_query(query, mes_donnees) + result = self._Base.db_execute_query(query, mes_donnees) user_from_db = result.fetchone() if user_from_db: return {'user': user_from_db[0], 'level': user_from_db[1], 'language': user_from_db[2]} @@ -1167,7 +1149,7 @@ class Inspircd(IProtocol): self.send2socket(f":{server_id} SASL {s.username} :SASL authentication failed") def on_error(self, server_msg: list[str]) -> None: - self.__Logs.debug(f"{server_msg}") + self._Logs.debug(f"{server_msg}") def on_metedata(self, server_msg: list[str]) -> None: """_summary_ @@ -1178,25 +1160,28 @@ class Inspircd(IProtocol): # [':97K', 'METADATA', '97KAAAAAA', 'ssl_cert', ':vTrSe', 'fingerprint90753683519522875', # '/C=FR/OU=Testing/O=Test', 'Sasl/CN=localhost', '/C=FR/OU=Testing/O=Test', 'Sasl/CN=localhost'] scopy = server_msg.copy() - dnickname = self.__Config.SERVICE_NICKNAME - dchanlog = self.__Config.SERVICE_CHANLOG - green = self.__Config.COLORS.green - nogc = self.__Config.COLORS.nogc + dnickname = self._Config.SERVICE_NICKNAME + dchanlog = self._Config.SERVICE_CHANLOG + green = self._Config.COLORS.green + nogc = self._Config.COLORS.nogc if 'ssl_cert' in scopy: fingerprint = scopy[5] uid = scopy[2] - user_obj = self.__Irc.User.get_user(uid) + user_obj = self._Irc.User.get_user(uid) if user_obj: user_obj.fingerprint = fingerprint - if self.__Irc.Admin.db_auth_admin_via_fingerprint(fingerprint, uid): - admin = self.__Irc.Admin.get_admin(uid) + if self._Irc.Admin.db_auth_admin_via_fingerprint(fingerprint, uid): + admin = self._Irc.Admin.get_admin(uid) account = admin.account if admin else '' self.send_priv_msg(nick_from=dnickname, msg=tr("[ %sSASL AUTO AUTH%s ] - %s (%s) is now connected successfuly to %s", green, nogc, user_obj.nickname, account, dnickname), channel=dchanlog) self.send_notice(nick_from=dnickname, nick_to=user_obj.nickname, msg=tr("Successfuly connected to %s", dnickname)) + def on_kick(self, server_msg: list[str]) -> None: + ... + # ------------------------------------------------------------------------ # COMMON IRC PARSER # ------------------------------------------------------------------------ @@ -1289,8 +1274,8 @@ class Inspircd(IProtocol): response = { "uid_sender": scopy[0].replace(':', ''), - "uid_reciever": self.__Irc.User.get_uid(scopy[2]), - "channel": scopy[2] if self.__Irc.Channel.is_valid_channel(scopy[2]) else None, + "uid_reciever": self._Irc.User.get_uid(scopy[2]), + "channel": scopy[2] if self._Irc.Channel.is_valid_channel(scopy[2]) else None, "message": " ".join(scopy[3:]) } return response