Add command handler system. Starting adapt the modules to fit other protocls.

This commit is contained in:
adator
2025-09-09 22:37:41 +02:00
parent 6b7fd16a44
commit fd9643eddc
18 changed files with 1319 additions and 262 deletions

5
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,5 @@
{
"editor.fontFamily": "Fira Code",
"editor.fontSize": 14,
"editor.cursorStyle": "block"
}

View File

@@ -0,0 +1,48 @@
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from core.definition import MIrcdCommand
from core.loader import Loader
class CommandHandler:
DB_IRCDCOMMS: list['MIrcdCommand'] = []
DB_SUBSCRIBE: list = []
def __init__(self, loader: 'Loader'):
self.__Logs = loader.Logs
def register(self, ircd_command_model: 'MIrcdCommand') -> None:
"""Register a new command in the Handler
Args:
ircd_command_model (MIrcdCommand): The IRCD Command Object
"""
ircd_command = self.get_registred_ircd_command(ircd_command_model.command_name)
if ircd_command is None:
self.__Logs.debug(f'[IRCD COMMAND HANDLER] New IRCD command ({ircd_command_model.command_name}) added to the handler.')
self.DB_IRCDCOMMS.append(ircd_command_model)
return None
else:
self.__Logs.debug(f'[IRCD COMMAND HANDLER] This IRCD command ({ircd_command.command_name}) already exist in the handler.')
def get_registred_ircd_command(self, command_name: str) -> Optional['MIrcdCommand']:
"""Get the registred IRCD command model
Returns:
MIrcdCommand: The IRCD Command object
"""
com = command_name.upper()
for ircd_com in self.DB_IRCDCOMMS:
if com == ircd_com.command_name.upper():
return ircd_com
return None
def get_ircd_commands(self) -> list['MIrcdCommand']:
"""Get the list of IRCD Commands
Returns:
list[MIrcdCommand]: a list of all registred commands
"""
return self.DB_IRCDCOMMS.copy()

File diff suppressed because it is too large Load Diff

View File

