Refactoring unreal6 code!

This commit is contained in:
adator
2025-11-11 03:47:02 +01:00
parent 8932e1441a
commit 999072a88a

View File

@@ -7,7 +7,6 @@ from core.classes.interfaces.iprotocol import IProtocol
from core.utils import tr from core.utils import tr
if TYPE_CHECKING: if TYPE_CHECKING:
from core.classes.modules.sasl import Sasl
from core.definition import MClient, MSasl, MUser, MChannel from core.definition import MClient, MSasl, MUser, MChannel
class Unrealircd6(IProtocol): class Unrealircd6(IProtocol):
@@ -40,7 +39,7 @@ class Unrealircd6(IProtocol):
if log: if log:
self._Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}") self._Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}")
return (-1, None) return -1, None
def register_command(self) -> None: def register_command(self) -> None:
m = self._Irc.Loader.Definition.MIrcdCommand m = self._Irc.Loader.Definition.MIrcdCommand
@@ -595,9 +594,9 @@ class Unrealircd6(IProtocol):
print_log (bool, optional): Write logs. Defaults to True. print_log (bool, optional): Write logs. Defaults to True.
""" """
userObj = self._Irc.User.get_user(uidornickname) u = self._Irc.User.get_user(uidornickname)
if userObj is None: if u is None:
self._Logs.error(f"The user [{uidornickname}] is not valid") self._Logs.error(f"The user [{uidornickname}] is not valid")
return None return None
@@ -605,10 +604,10 @@ class Unrealircd6(IProtocol):
self._Logs.error(f"The channel [{channel}] is not valid") self._Logs.error(f"The channel [{channel}] is not valid")
return None return None
self.send2socket(f":{userObj.uid} PART {channel}", print_log=print_log) self.send2socket(f":{u.uid} PART {channel}", print_log=print_log)
# Add defender to the channel uids list # Add defender to the channel uids list
self._Irc.Channel.delete_user_from_channel(channel, userObj.uid) self._Irc.Channel.delete_user_from_channel(channel, u.uid)
return None return None
def send_mode_chan(self, channel_name: str, channel_mode: str) -> None: def send_mode_chan(self, channel_name: str, channel_mode: str) -> None:
@@ -631,29 +630,29 @@ class Unrealircd6(IProtocol):
# COMMON IRC PARSER # COMMON IRC PARSER
# ------------------------------------------------------------------------ # ------------------------------------------------------------------------
def parse_uid(self, serverMsg: list[str]) -> Optional['MUser']: def parse_uid(self, server_msg: list[str]) -> Optional['MUser']:
"""Parse UID and return dictionary. """Parse UID and return dictionary.
>>> ['@s2s-md/geoip=cc=GBtag...', ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'hostname...', '001HB8G04', '0', '+iwxz', 'hostname-vhost', 'hostname-vhost', 'MyZBwg==', ':...'] >>> ['@s2s-md/geoip=cc=GBtag...', ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'hostname...', '001HB8G04', '0', '+iwxz', 'hostname-vhost', 'hostname-vhost', 'MyZBwg==', ':...']
Args: Args:
serverMsg (list[str]): The UID ircd response server_msg (list[str]): The UID ircd response
""" """
scopy = serverMsg.copy() scopy = server_msg.copy()
if scopy[0].startswith('@'): if scopy[0].startswith('@'):
scopy.pop(0) scopy.pop(0)
uid = scopy[7] uid = scopy[7]
return self._User.get_user(uid) return self._User.get_user(uid)
def parse_quit(self, serverMsg: list[str]) -> tuple[Optional['MUser'], str]: def parse_quit(self, server_msg: list[str]) -> tuple[Optional['MUser'], str]:
"""Parse quit and return dictionary. """Parse quit and return dictionary.
>>> # ['@unrealtag...', ':001JKNY0N', 'QUIT', ':Quit:', '....'] >>> # ['@unrealtag...', ':001JKNY0N', 'QUIT', ':Quit:', '....']
Args: Args:
serverMsg (list[str]): The server message to parse server_msg (list[str]): The server message to parse
Returns: Returns:
tuple[MUser, str]: The User Who Quit Object and the reason. tuple[MUser, str]: The User Who Quit Object and the reason.
""" """
scopy = serverMsg.copy() scopy = server_msg.copy()
if scopy[0].startswith('@'): if scopy[0].startswith('@'):
scopy.pop(0) scopy.pop(0)
@@ -664,19 +663,19 @@ class Unrealircd6(IProtocol):
return user_obj, reason return user_obj, reason
def parse_nick(self, serverMsg: list[str]) -> dict[str, str]: def parse_nick(self, server_msg: list[str]) -> dict[str, str]:
"""Parse nick changes and return dictionary. """Parse nick changes and return dictionary.
>>> ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844'] >>> ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844']
Args: Args:
serverMsg (list[str]): The server message to parse server_msg (list[str]): The server message to parse
Returns: Returns:
dict: The response as dictionary. dict: The response as dictionary.
>>> {"uid": "", "newnickname": "", "timestamp": ""} >>> {"uid": "", "newnickname": "", "timestamp": ""}
""" """
scopy = serverMsg.copy() scopy = server_msg.copy()
if scopy[0].startswith('@'): if scopy[0].startswith('@'):
scopy.pop(0) scopy.pop(0)
@@ -687,18 +686,18 @@ class Unrealircd6(IProtocol):
} }
return response return response
def parse_privmsg(self, serverMsg: list[str]) -> tuple[Optional['MUser'], Optional['MUser'], Optional['MChannel'], str]: def parse_privmsg(self, server_msg: list[str]) -> tuple[Optional['MUser'], Optional['MUser'], Optional['MChannel'], str]:
"""Parse PRIVMSG message. """Parse PRIVMSG message.
>>> ['@....', ':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message'] >>> ['@....', ':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message']
>>> [':97KAAAAAF', 'PRIVMSG', '98KAAAAAB', ':sasa'] >>> [':97KAAAAAF', 'PRIVMSG', '98KAAAAAB', ':sasa']
Args: Args:
serverMsg (list[str]): The server message to parse server_msg (list[str]): The server message to parse
Returns: Returns:
tuple[MUser(Sender), MUser(Reciever), MChannel, str]: Sender user model, reciever user model, Channel model, messgae . tuple[MUser(Sender), MUser(Reciever), MChannel, str]: Sender user model, reciever user model, Channel model, messgae .
""" """
scopy = serverMsg.copy() scopy = server_msg.copy()
if scopy[0].startswith('@'): if scopy[0].startswith('@'):
scopy.pop(0) scopy.pop(0)
@@ -716,25 +715,25 @@ class Unrealircd6(IProtocol):
# HANDLE EVENTS # # HANDLE EVENTS #
##################### #####################
def on_svs2mode(self, serverMsg: list[str]) -> None: def on_svs2mode(self, server_msg: list[str]) -> None:
"""Handle svs2mode coming from a server """Handle svs2mode coming from a server
>>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r'] >>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
try: try:
# >> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r'] # >> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
scopy = server_msg.copy()
uid_user_to_edit = scopy[2]
umode = scopy[3]
uid_user_to_edit = serverMsg[2] u = self._Irc.User.get_user(uid_user_to_edit)
umode = serverMsg[3]
userObj = self._Irc.User.get_user(uid_user_to_edit) if u is None:
if userObj is None:
return None return None
if self._Irc.User.update_mode(userObj.uid, umode): if self._Irc.User.update_mode(u.uid, umode):
return None return None
return None return None
@@ -743,38 +742,35 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_mode(self, serverMsg: list[str]) -> None: def on_mode(self, server_msg: list[str]) -> None:
"""Handle mode coming from a server """Handle mode coming from a server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
#['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6...', ':001', 'MODE', '#a', '+nt', '1723207536'] #['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6...', ':001', 'MODE', '#a', '+nt', '1723207536']
#['@unrealircd.org/userhost=adator@localhost;...', ':001LQ0L0C', 'MODE', '#services', '-l'] #['@unrealircd.org/userhost=adator@localhost;...', ':001LQ0L0C', 'MODE', '#services', '-l']
return None return None
def on_umode2(self, serverMsg: list[str]) -> None: def on_umode2(self, server_msg: list[str]) -> None:
"""Handle umode2 coming from a server """Handle umode2 coming from a server
>>> [':adator_', 'UMODE2', '-i'] >>> [':adator_', 'UMODE2', '-i']
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
try: try:
# [':adator_', 'UMODE2', '-iwx'] # [':adator_', 'UMODE2', '-iwx']
scopy = server_msg.copy()
u = self._Irc.User.get_user(str(scopy[0]).lstrip(':'))
user_mode = scopy[2]
userObj = self._Irc.User.get_user(str(serverMsg[0]).lstrip(':')) if u is None: # If user is not created
userMode = serverMsg[2]
if userObj is None: # If user is not created
return None return None
# save previous user modes
old_umodes = userObj.umodes
# TODO : User object should be able to update user modes # TODO : User object should be able to update user modes
if self._Irc.User.update_mode(userObj.uid, userMode): if self._Irc.User.update_mode(u.uid, user_mode):
return None return None
# self._Logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]") # self._Logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]")
@@ -785,16 +781,16 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_quit(self, serverMsg: list[str]) -> None: def on_quit(self, server_msg: list[str]) -> None:
"""Handle quit coming from a server """Handle quit coming from a server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
try: try:
# ['@unrealircd.org/userhost=...@192.168.1.10;unrealircd.org/userip=...@192.168.1.10;msgid=CssUrV08BzekYuq7BfvPHn;time=2024-11-02T15:03:33.182Z', ':001JKNY0N', 'QUIT', ':Quit:', '....'] # ['@unrealircd.org/userhost=...@192.168.1.10;unrealircd.org/userip=...@192.168.1.10;msgid=CssUrV08BzekYuq7BfvPHn;time=2024-11-02T15:03:33.182Z', ':001JKNY0N', 'QUIT', ':Quit:', '....']
scopy = server_msg.copy()
uid_who_quit = str(serverMsg[1]).lstrip(':') uid_who_quit = str(scopy[1]).lstrip(':')
self._Irc.Channel.delete_user_from_all_channel(uid_who_quit) self._Irc.Channel.delete_user_from_all_channel(uid_who_quit)
self._Irc.User.delete(uid_who_quit) self._Irc.User.delete(uid_who_quit)
@@ -809,15 +805,15 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_squit(self, serverMsg: list[str]) -> None: def on_squit(self, server_msg: list[str]) -> None:
"""Handle squit coming from a server """Handle squit coming from a server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
# ['@msgid=QOEolbRxdhpVW5c8qLkbAU;time=2024-09-21T17:33:16.547Z', 'SQUIT', 'defender.deb.biz.st', ':Connection', 'closed'] # ['@msgid=QOEolbRxdhpVW5c8qLkbAU;time=2024-09-21T17:33:16.547Z', 'SQUIT', 'defender.deb.biz.st', ':Connection', 'closed']
scopy = server_msg.copy()
server_hostname = serverMsg[2] server_hostname = scopy[2]
uid_to_delete = None uid_to_delete = None
for s_user in self._Irc.User.UID_DB: for s_user in self._Irc.User.UID_DB:
if s_user.hostname == server_hostname and 'S' in s_user.umodes: if s_user.hostname == server_hostname and 'S' in s_user.umodes:
@@ -831,19 +827,19 @@ class Unrealircd6(IProtocol):
return None return None
def on_protoctl(self, serverMsg: list[str]) -> None: def on_protoctl(self, server_msg: list[str]) -> None:
"""Handle protoctl coming from a server """Handle protoctl coming from a server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
# ['PROTOCTL', 'CHANMODES=beI,fkL,lFH,cdimnprstzCDGKMNOPQRSTVZ', 'USERMODES=diopqrstwxzBDGHIRSTWZ', 'BOOTED=1728815798', 'PREFIX=(qaohv)~&@%+', 'SID=001', 'MLOCK', 'TS=1730662755', 'EXTSWHOIS'] # ['PROTOCTL', 'CHANMODES=beI,fkL,lFH,cdimnprstzCDGKMNOPQRSTVZ', 'USERMODES=diopqrstwxzBDGHIRSTWZ', 'BOOTED=1728815798', 'PREFIX=(qaohv)~&@%+', 'SID=001', 'MLOCK', 'TS=1730662755', 'EXTSWHOIS']
user_modes: str = None user_modes: Optional[str] = None
prefix: str = None prefix: Optional[str] = None
host_server_id: str = None host_server_id: Optional[str] = None
for msg in server_msg:
for msg in serverMsg:
pattern = None
if msg.startswith('PREFIX='): if msg.startswith('PREFIX='):
pattern = r'^PREFIX=\((.*)\).*$' pattern = r'^PREFIX=\((.*)\).*$'
find_match = match(pattern, msg) find_match = match(pattern, msg)
@@ -855,6 +851,7 @@ class Unrealircd6(IProtocol):
pattern = r'^USERMODES=(.*)$' pattern = r'^USERMODES=(.*)$'
find_match = match(pattern, msg) find_match = match(pattern, msg)
user_modes = find_match.group(1) if find_match else None user_modes = find_match.group(1) if find_match else None
elif msg.startswith('SID='): elif msg.startswith('SID='):
host_server_id = msg.split('=')[1] host_server_id = msg.split('=')[1]
@@ -867,19 +864,19 @@ class Unrealircd6(IProtocol):
return None return None
def on_nick(self, serverMsg: list[str]) -> None: def on_nick(self, server_msg: list[str]) -> None:
"""Handle nick coming from a server """Handle nick coming from a server
new nickname new nickname
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
try: try:
# ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844'] # ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844']
# Changement de nickname # Changement de nickname
uid = str(serverMsg[1]).lstrip(':') uid = str(server_msg[1]).lstrip(':')
newnickname = serverMsg[3] newnickname = server_msg[3]
self._Irc.User.update_nickname(uid, newnickname) self._Irc.User.update_nickname(uid, newnickname)
self._Irc.Client.update_nickname(uid, newnickname) self._Irc.Client.update_nickname(uid, newnickname)
self._Irc.Admin.update_nickname(uid, newnickname) self._Irc.Admin.update_nickname(uid, newnickname)
@@ -892,11 +889,11 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_sjoin(self, serverMsg: list[str]) -> None: def on_sjoin(self, server_msg: list[str]) -> None:
"""Handle sjoin coming from a server """Handle sjoin coming from a server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
try: try:
# ['@msgid=5sTwGdj349D82L96p749SY;time=2024-08-15T09:50:23.528Z', ':001', 'SJOIN', '1721564574', '#welcome', ':001JD94QH'] # ['@msgid=5sTwGdj349D82L96p749SY;time=2024-08-15T09:50:23.528Z', ':001', 'SJOIN', '1721564574', '#welcome', ':001JD94QH']
@@ -906,28 +903,28 @@ class Unrealircd6(IProtocol):
# '001F16WGR', '001X9YMGQ', '*+001DYPFGP', '@00BAAAAAJ', '001AAGOG9', '001FMFVG8', '001DAEEG7', # '001F16WGR', '001X9YMGQ', '*+001DYPFGP', '@00BAAAAAJ', '001AAGOG9', '001FMFVG8', '001DAEEG7',
# '&~G:unknown-users', '"~G:websocket-users', '"~G:known-users', '"~G:webirc-users'] # '&~G:unknown-users', '"~G:websocket-users', '"~G:known-users', '"~G:webirc-users']
# [':00B', 'SJOIN', '1731872579', '#services', '+', ':00BAAAAAB'] # [':00B', 'SJOIN', '1731872579', '#services', '+', ':00BAAAAAB']
serverMsg_copy = serverMsg.copy() scopy = server_msg.copy()
if serverMsg_copy[0].startswith('@'): if scopy[0].startswith('@'):
serverMsg_copy.pop(0) scopy.pop(0)
channel = str(serverMsg_copy[3]).lower() channel = str(scopy[3]).lower()
len_cmd = len(serverMsg_copy) len_cmd = len(scopy)
list_users:list = [] list_users:list = []
occurence = 0 occurence = 0
start_boucle = 0 start_boucle = 0
# Trouver le premier user # Trouver le premier user
for i in range(len_cmd): for i in range(len_cmd):
s: list = findall(fr':', serverMsg_copy[i]) s: list = findall(fr':', scopy[i])
if s: if s:
occurence += 1 occurence += 1
if occurence == 2: if occurence == 2:
start_boucle = i start_boucle = i
# Boucle qui va ajouter l'ensemble des users (UID) # Boucle qui va ajouter l'ensemble des users (UID)
for i in range(start_boucle, len(serverMsg_copy)): for i in range(start_boucle, len(scopy)):
parsed_UID = str(serverMsg_copy[i]) parsed_uid = str(scopy[i])
clean_uid = self._Utils.clean_uid(parsed_UID) clean_uid = self._Utils.clean_uid(parsed_uid)
if not clean_uid is None and len(clean_uid) == 9: if not clean_uid is None and len(clean_uid) == 9:
list_users.append(clean_uid) list_users.append(clean_uid)
@@ -945,16 +942,16 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_part(self, serverMsg: list[str]) -> None: def on_part(self, server_msg: list[str]) -> None:
"""Handle part coming from a server """Handle part coming from a server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
try: try:
# ['@unrealircd.org', ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris'] # ['@unrealircd.org', ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris']
uid = str(serverMsg[1]).lstrip(':') uid = str(server_msg[1]).lstrip(':')
channel = str(serverMsg[3]).lower() channel = str(server_msg[3]).lower()
self._Irc.Channel.delete_user_from_channel(channel, uid) self._Irc.Channel.delete_user_from_channel(channel, uid)
return None return None
@@ -963,15 +960,15 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_eos(self, serverMsg: list[str]) -> None: def on_eos(self, server_msg: list[str]) -> None:
"""Handle EOS coming from a server """Handle EOS coming from a server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
try: try:
# [':001', 'EOS'] # [':001', 'EOS']
server_msg_copy = serverMsg.copy() server_msg_copy = server_msg.copy()
hsid = str(server_msg_copy[0]).replace(':','') hsid = str(server_msg_copy[0]).replace(':','')
if hsid == self._Config.HSID: if hsid == self._Config.HSID:
if self._Config.DEFENDER_INIT == 1: if self._Config.DEFENDER_INIT == 1:
@@ -1039,15 +1036,15 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_reputation(self, serverMsg: list[str]) -> None: def on_reputation(self, server_msg: list[str]) -> None:
"""Handle REPUTATION coming from a server """Handle REPUTATION coming from a server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
try: try:
# :001 REPUTATION 127.0.0.1 118 # :001 REPUTATION 127.0.0.1 118
server_msg_copy = serverMsg.copy() server_msg_copy = server_msg.copy()
self._Irc.first_connexion_ip = server_msg_copy[2] self._Irc.first_connexion_ip = server_msg_copy[2]
self._Irc.first_score = 0 self._Irc.first_score = 0
@@ -1070,44 +1067,44 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_uid(self, serverMsg: list[str]) -> None: def on_uid(self, server_msg: list[str]) -> None:
"""Handle uid message coming from the server """Handle uid message coming from the server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
# ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601', # ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601',
# ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net', # ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net',
# '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...'] # '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...']
try: try:
scopy = server_msg.copy()
is_webirc = True if 'webirc' in scopy[0] else False
is_websocket = True if 'websocket' in scopy[0] else False
isWebirc = True if 'webirc' in serverMsg[0] else False uid = str(scopy[8])
isWebsocket = True if 'websocket' in serverMsg[0] else False nickname = str(scopy[3])
username = str(scopy[6])
uid = str(serverMsg[8]) hostname = str(scopy[7])
nickname = str(serverMsg[3]) umodes = str(scopy[10])
username = str(serverMsg[6]) vhost = str(scopy[11])
hostname = str(serverMsg[7]) remote_ip = '127.0.0.1' if 'S' in umodes else self._Base.decode_ip(str(scopy[13]))
umodes = str(serverMsg[10])
vhost = str(serverMsg[11])
remote_ip = '127.0.0.1' if 'S' in umodes else self._Base.decode_ip(str(serverMsg[13]))
# extract realname # extract realname
realname = ' '.join(serverMsg[14:]).lstrip(':') realname = ' '.join(scopy[14:]).lstrip(':')
# Extract Geoip information # Extract Geoip information
pattern = r'^.*geoip=cc=(\S{2}).*$' pattern = r'^.*geoip=cc=(\S{2}).*$'
geoip_match = match(pattern, serverMsg[0]) geoip_match = match(pattern, scopy[0])
geoip = geoip_match.group(1) if geoip_match else None geoip = geoip_match.group(1) if geoip_match else None
# Extract Fingerprint information # Extract Fingerprint information
pattern = r'^.*certfp=([^;]+).*$' pattern = r'^.*certfp=([^;]+).*$'
fp_match = match(pattern, serverMsg[0]) fp_match = match(pattern, scopy[0])
fingerprint = fp_match.group(1) if fp_match else None fingerprint = fp_match.group(1) if fp_match else None
# Extract tls_cipher information # Extract tls_cipher information
pattern = r'^.*tls_cipher=([^;]+).*$' pattern = r'^.*tls_cipher=([^;]+).*$'
tlsc_match = match(pattern, serverMsg[0]) tlsc_match = match(pattern, scopy[0])
tls_cipher = tlsc_match.group(1) if tlsc_match else None tls_cipher = tlsc_match.group(1) if tlsc_match else None
score_connexion = self._Irc.first_score score_connexion = self._Irc.first_score
@@ -1122,8 +1119,8 @@ class Unrealircd6(IProtocol):
vhost=vhost, vhost=vhost,
fingerprint=fingerprint, fingerprint=fingerprint,
tls_cipher=tls_cipher, tls_cipher=tls_cipher,
isWebirc=isWebirc, isWebirc=is_webirc,
isWebsocket=isWebsocket, isWebsocket=is_websocket,
remote_ip=remote_ip, remote_ip=remote_ip,
geoip=geoip, geoip=geoip,
score_connexion=score_connexion, score_connexion=score_connexion,
@@ -1134,9 +1131,9 @@ class Unrealircd6(IProtocol):
# Auto Auth admin via fingerprint # Auto Auth admin via fingerprint
dnickname = self._Config.SERVICE_NICKNAME dnickname = self._Config.SERVICE_NICKNAME
dchanlog = self._Config.SERVICE_CHANLOG dchanlog = self._Config.SERVICE_CHANLOG
GREEN = self._Config.COLORS.green green = self._Config.COLORS.green
RED = self._Config.COLORS.red red = self._Config.COLORS.red
NOGC = self._Config.COLORS.nogc nogc = self._Config.COLORS.nogc
# for module in self._Irc.ModuleUtils.model_get_loaded_modules().copy(): # for module in self._Irc.ModuleUtils.model_get_loaded_modules().copy():
# module.class_instance.cmd(serverMsg) # module.class_instance.cmd(serverMsg)
@@ -1144,19 +1141,17 @@ class Unrealircd6(IProtocol):
# SASL authentication # SASL authentication
# ['@s2s-md/..', ':001', 'UID', 'adator__', '0', '1755987444', '...', 'desktop-h1qck20.mshome.net', '001XLTT0U', '0', '+iwxz', '*', 'Clk-EC2256B2.mshome.net', 'rBKAAQ==', ':...'] # ['@s2s-md/..', ':001', 'UID', 'adator__', '0', '1755987444', '...', 'desktop-h1qck20.mshome.net', '001XLTT0U', '0', '+iwxz', '*', 'Clk-EC2256B2.mshome.net', 'rBKAAQ==', ':...']
uid = serverMsg[8]
nickname = serverMsg[3]
sasl_obj = self._Irc.Sasl.get_sasl_obj(uid) sasl_obj = self._Irc.Sasl.get_sasl_obj(uid)
if sasl_obj: if sasl_obj:
if sasl_obj.auth_success: if sasl_obj.auth_success:
self._Irc.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language) self._Irc.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language)
self.send_priv_msg(nick_from=dnickname, self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, sasl_obj.username, dnickname), msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", green, nogc, nickname, sasl_obj.username, dnickname),
channel=dchanlog) channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname)) self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
else: else:
self.send_priv_msg(nick_from=dnickname, self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", RED, NOGC, nickname, sasl_obj.username), msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", red, nogc, nickname, sasl_obj.username),
channel=dchanlog) channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!")) self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!"))
@@ -1169,7 +1164,7 @@ class Unrealircd6(IProtocol):
admin = self._Irc.Admin.get_admin(uid) admin = self._Irc.Admin.get_admin(uid)
account = admin.account if admin else '' account = admin.account if admin else ''
self.send_priv_msg(nick_from=dnickname, self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sFINGERPRINT AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, account, dnickname), msg=tr("[ %sFINGERPRINT AUTH%s ] - %s (%s) is now connected successfuly to %s", green, nogc, nickname, account, dnickname),
channel=dchanlog) channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname)) self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
@@ -1179,22 +1174,22 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_privmsg(self, serverMsg: list[str]) -> None: def on_privmsg(self, server_msg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server """Handle PRIVMSG message coming from the server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
srv_msg = server_msg.copy()
cmd = server_msg.copy()
try: try:
srv_msg = serverMsg.copy()
cmd = serverMsg.copy()
# Supprimer la premiere valeur si MTAGS activé # Supprimer la premiere valeur si MTAGS activé
if cmd[0].startswith('@'): if cmd[0].startswith('@'):
cmd.pop(0) cmd.pop(0)
get_uid_or_nickname = str(cmd[0].replace(':','')) get_uid_or_nickname = str(cmd[0].replace(':',''))
user_trigger = self._Irc.User.get_nickname(get_uid_or_nickname) user_trigger = self._Irc.User.get_nickname(get_uid_or_nickname)
dnickname = self._Config.SERVICE_NICKNAME
pattern = fr'(:\{self._Config.SERVICE_PREFIX})(.*)$' pattern = fr'(:\{self._Config.SERVICE_PREFIX})(.*)$'
hcmds = search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le . hcmds = search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le .
@@ -1267,46 +1262,47 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"General Error: {err} - {srv_msg}" , exc_info=True) self._Logs.error(f"General Error: {err} - {srv_msg}" , exc_info=True)
def on_server_ping(self, serverMsg: list[str]) -> None: def on_server_ping(self, server_msg: list[str]) -> None:
"""Send a PONG message to the server """Send a PONG message to the server
Args: Args:
serverMsg (list[str]): List of str coming from the server server_msg (list[str]): List of str coming from the server
""" """
try: try:
pong = str(serverMsg[1]).replace(':','') scopy = server_msg.copy()
pong = str(scopy[1]).replace(':','')
self.send2socket(f"PONG :{pong}", print_log=False) self.send2socket(f"PONG :{pong}", print_log=False)
return None return None
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_server(self, serverMsg: list[str]) -> None: def on_server(self, server_msg: list[str]) -> None:
"""_summary_ """_summary_
Args: Args:
serverMsg (list[str]): _description_ server_msg (list[str]): _description_
""" """
try: try:
# ['SERVER', 'irc.local.org', '1', ':U6100-Fhn6OoE-001', 'Local', 'Server'] # ['SERVER', 'irc.local.org', '1', ':U6100-Fhn6OoE-001', 'Local', 'Server']
sCopy = serverMsg.copy() scopy = server_msg.copy()
self._Irc.Settings.MAIN_SERVER_HOSTNAME = sCopy[1] self._Irc.Settings.MAIN_SERVER_HOSTNAME = scopy[1]
except Exception as err: except Exception as err:
self._Logs.error(f'General Error: {err}') self._Logs.error(f'General Error: {err}')
def on_version(self, serverMsg: list[str]) -> None: def on_version(self, server_msg: list[str]) -> None:
"""Sending Server Version to the server """Sending Server Version to the server
Args: Args:
serverMsg (list[str]): List of str coming from the server server_msg (list[str]): List of str coming from the server
""" """
# ['@unrealircd.org/userhost=StatServ@stats.deb.biz.st;draft/bot;bot;msgid=ehfAq3m2yjMjhgWEfi1UCS;time=2024-10-26T13:49:06.299Z', ':00BAAAAAI', 'PRIVMSG', '12ZAAAAAB', ':\x01VERSION\x01'] # ['@unrealircd.org/userhost=StatServ@stats.deb.biz.st;draft/bot;bot;msgid=ehfAq3m2yjMjhgWEfi1UCS;time=2024-10-26T13:49:06.299Z', ':00BAAAAAI', 'PRIVMSG', '12ZAAAAAB', ':\x01VERSION\x01']
# Réponse a un CTCP VERSION # Réponse a un CTCP VERSION
try: try:
scopy = server_msg.copy()
nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(serverMsg[1])) nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(scopy[1]))
dnickname = self._Config.SERVICE_NICKNAME dnickname = self._Config.SERVICE_NICKNAME
arg = serverMsg[4].replace(':', '') arg = scopy[4].replace(':', '')
if nickname is None: if nickname is None:
return None return None
@@ -1318,19 +1314,19 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_time(self, serverMsg: list[str]) -> None: def on_time(self, server_msg: list[str]) -> None:
"""Sending TIME answer to a requestor """Sending TIME answer to a requestor
Args: Args:
serverMsg (list[str]): List of str coming from the server server_msg (list[str]): List of str coming from the server
""" """
# ['@unrealircd.org/userhost=StatServ@stats.deb.biz.st;draft/bot;bot;msgid=ehfAq3m2yjMjhgWEfi1UCS;time=2024-10-26T13:49:06.299Z', ':00BAAAAAI', 'PRIVMSG', '12ZAAAAAB', ':\x01TIME\x01'] # ['@unrealircd.org/userhost=StatServ@stats.deb.biz.st;draft/bot;bot;msgid=ehfAq3m2yjMjhgWEfi1UCS;time=2024-10-26T13:49:06.299Z', ':00BAAAAAI', 'PRIVMSG', '12ZAAAAAB', ':\x01TIME\x01']
# Réponse a un CTCP VERSION # Réponse a un CTCP VERSION
try: try:
scopy = server_msg.copy()
nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(serverMsg[1])) nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(scopy[1]))
dnickname = self._Config.SERVICE_NICKNAME dnickname = self._Config.SERVICE_NICKNAME
arg = serverMsg[4].replace(':', '') arg = scopy[4].replace(':', '')
current_datetime = self._Utils.get_sdatetime() current_datetime = self._Utils.get_sdatetime()
if nickname is None: if nickname is None:
@@ -1343,26 +1339,26 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_ping(self, serverMsg: list[str]) -> None: def on_ping(self, server_msg: list[str]) -> None:
"""Sending a PING answer to requestor """Sending a PING answer to requestor
Args: Args:
serverMsg (list[str]): List of str coming from the server server_msg (list[str]): List of str coming from the server
""" """
# ['@unrealircd.org/...', ':001INC60B', 'PRIVMSG', '12ZAAAAAB', ':\x01PING', '762382207\x01'] # ['@unrealircd.org/...', ':001INC60B', 'PRIVMSG', '12ZAAAAAB', ':\x01PING', '762382207\x01']
# Réponse a un CTCP VERSION # Réponse a un CTCP VERSION
try: try:
scopy = server_msg.copy()
nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(serverMsg[1])) nickname = self._Irc.User.get_nickname(self._Utils.clean_uid(scopy[1]))
dnickname = self._Config.SERVICE_NICKNAME dnickname = self._Config.SERVICE_NICKNAME
arg = serverMsg[4].replace(':', '') arg = scopy[4].replace(':', '')
if nickname is None: if nickname is None:
self._Logs.debug(serverMsg) self._Logs.debug(scopy)
return None return None
if arg == '\x01PING': if arg == '\x01PING':
recieved_unixtime = int(serverMsg[5].replace('\x01','')) recieved_unixtime = int(scopy[5].replace('\x01',''))
current_unixtime = self._Utils.get_unixtime() current_unixtime = self._Utils.get_unixtime()
ping_response = current_unixtime - recieved_unixtime ping_response = current_unixtime - recieved_unixtime
@@ -1372,69 +1368,68 @@ class Unrealircd6(IProtocol):
nick_to=nickname, nick_to=nickname,
msg=f"\x01PING {ping_response} secs\x01" msg=f"\x01PING {ping_response} secs\x01"
) )
self._Logs.debug(serverMsg) self._Logs.debug(scopy)
return None return None
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_version_msg(self, serverMsg: list[str]) -> None: def on_version_msg(self, server_msg: list[str]) -> None:
"""Handle version coming from the server """Handle version coming from the server
\n ex. /version Defender \n ex. /version Defender
Args: Args:
serverMsg (list[str]): Original message from the server server_msg (list[str]): Original message from the server
""" """
try: try:
# ['@label=0073', ':0014E7P06', 'VERSION', 'PyDefender'] # ['@label=0073', ':0014E7P06', 'VERSION', 'PyDefender']
serverMsg_copy = serverMsg.copy() scopy = server_msg.copy()
if '@' in list(serverMsg_copy[0])[0]: if '@' in list(scopy[0])[0]:
serverMsg_copy.pop(0) scopy.pop(0)
getUser = self._Irc.User.get_user(self._Utils.clean_uid(serverMsg_copy[0])) u = self._Irc.User.get_user(self._Utils.clean_uid(scopy[0]))
if getUser is None: if u is None:
return None return None
response_351 = f"{self._Config.SERVICE_NAME.capitalize()}-{self._Config.CURRENT_VERSION} {self._Config.SERVICE_HOST} {self.name}" response_351 = f"{self._Config.SERVICE_NAME.capitalize()}-{self._Config.CURRENT_VERSION} {self._Config.SERVICE_HOST} {self.name}"
self.send2socket(f':{self._Config.SERVICE_HOST} 351 {getUser.nickname} {response_351}') self.send2socket(f':{self._Config.SERVICE_HOST} 351 {u.nickname} {response_351}')
modules = self._Irc.ModuleUtils.get_all_available_modules() modules = self._Irc.ModuleUtils.get_all_available_modules()
response_005 = ' | '.join(modules) response_005 = ' | '.join(modules)
self.send2socket(f':{self._Config.SERVICE_HOST} 005 {getUser.nickname} {response_005} are supported by this server') self.send2socket(f':{self._Config.SERVICE_HOST} 005 {u.nickname} {response_005} are supported by this server')
response_005 = ''.join(self._Settings.PROTOCTL_USER_MODES) response_005 = ''.join(self._Settings.PROTOCTL_USER_MODES)
self.send2socket(f":{self._Config.SERVICE_HOST} 005 {getUser.nickname} {response_005} are supported by this server") self.send2socket(f":{self._Config.SERVICE_HOST} 005 {u.nickname} {response_005} are supported by this server")
return None return None
except Exception as err: except Exception as err:
self._Logs.error(f"{__name__} - General Error: {err}") self._Logs.error(f"{__name__} - General Error: {err}")
def on_smod(self, serverMsg: list[str]) -> None: def on_smod(self, server_msg: list[str]) -> None:
"""Handle SMOD message coming from the server """Handle SMOD message coming from the server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
""" """
try: try:
# [':001', 'SMOD', ':L:history_backend_mem:2.0', 'L:channeldb:1.0', 'L:tkldb:1.10', 'L:staff:3.8', 'L:ircops:3.71', ...] # [':001', 'SMOD', ':L:history_backend_mem:2.0', 'L:channeldb:1.0', 'L:tkldb:1.10', 'L:staff:3.8', 'L:ircops:3.71', ...]
sCopy = serverMsg.copy() scopy = server_msg.copy()
modules = [m.lstrip(':') for m in sCopy[2:]] modules = [m.lstrip(':') for m in scopy[2:]]
for smod in modules: for smod in modules:
smod_split = smod.split(':') smod_split = smod.split(':')
sModObj = self._Irc.Loader.Definition.MSModule(type=smod_split[0], name=smod_split[1], version=smod_split[2]) smodobj = self._Irc.Loader.Definition.MSModule(type=smod_split[0], name=smod_split[1], version=smod_split[2])
self._Settings.SMOD_MODULES.append(sModObj) self._Settings.SMOD_MODULES.append(smodobj)
except Exception as err: except Exception as err:
self._Logs.error(f'General Error: {err}') self._Logs.error(f'General Error: {err}')
def on_sasl(self, serverMsg: list[str]) -> Optional['MSasl']: def on_sasl(self, server_msg: list[str]) -> Optional['MSasl']:
"""Handle SASL coming from a server """Handle SASL coming from a server
Args: Args:
serverMsg (list[str]): Original server message server_msg (list[str]): Original server message
psasl (Sasl): The SASL process object
""" """
try: try:
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'H', '172.18.128.1', '172.18.128.1'] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'H', '172.18.128.1', '172.18.128.1']
@@ -1442,6 +1437,7 @@ class Unrealircd6(IProtocol):
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '0014ZZH1F', 'S', 'EXTERNAL', 'zzzzzzzkey'] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '0014ZZH1F', 'S', 'EXTERNAL', 'zzzzzzzkey']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'C', 'sasakey=='] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'C', 'sasakey==']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'D', 'A'] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'D', 'A']
scopy = server_msg.copy()
psasl = self._Irc.Sasl psasl = self._Irc.Sasl
sasl_enabled = False sasl_enabled = False
for smod in self._Settings.SMOD_MODULES: for smod in self._Settings.SMOD_MODULES:
@@ -1452,10 +1448,8 @@ class Unrealircd6(IProtocol):
if not sasl_enabled: if not sasl_enabled:
return None return None
sCopy = serverMsg.copy() client_uid = scopy[3] if len(scopy) >= 6 else None
client_uid = sCopy[3] if len(sCopy) >= 6 else None sasl_message_type = scopy[4] if len(scopy) >= 6 else None
sasl_obj = None
sasl_message_type = sCopy[4] if len(sCopy) >= 6 else None
psasl.insert_sasl_client(self._Irc.Loader.Definition.MSasl(client_uid=client_uid)) psasl.insert_sasl_client(self._Irc.Loader.Definition.MSasl(client_uid=client_uid))
sasl_obj = psasl.get_sasl_obj(client_uid) sasl_obj = psasl.get_sasl_obj(client_uid)
@@ -1464,22 +1458,22 @@ class Unrealircd6(IProtocol):
match sasl_message_type: match sasl_message_type:
case 'H': case 'H':
sasl_obj.remote_ip = str(sCopy[5]) sasl_obj.remote_ip = str(scopy[5])
sasl_obj.message_type = sasl_message_type sasl_obj.message_type = sasl_message_type
return sasl_obj return sasl_obj
case 'S': case 'S':
sasl_obj.message_type = sasl_message_type sasl_obj.message_type = sasl_message_type
if str(sCopy[5]) in ['PLAIN', 'EXTERNAL']: if str(scopy[5]) in ['PLAIN', 'EXTERNAL']:
sasl_obj.mechanisme = str(sCopy[5]) sasl_obj.mechanisme = str(scopy[5])
if sasl_obj.mechanisme == "PLAIN": if sasl_obj.mechanisme == "PLAIN":
self.send2socket(f":{self._Config.SERVEUR_LINK} SASL {self._Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +") self.send2socket(f":{self._Config.SERVEUR_LINK} SASL {self._Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +")
elif sasl_obj.mechanisme == "EXTERNAL": elif sasl_obj.mechanisme == "EXTERNAL":
if str(sCopy[5]) == "+": if str(scopy[5]) == "+":
return None return None
sasl_obj.fingerprint = str(sCopy[6]) sasl_obj.fingerprint = str(scopy[6])
self.send2socket(f":{self._Config.SERVEUR_LINK} SASL {self._Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +") self.send2socket(f":{self._Config.SERVEUR_LINK} SASL {self._Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +")
self.on_sasl_authentication_process(sasl_obj) self.on_sasl_authentication_process(sasl_obj)
@@ -1487,7 +1481,7 @@ class Unrealircd6(IProtocol):
case 'C': case 'C':
if sasl_obj.mechanisme == "PLAIN": if sasl_obj.mechanisme == "PLAIN":
credentials = sCopy[5] credentials = scopy[5]
decoded_credentials = b64decode(credentials).decode() decoded_credentials = b64decode(credentials).decode()
user, username, password = decoded_credentials.split('\0') user, username, password = decoded_credentials.split('\0')
@@ -1506,7 +1500,7 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._Logs.error(f'General Error: {err}', exc_info=True) self._Logs.error(f'General Error: {err}', exc_info=True)
def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> bool: def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> None:
s = sasl_model s = sasl_model
if sasl_model: if sasl_model:
def db_get_admin_info(*, username: Optional[str] = None, password: Optional[str] = None, fingerprint: Optional[str] = None) -> Optional[dict[str, Any]]: def db_get_admin_info(*, username: Optional[str] = None, password: Optional[str] = None, fingerprint: Optional[str] = None) -> Optional[dict[str, Any]]:
@@ -1553,15 +1547,15 @@ class Unrealircd6(IProtocol):
self.send2socket(f":{self._Config.SERVEUR_LINK} SASL {self._Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F") self.send2socket(f":{self._Config.SERVEUR_LINK} SASL {self._Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
self.send2socket(f":{self._Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed") self.send2socket(f":{self._Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
def on_md(self, serverMsg: list[str]) -> None: def on_md(self, server_msg: list[str]) -> None:
"""Handle MD responses """Handle MD responses
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...'] [':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']
Args: Args:
serverMsg (list[str]): The server reply server_msg (list[str]): The server reply
""" """
try: try:
scopy = serverMsg.copy() scopy = server_msg.copy()
available_vars = ['creationtime', 'certfp', 'tls_cipher'] # available_vars = ['creationtime', 'certfp', 'tls_cipher']
uid = str(scopy[3]) uid = str(scopy[3])
var = str(scopy[4]).lower() var = str(scopy[4]).lower()
@@ -1583,14 +1577,14 @@ class Unrealircd6(IProtocol):
except Exception as e: except Exception as e:
self._Logs.error(f"General Error: {e}") self._Logs.error(f"General Error: {e}")
def on_kick(self, serverMsg: list[str]) -> None: def on_kick(self, server_msg: list[str]) -> None:
"""When a user is kicked out from a channel """When a user is kicked out from a channel
['@unrealircd.org/issued-by=RPC:admin-for-test@...', ':001', 'KICK', '#jsonrpc', '001ELW13T', ':Kicked', 'from', 'JSONRPC', 'User'] ['@unrealircd.org/issued-by=RPC:admin-for-test@...', ':001', 'KICK', '#jsonrpc', '001ELW13T', ':Kicked', 'from', 'JSONRPC', 'User']
Args: Args:
serverMsg (list[str]): The server message server_msg (list[str]): The server message
""" """
scopy = serverMsg.copy() scopy = server_msg.copy()
uid = scopy[4] uid = scopy[4]
channel = scopy[3] channel = scopy[3]