From 999072a88a3b79074638399ac12559834770ad6d Mon Sep 17 00:00:00 2001 From: adator <85586985+adator85@users.noreply.github.com> Date: Tue, 11 Nov 2025 03:47:02 +0100 Subject: [PATCH] Refactoring unreal6 code! --- core/classes/protocols/unreal6.py | 336 +++++++++++++++--------------- 1 file changed, 165 insertions(+), 171 deletions(-) diff --git a/core/classes/protocols/unreal6.py b/core/classes/protocols/unreal6.py index 28adbe0..4886816 100644 --- a/core/classes/protocols/unreal6.py +++ b/core/classes/protocols/unreal6.py @@ -7,7 +7,6 @@ from core.classes.interfaces.iprotocol import IProtocol from core.utils import tr if TYPE_CHECKING: - from core.classes.modules.sasl import Sasl from core.definition import MClient, MSasl, MUser, MChannel class Unrealircd6(IProtocol): @@ -40,7 +39,7 @@ class Unrealircd6(IProtocol): if log: self._Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}") - return (-1, None) + return -1, None def register_command(self) -> None: m = self._Irc.Loader.Definition.MIrcdCommand @@ -595,9 +594,9 @@ class Unrealircd6(IProtocol): print_log (bool, optional): Write logs. Defaults to True. """ - userObj = self._Irc.User.get_user(uidornickname) + u = self._Irc.User.get_user(uidornickname) - if userObj is None: + if u is None: self._Logs.error(f"The user [{uidornickname}] is not valid") return None @@ -605,10 +604,10 @@ class Unrealircd6(IProtocol): self._Logs.error(f"The channel [{channel}] is not valid") return None - self.send2socket(f":{userObj.uid} PART {channel}", print_log=print_log) + self.send2socket(f":{u.uid} PART {channel}", print_log=print_log) # Add defender to the channel uids list - self._Irc.Channel.delete_user_from_channel(channel, userObj.uid) + self._Irc.Channel.delete_user_from_channel(channel, u.uid) return None def send_mode_chan(self, channel_name: str, channel_mode: str) -> None: @@ -631,29 +630,29 @@ class Unrealircd6(IProtocol): # COMMON IRC PARSER # ------------------------------------------------------------------------ - def parse_uid(self, serverMsg: list[str]) -> Optional['MUser']: + def parse_uid(self, server_msg: list[str]) -> Optional['MUser']: """Parse UID and return dictionary. >>> ['@s2s-md/geoip=cc=GBtag...', ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'hostname...', '001HB8G04', '0', '+iwxz', 'hostname-vhost', 'hostname-vhost', 'MyZBwg==', ':...'] Args: - serverMsg (list[str]): The UID ircd response + server_msg (list[str]): The UID ircd response """ - scopy = serverMsg.copy() + scopy = server_msg.copy() if scopy[0].startswith('@'): scopy.pop(0) uid = scopy[7] return self._User.get_user(uid) - def parse_quit(self, serverMsg: list[str]) -> tuple[Optional['MUser'], str]: + def parse_quit(self, server_msg: list[str]) -> tuple[Optional['MUser'], str]: """Parse quit and return dictionary. >>> # ['@unrealtag...', ':001JKNY0N', 'QUIT', ':Quit:', '....'] Args: - serverMsg (list[str]): The server message to parse + server_msg (list[str]): The server message to parse Returns: tuple[MUser, str]: The User Who Quit Object and the reason. """ - scopy = serverMsg.copy() + scopy = server_msg.copy() if scopy[0].startswith('@'): scopy.pop(0) @@ -664,19 +663,19 @@ class Unrealircd6(IProtocol): return user_obj, reason - def parse_nick(self, serverMsg: list[str]) -> dict[str, str]: + def parse_nick(self, server_msg: list[str]) -> dict[str, str]: """Parse nick changes and return dictionary. >>> ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844'] Args: - serverMsg (list[str]): The server message to parse + server_msg (list[str]): The server message to parse Returns: dict: The response as dictionary. >>> {"uid": "", "newnickname": "", "timestamp": ""} """ - scopy = serverMsg.copy() + scopy = server_msg.copy() if scopy[0].startswith('@'): scopy.pop(0) @@ -687,18 +686,18 @@ class Unrealircd6(IProtocol): } return response - def parse_privmsg(self, serverMsg: list[str]) -> tuple[Optional['MUser'], Optional['MUser'], Optional['MChannel'], str]: + def parse_privmsg(self, server_msg: list[str]) -> tuple[Optional['MUser'], Optional['MUser'], Optional['MChannel'], str]: """Parse PRIVMSG message. >>> ['@....', ':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message'] >>> [':97KAAAAAF', 'PRIVMSG', '98KAAAAAB', ':sasa'] Args: - serverMsg (list[str]): The server message to parse + server_msg (list[str]): The server message to parse Returns: tuple[MUser(Sender), MUser(Reciever), MChannel, str]: Sender user model, reciever user model, Channel model, messgae . """ - scopy = serverMsg.copy() + scopy = server_msg.copy() if scopy[0].startswith('@'): scopy.pop(0) @@ -716,25 +715,25 @@ class Unrealircd6(IProtocol): # HANDLE EVENTS # ##################### - def on_svs2mode(self, serverMsg: list[str]) -> None: + def on_svs2mode(self, server_msg: list[str]) -> None: """Handle svs2mode coming from a server >>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r'] Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ try: # >> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r'] + scopy = server_msg.copy() + uid_user_to_edit = scopy[2] + umode = scopy[3] - uid_user_to_edit = serverMsg[2] - umode = serverMsg[3] + u = self._Irc.User.get_user(uid_user_to_edit) - userObj = self._Irc.User.get_user(uid_user_to_edit) - - if userObj is None: + if u is None: return None - if self._Irc.User.update_mode(userObj.uid, umode): + if self._Irc.User.update_mode(u.uid, umode): return None return None @@ -743,38 +742,35 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_mode(self, serverMsg: list[str]) -> None: + def on_mode(self, server_msg: list[str]) -> None: """Handle mode coming from a server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ #['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6...', ':001', 'MODE', '#a', '+nt', '1723207536'] #['@unrealircd.org/userhost=adator@localhost;...', ':001LQ0L0C', 'MODE', '#services', '-l'] return None - def on_umode2(self, serverMsg: list[str]) -> None: + def on_umode2(self, server_msg: list[str]) -> None: """Handle umode2 coming from a server >>> [':adator_', 'UMODE2', '-i'] Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ try: # [':adator_', 'UMODE2', '-iwx'] + scopy = server_msg.copy() + u = self._Irc.User.get_user(str(scopy[0]).lstrip(':')) + user_mode = scopy[2] - userObj = self._Irc.User.get_user(str(serverMsg[0]).lstrip(':')) - userMode = serverMsg[2] - - if userObj is None: # If user is not created + if u is None: # If user is not created return None - # save previous user modes - old_umodes = userObj.umodes - # TODO : User object should be able to update user modes - if self._Irc.User.update_mode(userObj.uid, userMode): + if self._Irc.User.update_mode(u.uid, user_mode): return None # self._Logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]") @@ -785,16 +781,16 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_quit(self, serverMsg: list[str]) -> None: + def on_quit(self, server_msg: list[str]) -> None: """Handle quit coming from a server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ try: # ['@unrealircd.org/userhost=...@192.168.1.10;unrealircd.org/userip=...@192.168.1.10;msgid=CssUrV08BzekYuq7BfvPHn;time=2024-11-02T15:03:33.182Z', ':001JKNY0N', 'QUIT', ':Quit:', '....'] - - uid_who_quit = str(serverMsg[1]).lstrip(':') + scopy = server_msg.copy() + uid_who_quit = str(scopy[1]).lstrip(':') self._Irc.Channel.delete_user_from_all_channel(uid_who_quit) self._Irc.User.delete(uid_who_quit) @@ -809,15 +805,15 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_squit(self, serverMsg: list[str]) -> None: + def on_squit(self, server_msg: list[str]) -> None: """Handle squit coming from a server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ # ['@msgid=QOEolbRxdhpVW5c8qLkbAU;time=2024-09-21T17:33:16.547Z', 'SQUIT', 'defender.deb.biz.st', ':Connection', 'closed'] - - server_hostname = serverMsg[2] + scopy = server_msg.copy() + server_hostname = scopy[2] uid_to_delete = None for s_user in self._Irc.User.UID_DB: if s_user.hostname == server_hostname and 'S' in s_user.umodes: @@ -831,19 +827,19 @@ class Unrealircd6(IProtocol): return None - def on_protoctl(self, serverMsg: list[str]) -> None: + def on_protoctl(self, server_msg: list[str]) -> None: """Handle protoctl coming from a server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ # ['PROTOCTL', 'CHANMODES=beI,fkL,lFH,cdimnprstzCDGKMNOPQRSTVZ', 'USERMODES=diopqrstwxzBDGHIRSTWZ', 'BOOTED=1728815798', 'PREFIX=(qaohv)~&@%+', 'SID=001', 'MLOCK', 'TS=1730662755', 'EXTSWHOIS'] - user_modes: str = None - prefix: str = None - host_server_id: str = None + user_modes: Optional[str] = None + prefix: Optional[str] = None + host_server_id: Optional[str] = None + + for msg in server_msg: - for msg in serverMsg: - pattern = None if msg.startswith('PREFIX='): pattern = r'^PREFIX=\((.*)\).*$' find_match = match(pattern, msg) @@ -855,6 +851,7 @@ class Unrealircd6(IProtocol): pattern = r'^USERMODES=(.*)$' find_match = match(pattern, msg) user_modes = find_match.group(1) if find_match else None + elif msg.startswith('SID='): host_server_id = msg.split('=')[1] @@ -867,19 +864,19 @@ class Unrealircd6(IProtocol): return None - def on_nick(self, serverMsg: list[str]) -> None: + def on_nick(self, server_msg: list[str]) -> None: """Handle nick coming from a server new nickname Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ try: # ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844'] # Changement de nickname - uid = str(serverMsg[1]).lstrip(':') - newnickname = serverMsg[3] + uid = str(server_msg[1]).lstrip(':') + newnickname = server_msg[3] self._Irc.User.update_nickname(uid, newnickname) self._Irc.Client.update_nickname(uid, newnickname) self._Irc.Admin.update_nickname(uid, newnickname) @@ -892,11 +889,11 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_sjoin(self, serverMsg: list[str]) -> None: + def on_sjoin(self, server_msg: list[str]) -> None: """Handle sjoin coming from a server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ try: # ['@msgid=5sTwGdj349D82L96p749SY;time=2024-08-15T09:50:23.528Z', ':001', 'SJOIN', '1721564574', '#welcome', ':001JD94QH'] @@ -906,28 +903,28 @@ class Unrealircd6(IProtocol): # '001F16WGR', '001X9YMGQ', '*+001DYPFGP', '@00BAAAAAJ', '001AAGOG9', '001FMFVG8', '001DAEEG7', # '&~G:unknown-users', '"~G:websocket-users', '"~G:known-users', '"~G:webirc-users'] # [':00B', 'SJOIN', '1731872579', '#services', '+', ':00BAAAAAB'] - serverMsg_copy = serverMsg.copy() - if serverMsg_copy[0].startswith('@'): - serverMsg_copy.pop(0) + scopy = server_msg.copy() + if scopy[0].startswith('@'): + scopy.pop(0) - channel = str(serverMsg_copy[3]).lower() - len_cmd = len(serverMsg_copy) + channel = str(scopy[3]).lower() + len_cmd = len(scopy) list_users:list = [] occurence = 0 start_boucle = 0 # Trouver le premier user for i in range(len_cmd): - s: list = findall(fr':', serverMsg_copy[i]) + s: list = findall(fr':', scopy[i]) if s: occurence += 1 if occurence == 2: start_boucle = i # Boucle qui va ajouter l'ensemble des users (UID) - for i in range(start_boucle, len(serverMsg_copy)): - parsed_UID = str(serverMsg_copy[i]) - clean_uid = self._Utils.clean_uid(parsed_UID) + for i in range(start_boucle, len(scopy)): + parsed_uid = str(scopy[i]) + clean_uid = self._Utils.clean_uid(parsed_uid) if not clean_uid is None and len(clean_uid) == 9: list_users.append(clean_uid) @@ -945,16 +942,16 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_part(self, serverMsg: list[str]) -> None: + def on_part(self, server_msg: list[str]) -> None: """Handle part coming from a server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ try: # ['@unrealircd.org', ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris'] - uid = str(serverMsg[1]).lstrip(':') - channel = str(serverMsg[3]).lower() + uid = str(server_msg[1]).lstrip(':') + channel = str(server_msg[3]).lower() self._Irc.Channel.delete_user_from_channel(channel, uid) return None @@ -963,15 +960,15 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_eos(self, serverMsg: list[str]) -> None: + def on_eos(self, server_msg: list[str]) -> None: """Handle EOS coming from a server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ try: # [':001', 'EOS'] - server_msg_copy = serverMsg.copy() + server_msg_copy = server_msg.copy() hsid = str(server_msg_copy[0]).replace(':','') if hsid == self._Config.HSID: if self._Config.DEFENDER_INIT == 1: @@ -1039,15 +1036,15 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_reputation(self, serverMsg: list[str]) -> None: + def on_reputation(self, server_msg: list[str]) -> None: """Handle REPUTATION coming from a server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ try: # :001 REPUTATION 127.0.0.1 118 - server_msg_copy = serverMsg.copy() + server_msg_copy = server_msg.copy() self._Irc.first_connexion_ip = server_msg_copy[2] self._Irc.first_score = 0 @@ -1070,44 +1067,44 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_uid(self, serverMsg: list[str]) -> None: + def on_uid(self, server_msg: list[str]) -> None: """Handle uid message coming from the server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ # ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601', # ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net', # '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...'] try: + scopy = server_msg.copy() + is_webirc = True if 'webirc' in scopy[0] else False + is_websocket = True if 'websocket' in scopy[0] else False - isWebirc = True if 'webirc' in serverMsg[0] else False - isWebsocket = True if 'websocket' in serverMsg[0] else False - - uid = str(serverMsg[8]) - nickname = str(serverMsg[3]) - username = str(serverMsg[6]) - hostname = str(serverMsg[7]) - umodes = str(serverMsg[10]) - vhost = str(serverMsg[11]) - remote_ip = '127.0.0.1' if 'S' in umodes else self._Base.decode_ip(str(serverMsg[13])) + uid = str(scopy[8]) + nickname = str(scopy[3]) + username = str(scopy[6]) + hostname = str(scopy[7]) + umodes = str(scopy[10]) + vhost = str(scopy[11]) + remote_ip = '127.0.0.1' if 'S' in umodes else self._Base.decode_ip(str(scopy[13])) # extract realname - realname = ' '.join(serverMsg[14:]).lstrip(':') + realname = ' '.join(scopy[14:]).lstrip(':') # Extract Geoip information pattern = r'^.*geoip=cc=(\S{2}).*$' - geoip_match = match(pattern, serverMsg[0]) + geoip_match = match(pattern, scopy[0]) geoip = geoip_match.group(1) if geoip_match else None # Extract Fingerprint information pattern = r'^.*certfp=([^;]+).*$' - fp_match = match(pattern, serverMsg[0]) + fp_match = match(pattern, scopy[0]) fingerprint = fp_match.group(1) if fp_match else None # Extract tls_cipher information pattern = r'^.*tls_cipher=([^;]+).*$' - tlsc_match = match(pattern, serverMsg[0]) + tlsc_match = match(pattern, scopy[0]) tls_cipher = tlsc_match.group(1) if tlsc_match else None score_connexion = self._Irc.first_score @@ -1122,8 +1119,8 @@ class Unrealircd6(IProtocol): vhost=vhost, fingerprint=fingerprint, tls_cipher=tls_cipher, - isWebirc=isWebirc, - isWebsocket=isWebsocket, + isWebirc=is_webirc, + isWebsocket=is_websocket, remote_ip=remote_ip, geoip=geoip, score_connexion=score_connexion, @@ -1134,9 +1131,9 @@ class Unrealircd6(IProtocol): # Auto Auth admin via fingerprint dnickname = self._Config.SERVICE_NICKNAME dchanlog = self._Config.SERVICE_CHANLOG - GREEN = self._Config.COLORS.green - RED = self._Config.COLORS.red - NOGC = self._Config.COLORS.nogc + green = self._Config.COLORS.green + red = self._Config.COLORS.red + nogc = self._Config.COLORS.nogc # for module in self._Irc.ModuleUtils.model_get_loaded_modules().copy(): # module.class_instance.cmd(serverMsg) @@ -1144,19 +1141,17 @@ class Unrealircd6(IProtocol): # SASL authentication # ['@s2s-md/..', ':001', 'UID', 'adator__', '0', '1755987444', '...', 'desktop-h1qck20.mshome.net', '001XLTT0U', '0', '+iwxz', '*', 'Clk-EC2256B2.mshome.net', 'rBKAAQ==', ':...'] - uid = serverMsg[8] - nickname = serverMsg[3] sasl_obj = self._Irc.Sasl.get_sasl_obj(uid) if sasl_obj: if sasl_obj.auth_success: self._Irc.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language) self.send_priv_msg(nick_from=dnickname, - msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, sasl_obj.username, dnickname), + msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", green, nogc, nickname, sasl_obj.username, dnickname), channel=dchanlog) self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname)) else: self.send_priv_msg(nick_from=dnickname, - msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", RED, NOGC, nickname, sasl_obj.username), + msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", red, nogc, nickname, sasl_obj.username), channel=dchanlog) self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!")) @@ -1169,7 +1164,7 @@ class Unrealircd6(IProtocol): admin = self._Irc.Admin.get_admin(uid) account = admin.account if admin else '' self.send_priv_msg(nick_from=dnickname, - msg=tr("[ %sFINGERPRINT AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, account, dnickname), + msg=tr("[ %sFINGERPRINT AUTH%s ] - %s (%s) is now connected successfuly to %s", green, nogc, nickname, account, dnickname), channel=dchanlog) self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname)) @@ -1179,22 +1174,22 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_privmsg(self, serverMsg: list[str]) -> None: + def on_privmsg(self, server_msg: list[str]) -> None: """Handle PRIVMSG message coming from the server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ + srv_msg = server_msg.copy() + cmd = server_msg.copy() try: - srv_msg = serverMsg.copy() - cmd = serverMsg.copy() + # Supprimer la premiere valeur si MTAGS activé if cmd[0].startswith('@'): cmd.pop(0) get_uid_or_nickname = str(cmd[0].replace(':','')) user_trigger = self._Irc.User.get_nickname(get_uid_or_nickname) - dnickname = self._Config.SERVICE_NICKNAME pattern = fr'(:\{self._Config.SERVICE_PREFIX})(.*)$' hcmds = search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le . @@ -1267,46 +1262,47 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"General Error: {err} - {srv_msg}" , exc_info=True) - def on_server_ping(self, serverMsg: list[str]) -> None: + def on_server_ping(self, server_msg: list[str]) -> None: """Send a PONG message to the server Args: - serverMsg (list[str]): List of str coming from the server + server_msg (list[str]): List of str coming from the server """ try: - pong = str(serverMsg[1]).replace(':','') + scopy = server_msg.copy() + pong = str(scopy[1]).replace(':','') self.send2socket(f"PONG :{pong}", print_log=False) return None except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_server(self, serverMsg: list[str]) -> None: + def on_server(self, server_msg: list[str]) -> None: """_summary_ Args: - serverMsg (list[str]): _description_ + server_msg (list[str]): _description_ """ try: # ['SERVER', 'irc.local.org', '1', ':U6100-Fhn6OoE-001', 'Local', 'Server'] - sCopy = serverMsg.copy() - self._Irc.Settings.MAIN_SERVER_HOSTNAME = sCopy[1] + scopy = server_msg.copy() + self._Irc.Settings.MAIN_SERVER_HOSTNAME = scopy[1] except Exception as err: self._Logs.error(f'General Error: {err}') - def on_version(self, serverMsg: list[str]) -> None: + def on_version(self, server_msg: list[str]) -> None: """Sending Server Version to the server Args: - serverMsg (list[str]): List of str coming from the server + server_msg (list[str]): List of str coming from the server """ # ['@unrealircd.org/userhost=StatServ@stats.deb.biz.st;draft/bot;bot;msgid=ehfAq3m2yjMjhgWEfi1UCS;time=2024-10-26T13:49:06.299Z', ':00BAAAAAI', 'PRIVMSG', '12ZAAAAAB', ':\x01VERSION\x01'] # Réponse a un CTCP VERSION try: - - nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(serverMsg[1])) + scopy = server_msg.copy() + nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(scopy[1])) dnickname = self._Config.SERVICE_NICKNAME - arg = serverMsg[4].replace(':', '') + arg = scopy[4].replace(':', '') if nickname is None: return None @@ -1318,19 +1314,19 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_time(self, serverMsg: list[str]) -> None: + def on_time(self, server_msg: list[str]) -> None: """Sending TIME answer to a requestor Args: - serverMsg (list[str]): List of str coming from the server + server_msg (list[str]): List of str coming from the server """ # ['@unrealircd.org/userhost=StatServ@stats.deb.biz.st;draft/bot;bot;msgid=ehfAq3m2yjMjhgWEfi1UCS;time=2024-10-26T13:49:06.299Z', ':00BAAAAAI', 'PRIVMSG', '12ZAAAAAB', ':\x01TIME\x01'] # Réponse a un CTCP VERSION try: - - nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(serverMsg[1])) + scopy = server_msg.copy() + nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(scopy[1])) dnickname = self._Config.SERVICE_NICKNAME - arg = serverMsg[4].replace(':', '') + arg = scopy[4].replace(':', '') current_datetime = self._Utils.get_sdatetime() if nickname is None: @@ -1343,26 +1339,26 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_ping(self, serverMsg: list[str]) -> None: + def on_ping(self, server_msg: list[str]) -> None: """Sending a PING answer to requestor Args: - serverMsg (list[str]): List of str coming from the server + server_msg (list[str]): List of str coming from the server """ # ['@unrealircd.org/...', ':001INC60B', 'PRIVMSG', '12ZAAAAAB', ':\x01PING', '762382207\x01'] # Réponse a un CTCP VERSION try: - - nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(serverMsg[1])) + scopy = server_msg.copy() + nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(scopy[1])) dnickname = self._Config.SERVICE_NICKNAME - arg = serverMsg[4].replace(':', '') + arg = scopy[4].replace(':', '') if nickname is None: - self._Logs.debug(serverMsg) + self._Logs.debug(scopy) return None if arg == '\x01PING': - recieved_unixtime = int(serverMsg[5].replace('\x01','')) + recieved_unixtime = int(scopy[5].replace('\x01','')) current_unixtime = self._Utils.get_unixtime() ping_response = current_unixtime - recieved_unixtime @@ -1372,69 +1368,68 @@ class Unrealircd6(IProtocol): nick_to=nickname, msg=f"\x01PING {ping_response} secs\x01" ) - self._Logs.debug(serverMsg) + self._Logs.debug(scopy) return None except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_version_msg(self, serverMsg: list[str]) -> None: + def on_version_msg(self, server_msg: list[str]) -> None: """Handle version coming from the server \n ex. /version Defender Args: - serverMsg (list[str]): Original message from the server + server_msg (list[str]): Original message from the server """ try: # ['@label=0073', ':0014E7P06', 'VERSION', 'PyDefender'] - serverMsg_copy = serverMsg.copy() - if '@' in list(serverMsg_copy[0])[0]: - serverMsg_copy.pop(0) + scopy = server_msg.copy() + if '@' in list(scopy[0])[0]: + scopy.pop(0) - getUser = self._Irc.User.get_user(self._Utils.clean_uid(serverMsg_copy[0])) + u = self._Irc.User.get_user(self._Utils.clean_uid(scopy[0])) - if getUser is None: + if u is None: return None response_351 = f"{self._Config.SERVICE_NAME.capitalize()}-{self._Config.CURRENT_VERSION} {self._Config.SERVICE_HOST} {self.name}" - self.send2socket(f':{self._Config.SERVICE_HOST} 351 {getUser.nickname} {response_351}') + self.send2socket(f':{self._Config.SERVICE_HOST} 351 {u.nickname} {response_351}') modules = self._Irc.ModuleUtils.get_all_available_modules() response_005 = ' | '.join(modules) - self.send2socket(f':{self._Config.SERVICE_HOST} 005 {getUser.nickname} {response_005} are supported by this server') + self.send2socket(f':{self._Config.SERVICE_HOST} 005 {u.nickname} {response_005} are supported by this server') response_005 = ''.join(self._Settings.PROTOCTL_USER_MODES) - self.send2socket(f":{self._Config.SERVICE_HOST} 005 {getUser.nickname} {response_005} are supported by this server") + self.send2socket(f":{self._Config.SERVICE_HOST} 005 {u.nickname} {response_005} are supported by this server") return None except Exception as err: self._Logs.error(f"{__name__} - General Error: {err}") - def on_smod(self, serverMsg: list[str]) -> None: + def on_smod(self, server_msg: list[str]) -> None: """Handle SMOD message coming from the server Args: - serverMsg (list[str]): Original server message + server_msg (list[str]): Original server message """ try: # [':001', 'SMOD', ':L:history_backend_mem:2.0', 'L:channeldb:1.0', 'L:tkldb:1.10', 'L:staff:3.8', 'L:ircops:3.71', ...] - sCopy = serverMsg.copy() - modules = [m.lstrip(':') for m in sCopy[2:]] + scopy = server_msg.copy() + modules = [m.lstrip(':') for m in scopy[2:]] for smod in modules: smod_split = smod.split(':') - sModObj = self._Irc.Loader.Definition.MSModule(type=smod_split[0], name=smod_split[1], version=smod_split[2]) - self._Settings.SMOD_MODULES.append(sModObj) + smodobj = self._Irc.Loader.Definition.MSModule(type=smod_split[0], name=smod_split[1], version=smod_split[2]) + self._Settings.SMOD_MODULES.append(smodobj) except Exception as err: self._Logs.error(f'General Error: {err}') - def on_sasl(self, serverMsg: list[str]) -> Optional['MSasl']: + def on_sasl(self, server_msg: list[str]) -> Optional['MSasl']: """Handle SASL coming from a server Args: - serverMsg (list[str]): Original server message - psasl (Sasl): The SASL process object + server_msg (list[str]): Original server message """ try: # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'H', '172.18.128.1', '172.18.128.1'] @@ -1442,6 +1437,7 @@ class Unrealircd6(IProtocol): # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '0014ZZH1F', 'S', 'EXTERNAL', 'zzzzzzzkey'] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'C', 'sasakey=='] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'D', 'A'] + scopy = server_msg.copy() psasl = self._Irc.Sasl sasl_enabled = False for smod in self._Settings.SMOD_MODULES: @@ -1452,10 +1448,8 @@ class Unrealircd6(IProtocol): if not sasl_enabled: return None - sCopy = serverMsg.copy() - client_uid = sCopy[3] if len(sCopy) >= 6 else None - sasl_obj = None - sasl_message_type = sCopy[4] if len(sCopy) >= 6 else None + client_uid = scopy[3] if len(scopy) >= 6 else None + sasl_message_type = scopy[4] if len(scopy) >= 6 else None psasl.insert_sasl_client(self._Irc.Loader.Definition.MSasl(client_uid=client_uid)) sasl_obj = psasl.get_sasl_obj(client_uid) @@ -1464,22 +1458,22 @@ class Unrealircd6(IProtocol): match sasl_message_type: case 'H': - sasl_obj.remote_ip = str(sCopy[5]) + sasl_obj.remote_ip = str(scopy[5]) sasl_obj.message_type = sasl_message_type return sasl_obj case 'S': sasl_obj.message_type = sasl_message_type - if str(sCopy[5]) in ['PLAIN', 'EXTERNAL']: - sasl_obj.mechanisme = str(sCopy[5]) + if str(scopy[5]) in ['PLAIN', 'EXTERNAL']: + sasl_obj.mechanisme = str(scopy[5]) if sasl_obj.mechanisme == "PLAIN": self.send2socket(f":{self._Config.SERVEUR_LINK} SASL {self._Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +") elif sasl_obj.mechanisme == "EXTERNAL": - if str(sCopy[5]) == "+": + if str(scopy[5]) == "+": return None - sasl_obj.fingerprint = str(sCopy[6]) + sasl_obj.fingerprint = str(scopy[6]) self.send2socket(f":{self._Config.SERVEUR_LINK} SASL {self._Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +") self.on_sasl_authentication_process(sasl_obj) @@ -1487,7 +1481,7 @@ class Unrealircd6(IProtocol): case 'C': if sasl_obj.mechanisme == "PLAIN": - credentials = sCopy[5] + credentials = scopy[5] decoded_credentials = b64decode(credentials).decode() user, username, password = decoded_credentials.split('\0') @@ -1506,7 +1500,7 @@ class Unrealircd6(IProtocol): except Exception as err: self._Logs.error(f'General Error: {err}', exc_info=True) - def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> bool: + def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> None: s = sasl_model if sasl_model: def db_get_admin_info(*, username: Optional[str] = None, password: Optional[str] = None, fingerprint: Optional[str] = None) -> Optional[dict[str, Any]]: @@ -1553,15 +1547,15 @@ class Unrealircd6(IProtocol): self.send2socket(f":{self._Config.SERVEUR_LINK} SASL {self._Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F") self.send2socket(f":{self._Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed") - def on_md(self, serverMsg: list[str]) -> None: + def on_md(self, server_msg: list[str]) -> None: """Handle MD responses [':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...'] Args: - serverMsg (list[str]): The server reply + server_msg (list[str]): The server reply """ try: - scopy = serverMsg.copy() - available_vars = ['creationtime', 'certfp', 'tls_cipher'] + scopy = server_msg.copy() + # available_vars = ['creationtime', 'certfp', 'tls_cipher'] uid = str(scopy[3]) var = str(scopy[4]).lower() @@ -1583,14 +1577,14 @@ class Unrealircd6(IProtocol): except Exception as e: self._Logs.error(f"General Error: {e}") - def on_kick(self, serverMsg: list[str]) -> None: + def on_kick(self, server_msg: list[str]) -> None: """When a user is kicked out from a channel ['@unrealircd.org/issued-by=RPC:admin-for-test@...', ':001', 'KICK', '#jsonrpc', '001ELW13T', ':Kicked', 'from', 'JSONRPC', 'User'] Args: - serverMsg (list[str]): The server message + server_msg (list[str]): The server message """ - scopy = serverMsg.copy() + scopy = server_msg.copy() uid = scopy[4] channel = scopy[3]