@@ -3,21 +3,31 @@ from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from core.classes.sasl import Sasl from core.classes.sasl import Sasl
from core.definition import MClient, MSasl from core.definition import MClient, MSasl, MRegister
from core.classes.protocols.command_handler import CommandHandler
class IProtocol(ABC): class IProtocol(ABC):
DB_REGISTER: list['MRegister'] = []
Handler: Optional['CommandHandler'] = None
@abstractmethod @abstractmethod
def get_ircd_protocol_poisition(self, cmd: list[str]) -> tuple[int, Optional[str]]: def get_ircd_protocol_poisition(self, cmd: list[str], log: bool = False) -> tuple[int, Optional[str]]:
"""Get the position of known commands """Get the position of known commands
Args: Args:
cmd (list[str]): The server response cmd (list[str]): The server response
log (bool): If true it will log in the logger
Returns: Returns:
tuple[int, Optional[str]]: The position and the command. tuple[int, Optional[str]]: The position and the command.
""" """
@abstractmethod
def register_command(self):
"""Register all commands that you need to handle
"""
@abstractmethod @abstractmethod
def send2socket(self, message: str, print_log: bool = True) -> None: def send2socket(self, message: str, print_log: bool = True) -> None:
"""Envoit les commandes à envoyer au serveur. """Envoit les commandes à envoyer au serveur.
@@ -74,6 +84,17 @@ class IProtocol(ABC):
newnickname (str): New nickname of the server newnickname (str): New nickname of the server
""" """
@abstractmethod
def send_set_mode(self, modes: str, *, nickname: Optional[str] = None, channel_name: Optional[str] = None, params: Optional[str] = None) -> None:
"""Set a mode to channel or to a nickname or for a user in a channel
Args:
modes (str): The selected mode
nickname (Optional[str]): The nickname
channel_name (Optional[str]): The channel name
params (Optional[str]): Parameters like password.
"""
@abstractmethod @abstractmethod
def send_squit(self, server_id: str, server_link: str, reason: str) -> None: def send_squit(self, server_id: str, server_link: str, reason: str) -> None:
"""_summary_ """_summary_
@@ -256,15 +277,72 @@ class IProtocol(ABC):
@abstractmethod @abstractmethod
def send_raw(self, raw_command: str) -> None: def send_raw(self, raw_command: str) -> None:
"""_summary_ """Send raw message to the server
Args: Args:
raw_command (str): _description_ raw_command (str): The raw command you want to send.
""" """
##################### # ------------------------------------------------------------------------
# HANDLE EVENTS # # COMMON IRC PARSER
##################### # ------------------------------------------------------------------------
@abstractmethod
def parse_uid(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse UID and return dictionary.
Args:
serverMsg (list[str]): The UID IRCD message
Returns:
dict[str, str]: The response as dictionary.
"""
@abstractmethod
def parse_quit(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse quit and return dictionary.
>>> [':97KAAAAAB', 'QUIT', ':Quit:', 'this', 'is', 'my', 'reason', 'to', 'quit']
Args:
serverMsg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
"""
@abstractmethod
def parse_nick(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse nick changes and return dictionary.
>>> [':97KAAAAAC', 'NICK', 'testinspir', '1757360740']
Args:
serverMsg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
"""
@abstractmethod
def parse_privmsg(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse PRIVMSG message.
>>> [':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message']
Args:
serverMsg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
```python
response = {
"uid": '97KAAAAAE',
"channel": '#welcome',
"message": 'This is my public message'
}
```
"""
# ------------------------------------------------------------------------
# EVENT HANDLER
# ------------------------------------------------------------------------
@abstractmethod @abstractmethod
def on_svs2mode(self, serverMsg: list[str]) -> None: def on_svs2mode(self, serverMsg: list[str]) -> None:
@@ -438,6 +516,17 @@ class IProtocol(ABC):
psasl (Sasl): The SASL process object psasl (Sasl): The SASL process object
""" """
@abstractmethod
def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> bool:
"""Finalize sasl authentication
Args:
sasl_model (MSasl): The sasl dataclass model
Returns:
bool: True if success
"""
@abstractmethod @abstractmethod
def on_md(self, serverMsg: list[str]) -> None: def on_md(self, serverMsg: list[str]) -> None:
"""Handle MD responses """Handle MD responses

View File

@@ -1,9 +1,10 @@
from base64 import b64decode from base64 import b64decode
from re import match, findall, search from re import match, findall, search
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Any, Optional
from ssl import SSLEOFError, SSLError from ssl import SSLEOFError, SSLError
from core.classes.protocols.command_handler import CommandHandler
from core.classes.protocols.interface import IProtocol from core.classes.protocols.interface import IProtocol
from core.utils import tr from core.utils import tr
@@ -32,13 +33,16 @@ class Unrealircd6(IProtocol):
'PROTOCTL', 'SERVER', 'SMOD', 'TKL', 'NETINFO', 'PROTOCTL', 'SERVER', 'SMOD', 'TKL', 'NETINFO',
'006', '007', '018'} '006', '007', '018'}
self.__Logs.info(f"** Loading protocol [{__name__}]") self.Handler = CommandHandler(ircInstance.Loader)
def get_ircd_protocol_poisition(self, cmd: list[str]) -> tuple[int, Optional[str]]: self.__Logs.info(f"[PROTOCOL] Protocol [{__name__}] loaded!")
def get_ircd_protocol_poisition(self, cmd: list[str], log: bool = False) -> tuple[int, Optional[str]]:
"""Get the position of known commands """Get the position of known commands
Args: Args:
cmd (list[str]): The server response cmd (list[str]): The server response
log (bool): If true it will log in the logger
Returns: Returns:
tuple[int, Optional[str]]: The position and the command. tuple[int, Optional[str]]: The position and the command.
@@ -47,10 +51,34 @@ class Unrealircd6(IProtocol):
if token.upper() in self.known_protocol: if token.upper() in self.known_protocol:
return index, token.upper() return index, token.upper()
self.__Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}") if log:
self.__Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}")
return (-1, None) return (-1, None)
def register_command(self) -> None:
m = self.__Irc.Loader.Definition.MIrcdCommand
self.Handler.register(m(command_name="PING", func=self.on_server_ping))
self.Handler.register(m(command_name="UID", func=self.on_uid))
self.Handler.register(m(command_name="QUIT", func=self.on_quit))
self.Handler.register(m(command_name="SERVER", func=self.on_server))
self.Handler.register(m(command_name="SJOIN", func=self.on_sjoin))
self.Handler.register(m(command_name="EOS", func=self.on_eos))
self.Handler.register(m(command_name="PROTOCTL", func=self.on_protoctl))
self.Handler.register(m(command_name="SVS2MODE", func=self.on_svs2mode))
self.Handler.register(m(command_name="SQUIT", func=self.on_squit))
self.Handler.register(m(command_name="PART", func=self.on_part))
self.Handler.register(m(command_name="VERSION", func=self.on_version_msg))
self.Handler.register(m(command_name="UMODE2", func=self.on_umode2))
self.Handler.register(m(command_name="NICK", func=self.on_nick))
self.Handler.register(m(command_name="REPUTATION", func=self.on_reputation))
self.Handler.register(m(command_name="SMOD", func=self.on_smod))
self.Handler.register(m(command_name="SASL", func=self.on_sasl))
self.Handler.register(m(command_name="MD", func=self.on_md))
self.Handler.register(m(command_name="PRIVMSG", func=self.on_privmsg))
return None
def parse_server_msg(self, server_msg: list[str]) -> Optional[str]: def parse_server_msg(self, server_msg: list[str]) -> Optional[str]:
"""Parse the server message and return the command """Parse the server message and return the command
@@ -229,6 +257,42 @@ class Unrealircd6(IProtocol):
self.__Irc.User.update_nickname(userObj.uid, newnickname) self.__Irc.User.update_nickname(userObj.uid, newnickname)
return None return None
def send_set_mode(self, modes: str, *, nickname: Optional[str] = None, channel_name: Optional[str] = None, params: Optional[str] = None) -> None:
"""Set a mode to channel or to a nickname or for a user in a channel
Args:
modes (str): The selected mode
nickname (Optional[str]): The nickname
channel_name (Optional[str]): The channel name
params (Optional[str]): Parameters like password.
"""
service_id = self.__Config.SERVICE_ID
if modes[0] not in ['+', '-']:
self.__Logs.error(f"[MODE ERROR] The mode you have provided is missing the sign: {modes}")
return None
if nickname and channel_name:
# :98KAAAAAB MODE #services +o defenderdev
if not self.__Irc.Channel.is_valid_channel(channel_name):
self.__Logs.error(f"[MODE ERROR] The channel is not valid: {channel_name}")
return None
self.send2socket(f":{service_id} MODE {channel_name} {modes} {nickname}")
return None
if nickname and channel_name is None:
self.send2socket(f":{service_id} MODE {nickname} {modes}")
return None
if nickname is None and channel_name:
if not self.__Irc.Channel.is_valid_channel(channel_name):
self.__Logs.error(f"[MODE ERROR] The channel is not valid: {channel_name}")
return None
self.send2socket(f":{service_id} MODE {channel_name} {modes} {params}")
return None
return None
def send_squit(self, server_id: str, server_link: str, reason: str) -> None: def send_squit(self, server_id: str, server_link: str, reason: str) -> None:
if not reason: if not reason:
@@ -431,7 +495,7 @@ class Unrealircd6(IProtocol):
reason (str): The reason for the quit reason (str): The reason for the quit
""" """
user_obj = self.__Irc.User.get_user(uidornickname=uid) user_obj = self.__Irc.User.get_user(uidornickname=uid)
reputationObj = self.__Irc.Reputation.get_Reputation(uidornickname=uid) reputationObj = self.__Irc.Reputation.get_reputation(uidornickname=uid)
if not user_obj is None: if not user_obj is None:
self.send2socket(f":{user_obj.uid} QUIT :{reason}", print_log=print_log) self.send2socket(f":{user_obj.uid} QUIT :{reason}", print_log=print_log)
@@ -565,6 +629,103 @@ class Unrealircd6(IProtocol):
return None return None
# ------------------------------------------------------------------------
# COMMON IRC PARSER
# ------------------------------------------------------------------------
def parse_uid(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse UID and return dictionary.
>>> ['@s2s-md/geoip=cc=GBtag...', ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'hostname...', '001HB8G04', '0', '+iwxz', 'hostname-vhost', 'hostname-vhost', 'MyZBwg==', ':...']
Args:
serverMsg (list[str]): The UID ircd response
"""
umodes = str(serverMsg[10])
remote_ip = self.__Base.decode_ip(str(serverMsg[13])) if 'S' not in umodes else '127.0.0.1'
# Extract Geoip information
pattern = r'^.*geoip=cc=(\S{2}).*$'
geoip_match = match(pattern, serverMsg[0])
geoip = geoip_match.group(1) if geoip_match else None
response = {
'uid': str(serverMsg[8]),
'nickname': str(serverMsg[3]),
'username': str(serverMsg[6]),
'hostname': str(serverMsg[7]),
'umodes': umodes,
'vhost': str(serverMsg[11]),
'ip': remote_ip,
'realname': ' '.join(serverMsg[12:]).lstrip(':'),
'geoip': geoip,
'reputation_score': 0,
'iswebirc': True if 'webirc' in serverMsg[0] else False,
'iswebsocket': True if 'websocket' in serverMsg[0] else False
}
return response
def parse_quit(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse quit and return dictionary.
>>> # ['@unrealtag...', ':001JKNY0N', 'QUIT', ':Quit:', '....']
Args:
serverMsg (list[str]): The server message to parse
Returns:
dict[str, str]: The dictionary.
"""
scopy = serverMsg.copy()
if scopy[0].startswith('@'):
scopy.pop(0)
response = {
"uid": scopy[0].replace(':', ''),
"reason": " ".join(scopy[3:])
}
return response
def parse_nick(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse nick changes and return dictionary.
>>> ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844']
Args:
serverMsg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
"""
scopy = serverMsg.copy()
if scopy[0].startswith('@'):
scopy.pop(0)
response = {
"uid": scopy[0].replace(':', ''),
"newnickname": scopy[2],
"timestamp": scopy[3]
}
return response
def parse_privmsg(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse PRIVMSG message.
>>> ['@....', ':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message']
>>> [':97KAAAAAF', 'PRIVMSG', '98KAAAAAB', ':sasa']
Args:
serverMsg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
"""
scopy = serverMsg.copy()
if scopy[0].startswith('@'):
scopy.pop(0)
response = {
"uid_sender": scopy[0].replace(':', ''),
"uid_reciever": self.__Irc.User.get_uid(scopy[2]),
"channel": scopy[2] if self.__Irc.Channel.is_valid_channel(scopy[2]) else None,
"message": " ".join(scopy[3:])
}
return response
##################### #####################
# HANDLE EVENTS # # HANDLE EVENTS #
##################### #####################
@@ -736,6 +897,7 @@ class Unrealircd6(IProtocol):
self.__Irc.User.update_nickname(uid, newnickname) self.__Irc.User.update_nickname(uid, newnickname)
self.__Irc.Client.update_nickname(uid, newnickname) self.__Irc.Client.update_nickname(uid, newnickname)
self.__Irc.Admin.update_nickname(uid, newnickname) self.__Irc.Admin.update_nickname(uid, newnickname)
self.__Irc.Reputation.update(uid, newnickname)
return None return None
@@ -856,6 +1018,8 @@ class Unrealircd6(IProtocol):
self.__Logs.info(f"# VERSION : {version} ") self.__Logs.info(f"# VERSION : {version} ")
self.__Logs.info(f"################################################") self.__Logs.info(f"################################################")
self.send_sjoin(self.__Config.SERVICE_CHANLOG)
if self.__Base.check_for_new_version(False): if self.__Base.check_for_new_version(False):
self.send_priv_msg( self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME, nick_from=self.__Config.SERVICE_NICKNAME,
@@ -875,6 +1039,10 @@ class Unrealircd6(IProtocol):
for module in self.__Irc.ModuleUtils.model_get_loaded_modules().copy(): for module in self.__Irc.ModuleUtils.model_get_loaded_modules().copy():
module.class_instance.cmd(server_msg_copy) module.class_instance.cmd(server_msg_copy)
# Join saved channels & load existing modules
self.__Irc.join_saved_channels()
self.__Irc.ModuleUtils.db_load_all_existing_modules(self.__Irc)
return None return None
except IndexError as ie: except IndexError as ie:
self.__Logs.error(f"{__name__} - Key Error: {ie}") self.__Logs.error(f"{__name__} - Key Error: {ie}")
@@ -988,14 +1156,42 @@ class Unrealircd6(IProtocol):
dnickname = self.__Config.SERVICE_NICKNAME dnickname = self.__Config.SERVICE_NICKNAME
dchanlog = self.__Config.SERVICE_CHANLOG dchanlog = self.__Config.SERVICE_CHANLOG
GREEN = self.__Config.COLORS.green GREEN = self.__Config.COLORS.green
RED = self.__Config.COLORS.red
NOGC = self.__Config.COLORS.nogc NOGC = self.__Config.COLORS.nogc
for module in self.__Irc.ModuleUtils.model_get_loaded_modules().copy():
module.class_instance.cmd(serverMsg)
# SASL authentication
# ['@s2s-md/..', ':001', 'UID', 'adator__', '0', '1755987444', '...', 'desktop-h1qck20.mshome.net', '001XLTT0U', '0', '+iwxz', '*', 'Clk-EC2256B2.mshome.net', 'rBKAAQ==', ':...']
uid = serverMsg[8]
nickname = serverMsg[3]
sasl_obj = self.__Irc.Sasl.get_sasl_obj(uid)
if sasl_obj:
if sasl_obj.auth_success:
self.__Irc.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language)
self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, sasl_obj.username, dnickname),
channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
else:
self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", RED, NOGC, nickname, sasl_obj.username),
channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!"))
# Delete sasl object!
self.__Irc.Sasl.delete_sasl_client(uid)
return None
# If no sasl authentication then auto connect via fingerprint
if self.__Irc.Admin.db_auth_admin_via_fingerprint(fingerprint, uid): if self.__Irc.Admin.db_auth_admin_via_fingerprint(fingerprint, uid):
admin = self.__Irc.Admin.get_admin(uid) admin = self.__Irc.Admin.get_admin(uid)
account = admin.account if admin else '' account = admin.account if admin else ''
self.send_priv_msg(nick_from=dnickname, self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTO AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, account, dnickname), msg=tr("[ %sSASL AUTO AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, account, dnickname),
channel=dchanlog) channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname)) self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
return None return None
@@ -1254,7 +1450,7 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self.__Logs.error(f'General Error: {err}') self.__Logs.error(f'General Error: {err}')
def on_sasl(self, serverMsg: list[str], psasl: 'Sasl') -> Optional['MSasl']: def on_sasl(self, serverMsg: list[str]) -> Optional['MSasl']:
"""Handle SASL coming from a server """Handle SASL coming from a server
Args: Args:
@@ -1267,7 +1463,7 @@ class Unrealircd6(IProtocol):
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '0014ZZH1F', 'S', 'EXTERNAL', 'zzzzzzzkey'] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '0014ZZH1F', 'S', 'EXTERNAL', 'zzzzzzzkey']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'C', 'sasakey=='] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'C', 'sasakey==']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'D', 'A'] # [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'D', 'A']
psasl = self.__Irc.Sasl
sasl_enabled = False sasl_enabled = False
for smod in self.__Settings.SMOD_MODULES: for smod in self.__Settings.SMOD_MODULES:
if smod.name == 'sasl': if smod.name == 'sasl':
@@ -1307,6 +1503,7 @@ class Unrealircd6(IProtocol):
sasl_obj.fingerprint = str(sCopy[6]) sasl_obj.fingerprint = str(sCopy[6])
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +") self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +")
self.on_sasl_authentication_process(sasl_obj)
return sasl_obj return sasl_obj
case 'C': case 'C':
@@ -1319,14 +1516,64 @@ class Unrealircd6(IProtocol):
sasl_obj.username = username sasl_obj.username = username
sasl_obj.password = password sasl_obj.password = password
self.on_sasl_authentication_process(sasl_obj)
return sasl_obj return sasl_obj
elif sasl_obj.mechanisme == "EXTERNAL": elif sasl_obj.mechanisme == "EXTERNAL":
sasl_obj.message_type = sasl_message_type sasl_obj.message_type = sasl_message_type
self.on_sasl_authentication_process(sasl_obj)
return sasl_obj return sasl_obj
except Exception as err: except Exception as err:
self.__Logs.error(f'General Error: {err}', exc_info=True) self.__Logs.error(f'General Error: {err}', exc_info=True)
def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> bool:
s = sasl_model
if sasl_model:
def db_get_admin_info(*, username: Optional[str] = None, password: Optional[str] = None, fingerprint: Optional[str] = None) -> Optional[dict[str, Any]]:
if fingerprint:
mes_donnees = {'fingerprint': fingerprint}
query = f"SELECT user, level, language FROM {self.__Config.TABLE_ADMIN} WHERE fingerprint = :fingerprint"
else:
mes_donnees = {'user': username, 'password': self.__Utils.hash_password(password)}
query = f"SELECT user, level, language FROM {self.__Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
result = self.__Base.db_execute_query(query, mes_donnees)
user_from_db = result.fetchone()
if user_from_db:
return {'user': user_from_db[0], 'level': user_from_db[1], 'language': user_from_db[2]}
else:
return None
if s.message_type == 'C' and s.mechanisme == 'PLAIN':
# Connection via PLAIN
admin_info = db_get_admin_info(username=s.username, password=s.password)
if admin_info is not None:
s.auth_success = True
s.level = admin_info.get('level', 0)
s.language = admin_info.get('language', 'EN')
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
self.send2socket(f":{self.__Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
else:
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
self.send2socket(f":{self.__Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
elif s.message_type == 'S' and s.mechanisme == 'EXTERNAL':
# Connection using fingerprints
admin_info = db_get_admin_info(fingerprint=s.fingerprint)
if admin_info is not None:
s.auth_success = True
s.level = admin_info.get('level', 0)
s.username = admin_info.get('user', None)
s.language = admin_info.get('language', 'EN')
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
self.send2socket(f":{self.__Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
else:
# "904 <nick> :SASL authentication failed"
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
self.send2socket(f":{self.__Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
def on_md(self, serverMsg: list[str]) -> None: def on_md(self, serverMsg: list[str]) -> None:
"""Handle MD responses """Handle MD responses
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...'] [':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']

View File

@@ -33,14 +33,8 @@ def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -
uplink.Client.CLIENT_DB.clear() # Clear Client object uplink.Client.CLIENT_DB.clear() # Clear Client object
uplink.Base.garbage_collector_thread() uplink.Base.garbage_collector_thread()
# Reload configuration
uplink.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).get_config_model()
uplink.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
uplink.Protocol = uplink.Loader.PFactory.get()
uplink.Logs.debug(f'[{uplink.Config.SERVICE_NICKNAME} RESTART]: Reloading configuration!') uplink.Logs.debug(f'[{uplink.Config.SERVICE_NICKNAME} RESTART]: Reloading configuration!')
uplink.Protocol.send_squit(server_id=uplink.Config.SERVEUR_ID, server_link=uplink.Config.SERVEUR_LINK, reason="Defender Power off") uplink.Protocol.send_squit(server_id=uplink.Config.SERVEUR_ID, server_link=uplink.Config.SERVEUR_LINK, reason="Defender Power off")
uplink.Logs.debug('Restarting Defender ...') uplink.Logs.debug('Restarting Defender ...')
uplink.IrcSocket.shutdown(socket.SHUT_RDWR) uplink.IrcSocket.shutdown(socket.SHUT_RDWR)
uplink.IrcSocket.close() uplink.IrcSocket.close()
@@ -49,11 +43,19 @@ def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -
time.sleep(0.5) time.sleep(0.5)
uplink.Logs.warning('-- Waiting for socket to close ...') uplink.Logs.warning('-- Waiting for socket to close ...')
# Reload configuration
uplink.Loader.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).get_config_model()
uplink.Loader.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
for mod in REHASH_MODULES:
importlib.reload(sys.modules[mod])
uplink.Protocol = uplink.Loader.PFactory.get()
uplink.Protocol.register_command()
uplink.init_service_user() uplink.init_service_user()
uplink.Utils.create_socket(uplink) uplink.Utils.create_socket(uplink)
uplink.Protocol.send_link() uplink.Protocol.send_link()
uplink.join_saved_channels()
uplink.ModuleUtils.db_load_all_existing_modules(uplink)
uplink.Config.DEFENDER_RESTART = 0 uplink.Config.DEFENDER_RESTART = 0
def rehash_service(uplink: 'Irc', nickname: str) -> None: def rehash_service(uplink: 'Irc', nickname: str) -> None:
@@ -70,13 +72,13 @@ def rehash_service(uplink: 'Irc', nickname: str) -> None:
channel=uplink.Config.SERVICE_CHANLOG channel=uplink.Config.SERVICE_CHANLOG
) )
uplink.Utils = sys.modules['core.utils'] uplink.Utils = sys.modules['core.utils']
uplink.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).get_config_model() uplink.Loader.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).get_config_model()
uplink.Config.HSID = config_model_bakcup.HSID uplink.Loader.Config.HSID = config_model_bakcup.HSID
uplink.Config.DEFENDER_INIT = config_model_bakcup.DEFENDER_INIT uplink.Loader.Config.DEFENDER_INIT = config_model_bakcup.DEFENDER_INIT
uplink.Config.DEFENDER_RESTART = config_model_bakcup.DEFENDER_RESTART uplink.Loader.Config.DEFENDER_RESTART = config_model_bakcup.DEFENDER_RESTART
uplink.Config.SSL_VERSION = config_model_bakcup.SSL_VERSION uplink.Loader.Config.SSL_VERSION = config_model_bakcup.SSL_VERSION
uplink.Config.CURRENT_VERSION = config_model_bakcup.CURRENT_VERSION uplink.Loader.Config.CURRENT_VERSION = config_model_bakcup.CURRENT_VERSION
uplink.Config.LATEST_VERSION = config_model_bakcup.LATEST_VERSION uplink.Loader.Config.LATEST_VERSION = config_model_bakcup.LATEST_VERSION
conf_bkp_dict: dict = config_model_bakcup.to_dict() conf_bkp_dict: dict = config_model_bakcup.to_dict()
config_dict: dict = uplink.Config.to_dict() config_dict: dict = uplink.Config.to_dict()
@@ -105,8 +107,9 @@ def rehash_service(uplink: 'Irc', nickname: str) -> None:
uplink.Commands = uplink.Loader.CommandModule.Command(uplink.Loader) uplink.Commands = uplink.Loader.CommandModule.Command(uplink.Loader)
uplink.Commands.DB_COMMANDS = uplink.Settings.get_cache('db_commands') uplink.Commands.DB_COMMANDS = uplink.Settings.get_cache('db_commands')
uplink.Base = uplink.Loader.BaseModule.Base(uplink.Loader) uplink.Loader.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
uplink.Protocol = uplink.Loader.PFactory.get() uplink.Protocol = uplink.Loader.PFactory.get()
uplink.Protocol.register_command()
# Reload Service modules # Reload Service modules
for module in uplink.ModuleUtils.model_get_loaded_modules().copy(): for module in uplink.ModuleUtils.model_get_loaded_modules().copy():

View File

@@ -53,7 +53,7 @@ class Reputation:
bool: True if updated bool: True if updated
""" """
reputation_obj = self.get_Reputation(uid) reputation_obj = self.get_reputation(uid)
if reputation_obj is None: if reputation_obj is None:
return False return False
@@ -89,7 +89,7 @@ class Reputation:
return result return result
def get_Reputation(self, uidornickname: str) -> Optional[MReputation]: def get_reputation(self, uidornickname: str) -> Optional[MReputation]:
"""Get The User Object model """Get The User Object model
Args: Args:
@@ -116,7 +116,7 @@ class Reputation:
str|None: Return the UID str|None: Return the UID
""" """
reputation_obj = self.get_Reputation(uidornickname) reputation_obj = self.get_reputation(uidornickname)
if reputation_obj is None: if reputation_obj is None:
return None return None
@@ -132,7 +132,7 @@ class Reputation:
Returns: Returns:
str|None: the nickname str|None: the nickname
""" """
reputation_obj = self.get_Reputation(uidornickname) reputation_obj = self.get_reputation(uidornickname)
if reputation_obj is None: if reputation_obj is None:
return None return None
@@ -149,7 +149,7 @@ class Reputation:
bool: True if exist bool: True if exist
""" """
reputation_obj = self.get_Reputation(uidornickname) reputation_obj = self.get_reputation(uidornickname)
if isinstance(reputation_obj, MReputation): if isinstance(reputation_obj, MReputation):
return True return True

View File

@@ -8,7 +8,6 @@ from core.definition import MSModule, MAdmin
if TYPE_CHECKING: if TYPE_CHECKING:
from core.classes.user import User from core.classes.user import User
from core.classes.admin import Admin
class Settings: class Settings:
"""This Class will never be reloaded. """This Class will never be reloaded.
@@ -25,7 +24,11 @@ class Settings:
CONSOLE: bool = False CONSOLE: bool = False
MAIN_SERVER_HOSTNAME: str = None MAIN_SERVER_HOSTNAME: str = None
MAIN_SERVER_ID: str = None
PROTOCTL_PREFIX_MODES_SIGNES : dict[str, str] = {}
PROTOCTL_PREFIX_SIGNES_MODES : dict[str, str] = {}
PROTOCTL_USER_MODES: list[str] = [] PROTOCTL_USER_MODES: list[str] = []
PROTOCTL_CHANNEL_MODES: list[str] = []
PROTOCTL_PREFIX: list[str] = [] PROTOCTL_PREFIX: list[str] = []
SMOD_MODULES: list[MSModule] = [] SMOD_MODULES: list[MSModule] = []
@@ -42,7 +45,7 @@ class Settings:
__INSTANCE_OF_USER_UTILS: Optional['User'] = None __INSTANCE_OF_USER_UTILS: Optional['User'] = None
"""Instance of the User Utils class""" """Instance of the User Utils class"""
__CURRENT_ADMIN: Optional['MAdmin'] = None __CURRENT_ADMIN: Optional['MAdmin'] = None
"""The Current Admin Object Model""" """The Current Admin Object Model"""
__LOGGER: Optional[Logger] = None __LOGGER: Optional[Logger] = None
@@ -79,6 +82,7 @@ class Settings:
@property @property
def global_translation(self) -> dict[str, list[list[str]]]: def global_translation(self) -> dict[str, list[list[str]]]:
"""Get/set global translation variable"""
return self.__TRANSLATION return self.__TRANSLATION
@global_translation.setter @global_translation.setter
@@ -87,6 +91,7 @@ class Settings:
@property @property
def global_lang(self) -> str: def global_lang(self) -> str:
"""Global default language."""
return self.__LANG return self.__LANG
@global_lang.setter @global_lang.setter
@@ -103,6 +108,7 @@ class Settings:
@property @property
def current_admin(self) -> MAdmin: def current_admin(self) -> MAdmin:
"""Current admin data model."""
return self.__CURRENT_ADMIN return self.__CURRENT_ADMIN
@current_admin.setter @current_admin.setter
@@ -111,6 +117,7 @@ class Settings:
@property @property
def global_logger(self) -> Logger: def global_logger(self) -> Logger:
"""Global logger Instance"""
return self.__LOGGER return self.__LOGGER
@global_logger.setter @global_logger.setter

View File

@@ -55,7 +55,7 @@ class User:
return False return False
user_obj.nickname = new_nickname user_obj.nickname = new_nickname
self.Logs.debug(f"UID ({uid}) has benn update with new nickname ({new_nickname}).")
return True return True
def update_mode(self, uidornickname: str, modes: str) -> bool: def update_mode(self, uidornickname: str, modes: str) -> bool:

View File

@@ -93,6 +93,7 @@ class MReputation(MainModel):
umodes: str = None umodes: str = None
vhost: str = None vhost: str = None
fingerprint: str = None fingerprint: str = None
tls_cipher: str = None
isWebirc: bool = False isWebirc: bool = False
isWebsocket: bool = False isWebsocket: bool = False
remote_ip: str = None remote_ip: str = None
@@ -368,4 +369,14 @@ class MSasl(MainModel):
fingerprint: Optional[str] = None fingerprint: Optional[str] = None
language: str = "EN" language: str = "EN"
auth_success: bool = False auth_success: bool = False
level: int = 0 level: int = 0
@dataclass
class MRegister:
command_name: str
func: Any
@dataclass
class MIrcdCommand:
command_name: str
func: Any

View File

@@ -165,10 +165,10 @@ class Irc:
try: try:
self.init_service_user() self.init_service_user()
self.Protocol: 'IProtocol' = self.Loader.PFactory.get() self.Protocol: 'IProtocol' = self.Loader.PFactory.get()
self.Protocol.register_command()
self.Protocol.send_link() # Etablir le link en fonction du protocol choisi self.Protocol.send_link() # Etablir le link en fonction du protocol choisi
self.signal = True # Une variable pour initier la boucle infinie self.signal = True # Une variable pour initier la boucle infinie
self.join_saved_channels() # Join existing channels # self.join_saved_channels() # Join existing channels
time.sleep(3)
# self.ModuleUtils.db_load_all_existing_modules(self) # self.ModuleUtils.db_load_all_existing_modules(self)
while self.signal: while self.signal:
@@ -219,7 +219,7 @@ class Irc:
except ssl.SSLEOFError as soe: except ssl.SSLEOFError as soe:
self.Logs.error(f"SSLEOFError: {soe}") self.Logs.error(f"SSLEOFError: {soe}")
except AttributeError as atte: except AttributeError as atte:
self.Logs.critical(f"AttributeError: {atte}") self.Logs.critical(f"AttributeError: {atte}", exc_info=True)
except Exception as e: except Exception as e:
self.Logs.critical(f"General Error: {e}", exc_info=True) self.Logs.critical(f"General Error: {e}", exc_info=True)
@@ -260,9 +260,9 @@ class Irc:
# This is only to reference the method # This is only to reference the method
return None return None
############################################## # --------------------------------------------
# FIN CONNEXION IRC # # FIN CONNEXION IRC #
############################################## # --------------------------------------------
def build_command(self, level: int, module_name: str, command_name: str, command_description: str) -> None: def build_command(self, level: int, module_name: str, command_name: str, command_description: str) -> None:
"""This method build the commands variable """This method build the commands variable
@@ -489,116 +489,12 @@ class Irc:
return None return None
self.Logs.debug(f">> {self.Utils.hide_sensitive_data(original_response)}") self.Logs.debug(f">> {self.Utils.hide_sensitive_data(original_response)}")
# parsed_protocol = self.Protocol.parse_server_msg(original_response.copy())
pos, parsed_protocol = self.Protocol.get_ircd_protocol_poisition(cmd=original_response)
match parsed_protocol:
case 'PING': pos, parsed_protocol = self.Protocol.get_ircd_protocol_poisition(cmd=original_response, log=True)
self.Protocol.on_server_ping(serverMsg=original_response)
case 'SERVER': for parsed in self.Protocol.Handler.get_ircd_commands():
self.Protocol.on_server(serverMsg=original_response) if parsed.command_name.upper() == parsed_protocol:
parsed.func(original_response)
case 'SJOIN':
self.Protocol.on_sjoin(serverMsg=original_response)
case 'EOS':
self.Protocol.on_eos(serverMsg=original_response)
case 'UID':
try:
self.Protocol.on_uid(serverMsg=original_response)
for module in self.ModuleUtils.model_get_loaded_modules().copy():
module.class_instance.cmd(original_response)
# SASL authentication
# ['@s2s-md/..', ':001', 'UID', 'adator__', '0', '1755987444', '...', 'desktop-h1qck20.mshome.net', '001XLTT0U', '0', '+iwxz', '*', 'Clk-EC2256B2.mshome.net', 'rBKAAQ==', ':...']
dnickname = self.Config.SERVICE_NICKNAME
dchanlog = self.Config.SERVICE_CHANLOG
uid = original_response[8]
nickname = original_response[3]
sasl_obj = self.Sasl.get_sasl_obj(uid)
if sasl_obj:
if sasl_obj.auth_success:
self.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, sasl_obj.username, dnickname),
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
else:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", RED, NOGC, nickname, sasl_obj.username),
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!"))
# Delete sasl object!
self.Sasl.delete_sasl_client(uid)
return None
except Exception as err:
self.Logs.error(f'General Error: {err}')
case 'QUIT':
self.Protocol.on_quit(serverMsg=original_response)
case 'PROTOCTL':
self.Protocol.on_protoctl(serverMsg=original_response)
case 'SVS2MODE':
self.Protocol.on_svs2mode(serverMsg=original_response)
case 'SQUIT':
self.Protocol.on_squit(serverMsg=original_response)
case 'PART':
self.Protocol.on_part(serverMsg=original_response)
case 'VERSION':
self.Protocol.on_version_msg(serverMsg=original_response)
case 'UMODE2':
self.Protocol.on_umode2(serverMsg=original_response)
case 'NICK':
self.Protocol.on_nick(serverMsg=original_response)
case 'REPUTATION':
self.Protocol.on_reputation(serverMsg=original_response)
case 'SMOD':
self.Protocol.on_smod(original_response)
case 'SASL':
sasl_response = self.Protocol.on_sasl(original_response, self.Sasl)
self.on_sasl_authentication_process(sasl_response)
case 'MD':
self.Protocol.on_md(serverMsg=original_response)
case 'PRIVMSG':
self.Protocol.on_privmsg(serverMsg=original_response)
case 'SLOG': # TODO
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case 'PONG': # TODO
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case 'MODE': # TODO
#['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6...', ':001', 'MODE', '#a', '+nt', '1723207536']
#['@unrealircd.org/userhost=adator@localhost;...', ':001LQ0L0C', 'MODE', '#services', '-l']
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case '320': # TODO
#:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case '318': # TODO
#:irc.deb.biz.st 318 PyDefender IRCParis93 :End of /WHOIS list.
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case None:
self.Logs.debug(f"[!] TO HANDLE: {original_response}")
if len(original_response) > 2: if len(original_response) > 2:
if original_response[2] != 'UID': if original_response[2] != 'UID':
@@ -1154,7 +1050,8 @@ class Irc:
self.Base.execute_periodic_action() self.Base.execute_periodic_action()
for chan_name in self.Channel.UID_CHANNEL_DB: for chan_name in self.Channel.UID_CHANNEL_DB:
self.Protocol.send_mode_chan(chan_name.name, '-l') # self.Protocol.send_mode_chan(chan_name.name, '-l')
self.Protocol.send_set_mode('-l', channel_name=chan_name.name)
for client in self.Client.CLIENT_DB: for client in self.Client.CLIENT_DB:
self.Protocol.send_svslogout(client) self.Protocol.send_svslogout(client)
@@ -1207,7 +1104,7 @@ class Irc:
self.Protocol.send_notice( self.Protocol.send_notice(
nick_from=dnickname, nick_from=dnickname,
nick_to=fromuser, nick_to=fromuser,
msg=tr('%s - %sNot Loaded%s', module, GREEN, NOGC) msg=tr('%s - %sNot Loaded%s', module, RED, NOGC)
) )
case 'show_timers': case 'show_timers':

View File

@@ -1,7 +1,7 @@
from core import installation from core import installation
############################################# #############################################
# @Version : 6.2 # # @Version : 6.3 #
# Requierements : # # Requierements : #
# Python3.10 or higher # # Python3.10 or higher #
# SQLAlchemy, requests, psutil # # SQLAlchemy, requests, psutil #

View File

@@ -78,11 +78,11 @@ class Clone:
self.__load_module_configuration() self.__load_module_configuration()
self.Channel.db_query_channel(action='add', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL) self.Channel.db_query_channel(action='add', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_join_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL) self.Protocol.send_sjoin(self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+o', nickname=self.Config.SERVICE_NICKNAME, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+nts', channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+k', channel_name=self.Config.CLONE_CHANNEL, params=self.Config.CLONE_CHANNEL_PASSWORD)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.CLONE_CHANNEL} +o {self.Config.SERVICE_NICKNAME}")
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} +nts")
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} +k {self.Config.CLONE_CHANNEL_PASSWORD}")
def __create_tables(self) -> None: def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas. """Methode qui va créer la base de donnée si elle n'existe pas.
@@ -127,8 +127,8 @@ class Clone:
self.Settings.set_cache('UID_CLONE_DB', self.Clone.UID_CLONE_DB) self.Settings.set_cache('UID_CLONE_DB', self.Clone.UID_CLONE_DB)
self.Channel.db_query_channel(action='del', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL) self.Channel.db_query_channel(action='del', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -nts") self.Protocol.send_set_mode('-nts', channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -k {self.Config.CLONE_CHANNEL_PASSWORD}") self.Protocol.send_set_mode('-k', channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_part_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL) self.Protocol.send_part_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL)
self.Irc.Commands.drop_command_by_module(self.module_name) self.Irc.Commands.drop_command_by_module(self.module_name)
@@ -148,7 +148,8 @@ class Clone:
match command: match command:
case 'PRIVMSG': case 'PRIVMSG':
return self.Utils.handle_on_privmsg(self, cmd) self.Utils.handle_on_privmsg(self, cmd)
return None
case 'QUIT': case 'QUIT':
return None return None

View File

@@ -174,17 +174,17 @@ def create_new_clone(uplink: 'Clone', faker_instance: 'Faker', group: str = 'Def
return True return True
def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]): def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]) -> None:
uid_sender = uplink.Irc.Utils.clean_uid(srvmsg[1]) parser = uplink.Protocol.parse_privmsg(srvmsg)
uid_sender = uplink.Irc.Utils.clean_uid(parser.get('uid_sender', None))
senderObj = uplink.User.get_user(uid_sender) senderObj = uplink.User.get_user(uid_sender)
if senderObj.hostname in uplink.Config.CLONE_LOG_HOST_EXEMPT: if senderObj is not None:
return if senderObj.hostname in uplink.Config.CLONE_LOG_HOST_EXEMPT:
return
if not senderObj is None: senderMsg = parser.get('message', None)
senderMsg = ' '.join(srvmsg[4:]) clone_obj = uplink.Clone.get_clone(parser.get('uid_reciever', None))
clone_obj = uplink.Clone.get_clone(srvmsg[3])
if clone_obj is None: if clone_obj is None:
return return
@@ -196,3 +196,5 @@ def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]):
msg=final_message, msg=final_message,
channel=uplink.Config.CLONE_CHANNEL channel=uplink.Config.CLONE_CHANNEL
) )
return None

View File

@@ -134,17 +134,20 @@ def set_operation(uplink: 'Command', cmd: list[str], channel_name: Optional[str]
return False return False
if len(cmd) == 1: if len(cmd) == 1:
uplink.Protocol.send2socket(f":{dnickname} MODE {channel_name} {mode} {client}") # uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {client}")
uplink.Protocol.send_set_mode(mode, nickname=client, channel_name=channel_name)
return None return None
# deop nickname # deop nickname
if len(cmd) == 2: if len(cmd) == 2:
nickname = cmd[1] nickname = cmd[1]
uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}") # uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}")
uplink.Protocol.send_set_mode(mode, nickname=nickname, channel_name=channel_name)
return None return None
nickname = cmd[2] nickname = cmd[2]
uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}") # uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}")
uplink.Protocol.send_set_mode(mode, nickname=nickname, channel_name=channel_name)
return None return None
def set_ban(uplink: 'Command', cmd: list[str], action: Literal['+', '-'], client: str) -> None: def set_ban(uplink: 'Command', cmd: list[str], action: Literal['+', '-'], client: str) -> None:

View File

@@ -365,7 +365,7 @@ class Defender:
release_code = cmd[1] release_code = cmd[1]
jailed_nickname = self.User.get_nickname(fromuser) jailed_nickname = self.User.get_nickname(fromuser)
jailed_UID = self.User.get_uid(fromuser) jailed_UID = self.User.get_uid(fromuser)
get_reputation = self.Reputation.get_Reputation(jailed_UID) get_reputation = self.Reputation.get_reputation(jailed_UID)
if get_reputation is None: if get_reputation is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" No code is requested ...") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" No code is requested ...")
@@ -551,7 +551,7 @@ class Defender:
msg=f"This nickname ({str(cmd[2])}) is not connected to the network!") msg=f"This nickname ({str(cmd[2])}) is not connected to the network!")
return None return None
client_to_release = self.Reputation.get_Reputation(client_obj.uid) client_to_release = self.Reputation.get_reputation(client_obj.uid)
if client_to_release is None: if client_to_release is None:
p.send_notice(nick_from=dnickname, p.send_notice(nick_from=dnickname,

View File

@@ -87,7 +87,7 @@ def handle_on_sjoin(uplink: 'Defender', srvmsg: list[str]):
return return
if confmodel.reputation == 1: if confmodel.reputation == 1:
get_reputation = irc.Reputation.get_Reputation(parsed_UID) get_reputation = irc.Reputation.get_reputation(parsed_UID)
if parsed_chan != gconfig.SALON_JAIL: if parsed_chan != gconfig.SALON_JAIL:
p.send2socket(f":{gconfig.SERVICE_ID} MODE {parsed_chan} +b ~security-group:unknown-users") p.send2socket(f":{gconfig.SERVICE_ID} MODE {parsed_chan} +b ~security-group:unknown-users")
@@ -138,18 +138,20 @@ def handle_on_slog(uplink: 'Defender', srvmsg: list[str]):
return None return None
def handle_on_nick(uplink: 'Defender', srvmsg: list[str]): def handle_on_nick(uplink: 'Defender', srvmsg: list[str]):
"""_summary_ """Handle nickname changes.
>>> srvmsg = ['@unrealircd.org...', ':001MZQ0RB', 'NICK', 'newnickname', '1754663712'] >>> srvmsg = ['@unrealircd.org...', ':001MZQ0RB', 'NICK', 'newnickname', '1754663712']
>>> [':97KAAAAAC', 'NICK', 'testinspir', '1757360740']
Args: Args:
irc_instance (Irc): The Irc instance irc_instance (Irc): The Irc instance
srvmsg (list[str]): The Server MSG srvmsg (list[str]): The Server MSG
confmodel (ModConfModel): The Module Configuration confmodel (ModConfModel): The Module Configuration
""" """
uid = uplink.Loader.Utils.clean_uid(str(srvmsg[1]))
p = uplink.Protocol p = uplink.Protocol
parser = p.parse_nick(srvmsg)
uid = uplink.Loader.Utils.clean_uid(parser.get('uid', None))
confmodel = uplink.ModConfig confmodel = uplink.ModConfig
get_reputation = uplink.Reputation.get_Reputation(uid) get_reputation = uplink.Reputation.get_reputation(uid)
jail_salon = uplink.Config.SALON_JAIL jail_salon = uplink.Config.SALON_JAIL
service_id = uplink.Config.SERVICE_ID service_id = uplink.Config.SERVICE_ID
@@ -159,7 +161,7 @@ def handle_on_nick(uplink: 'Defender', srvmsg: list[str]):
# Update the new nickname # Update the new nickname
oldnick = get_reputation.nickname oldnick = get_reputation.nickname
newnickname = srvmsg[3] newnickname = parser.get('newnickname', None)
get_reputation.nickname = newnickname get_reputation.nickname = newnickname
# If ban in all channel is ON then unban old nickname an ban the new nickname # If ban in all channel is ON then unban old nickname an ban the new nickname
@@ -170,20 +172,21 @@ def handle_on_nick(uplink: 'Defender', srvmsg: list[str]):
p.send2socket(f":{service_id} MODE {chan.name} +b {newnickname}!*@*") p.send2socket(f":{service_id} MODE {chan.name} +b {newnickname}!*@*")
def handle_on_quit(uplink: 'Defender', srvmsg: list[str]): def handle_on_quit(uplink: 'Defender', srvmsg: list[str]):
"""_summary_ """Handle on quit message
>>> srvmsg = ['@unrealircd.org...', ':001MZQ0RB', 'QUIT', ':Quit:', 'quit message'] >>> srvmsg = ['@unrealircd.org...', ':001MZQ0RB', 'QUIT', ':Quit:', 'quit message']
Args: Args:
uplink (Irc): The Defender Module instance uplink (Irc): The Defender Module instance
srvmsg (list[str]): The Server MSG srvmsg (list[str]): The Server MSG
""" """
p = uplink.Protocol p = uplink.Protocol
parser = p.parse_quit(srvmsg)
confmodel = uplink.ModConfig confmodel = uplink.ModConfig
ban_all_chan = uplink.Base.int_if_possible(confmodel.reputation_ban_all_chan) ban_all_chan = uplink.Base.int_if_possible(confmodel.reputation_ban_all_chan)
final_UID = uplink.Loader.Utils.clean_uid(str(srvmsg[1])) final_UID = uplink.Loader.Utils.clean_uid(str(parser.get('uid', None)))
jail_salon = uplink.Config.SALON_JAIL jail_salon = uplink.Config.SALON_JAIL
service_id = uplink.Config.SERVICE_ID service_id = uplink.Config.SERVICE_ID
get_user_reputation = uplink.Reputation.get_Reputation(final_UID) get_user_reputation = uplink.Reputation.get_reputation(final_UID)
if get_user_reputation is not None: if get_user_reputation is not None:
final_nickname = get_user_reputation.nickname final_nickname = get_user_reputation.nickname
@@ -204,6 +207,7 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
uplink (Defender): The Defender instance uplink (Defender): The Defender instance
srvmsg (list[str]): The Server MSG srvmsg (list[str]): The Server MSG
""" """
parser_uid = uplink.Protocol.parse_uid(srvmsg)
gconfig = uplink.Config gconfig = uplink.Config
irc = uplink.Irc irc = uplink.Irc
confmodel = uplink.ModConfig confmodel = uplink.ModConfig
@@ -213,10 +217,10 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
return None return None
# Get User information # Get User information
_User = irc.User.get_user(str(srvmsg[8])) _User = irc.User.get_user(parser_uid.get('uid', None))
if _User is None: if _User is None:
irc.Logs.warning(f'This UID: [{srvmsg[8]}] is not available please check why') irc.Logs.warning(f'This UID: [{parser_uid.get("uid", None)}] is not available please check why')
return return
# If user is not service or IrcOp then scan them # If user is not service or IrcOp then scan them
@@ -249,7 +253,8 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
#################### ####################
# ACTION FUNCTIONS # # ACTION FUNCTIONS #
#################### ####################
# [:<sid>] UID <uid> <ts> <nick> <real-host> <displayed-host> <real-user> <ip> <signon> <modes> [<mode-parameters>]+ :<real>
# [:<sid>] UID nickname hopcount timestamp username hostname uid servicestamp umodes virthost cloakedhost ip :gecos
def action_on_flood(uplink: 'Defender', srvmsg: list[str]): def action_on_flood(uplink: 'Defender', srvmsg: list[str]):
confmodel = uplink.ModConfig confmodel = uplink.ModConfig
@@ -318,7 +323,7 @@ def action_add_reputation_sanctions(uplink: 'Defender', jailed_uid: str ):
p = uplink.Protocol p = uplink.Protocol
confmodel = uplink.ModConfig confmodel = uplink.ModConfig
get_reputation = irc.Reputation.get_Reputation(jailed_uid) get_reputation = irc.Reputation.get_reputation(jailed_uid)
if get_reputation is None: if get_reputation is None:
irc.Logs.warning(f'UID {jailed_uid} has not been found') irc.Logs.warning(f'UID {jailed_uid} has not been found')
@@ -404,7 +409,7 @@ def action_apply_reputation_santions(uplink: 'Defender') -> None:
# Suppression des éléments dans {UID_DB} et {REPUTATION_DB} # Suppression des éléments dans {UID_DB} et {REPUTATION_DB}
for chan in irc.Channel.UID_CHANNEL_DB: for chan in irc.Channel.UID_CHANNEL_DB:
if chan.name != salon_jail and ban_all_chan == 1: if chan.name != salon_jail and ban_all_chan == 1:
get_user_reputation = irc.Reputation.get_Reputation(uid) get_user_reputation = irc.Reputation.get_reputation(uid)
p.send2socket(f":{service_id} MODE {chan.name} -b {get_user_reputation.nickname}!*@*") p.send2socket(f":{service_id} MODE {chan.name} -b {get_user_reputation.nickname}!*@*")
# Lorsqu'un utilisateur quitte, il doit être supprimé de {UID_DB}. # Lorsqu'un utilisateur quitte, il doit être supprimé de {UID_DB}.

View File

@@ -1,5 +1,5 @@
{ {
"version": "6.2.6", "version": "6.3.0",
"requests": "2.32.3", "requests": "2.32.3",
"psutil": "6.0.0", "psutil": "6.0.0",