Merge pull request #94 from adator85/v6.3.2

V6.3.2
This commit is contained in:
adator
2025-10-26 21:10:39 +01:00
committed by GitHub
14 changed files with 174 additions and 156 deletions

1
.gitignore vendored
View File

@@ -6,6 +6,7 @@ db/
logs/ logs/
__pycache__/ __pycache__/
configuration.json configuration.json
configuration.yaml
configuration_inspircd.json configuration_inspircd.json
configuration_unreal6.json configuration_unreal6.json
*.log *.log

View File

@@ -1,48 +0,0 @@
{
"SERVEUR_IP": "YOUR.SERVER.IP",
"SERVEUR_HOSTNAME": "YOUR.SERVER.HOST",
"SERVEUR_LINK": "LINK.DE.TON.SERVER",
"SERVEUR_PORT": 7002,
"SERVEUR_PASSWORD": "YOUR_LINK_PASSWORD",
"SERVEUR_ID": "006",
"SERVEUR_SSL": true,
"SERVICE_NAME": "defender",
"SERVICE_NICKNAME": "PyDefender",
"SERVICE_REALNAME": "Python Defender Security",
"SERVICE_USERNAME": "PyDefender",
"SERVICE_HOST": "HOST.DE.TON.DEFENDER",
"SERVICE_INFO": "Network IRC Service",
"SERVICE_CHANLOG": "#services",
"SERVICE_SMODES": "+ioqBS",
"SERVICE_CMODES": "ntsOP",
"SERVICE_UMODES": "o",
"SERVICE_PREFIX": "!",
"OWNER": "TON_NICK_NAME",
"PASSWORD": "TON_PASSWORD",
"JSONRPC_URL": "https://your.domaine.com:8600/api",
"JSONRPC_PATH_TO_SOCKET_FILE": "/PATH/TO/YOUR/IRCD/data/rpc.socket",
"JSONRPC_METHOD": "socket",
"JSONRPC_USER": "YOUR_RPC_USER",
"JSONRPC_PASSWORD": "YOUR_RPC_PASSWORD",
"SALON_JAIL": "#jail",
"SALON_JAIL_MODES": "sS",
"SALON_LIBERER": "#welcome",
"CLONE_CHANNEL": "#clones",
"CLONE_CMODES": "+nts",
"CLONE_LOG_HOST_EXEMPT": ["HOST.TO.SKIP"],
"CLONE_CHANNEL_PASSWORD": "YOUR_CHANNEL_PASSWORD",
"API_TIMEOUT": 2,
"PORTS_TO_SCAN": [3028, 8080, 1080, 1085, 4145, 9050],
"WHITELISTED_IP": ["127.0.0.1"],
"GLINE_DURATION": "30",
"DEBUG_LEVEL": 20,
"DEBUG_HARD": true
}

View File

@@ -0,0 +1,47 @@
configuration:
SERVEUR_IP: "YOUR.SERVER.IP"
SERVEUR_HOSTNAME: "YOUR.SERVER.HOST"
SERVEUR_LINK: "LINK.DE.TON.SERVER"
SERVEUR_PORT: 7002
SERVEUR_PASSWORD: "YOUR_LINK_PASSWORD"
SERVEUR_ID: "006"
SERVEUR_SSL: true
SERVICE_NAME: "defender"
SERVICE_NICKNAME: "PyDefender"
SERVICE_REALNAME: "Python Defender Security"
SERVICE_USERNAME: "PyDefender"
SERVICE_HOST: "HOST.DE.TON.DEFENDER"
SERVICE_INFO: "Network IRC Service"
SERVICE_CHANLOG: "#services"
SERVICE_SMODES: "+ioqBS"
SERVICE_CMODES: "ntsOP"
SERVICE_UMODES: "o"
SERVICE_PREFIX: "!"
OWNER: "TON_NICK_NAME"
PASSWORD: "TON_PASSWORD"
JSONRPC_URL: "https://your.domaine.com:8600/api"
JSONRPC_PATH_TO_SOCKET_FILE: "/PATH/TO/YOUR/IRCD/data/rpc.socket"
JSONRPC_METHOD: "socket"
JSONRPC_USER: "YOUR_RPC_USER"
JSONRPC_PASSWORD: "YOUR_RPC_PASSWORD"
SALON_JAIL: "#jail"
SALON_JAIL_MODES: "sS"
SALON_LIBERER: "#welcome"
CLONE_CHANNEL: "#clones"
CLONE_CMODES: "+nts"
CLONE_LOG_HOST_EXEMPT: ["HOST.TO.SKIP"]
CLONE_CHANNEL_PASSWORD: "YOUR_CHANNEL_PASSWORD"
API_TIMEOUT: 2
PORTS_TO_SCAN: [3028 8080 1080 1085 4145 9050]
WHITELISTED_IP: ["127.0.0.1"]
GLINE_DURATION: "30"
DEBUG_LEVEL: 20
DEBUG_HARD: true

View File

@@ -36,7 +36,7 @@ class Command:
def get_command(self, command_name: str, module_name: str) -> Optional[MCommand]: def get_command(self, command_name: str, module_name: str) -> Optional[MCommand]:
for command in self.DB_COMMANDS: for command in self.DB_COMMANDS:
if command.command_name.lower() == command_name and command.module_name == module_name: if command.command_name.lower() == command_name.lower() and command.module_name.lower() == module_name.lower():
return command return command
return None return None
@@ -90,7 +90,7 @@ class Command:
admin_level = admin.level if admin else 0 admin_level = admin.level if admin else 0
commands = self.get_commands_by_level(admin_level) commands = self.get_commands_by_level(admin_level)
if command_name in [command.command_name for command in commands]: if command_name.lower() in [command.command_name.lower() for command in commands]:
return True return True
return False return False

View File

@@ -1,3 +1,5 @@
import sys
import yaml
from json import load from json import load
from sys import exit from sys import exit
from os import sep from os import sep
@@ -13,49 +15,43 @@ class Configuration:
self.Loader = loader self.Loader = loader
self.Logs = loader.Logs self.Logs = loader.Logs
self._config_model: MConfig = self.__load_service_configuration() self.configuration_model = self.__load_service_configuration()
loader.ServiceLogging.set_file_handler_level(self._config_model.DEBUG_LEVEL) loader.ServiceLogging.set_file_handler_level(self._config_model.DEBUG_LEVEL)
loader.ServiceLogging.set_stdout_handler_level(self._config_model.DEBUG_LEVEL) loader.ServiceLogging.set_stdout_handler_level(self._config_model.DEBUG_LEVEL)
loader.ServiceLogging.update_handler_format(self._config_model.DEBUG_HARD) loader.ServiceLogging.update_handler_format(self._config_model.DEBUG_HARD)
return None return None
def get_config_model(self) -> MConfig: @property
def configuration_model(self) -> MConfig:
return self._config_model return self._config_model
def __load_json_service_configuration(self) -> Optional[dict[str, Any]]: @configuration_model.setter
def configuration_model(self, conf_model: MConfig):
self._config_model = conf_model
def __load_config_file(self) -> Optional[dict[str, Any]]:
try: try:
conf_filename = f'config{sep}configuration.json' conf_filename = f'config{sep}configuration.yaml'
with open(conf_filename, 'r') as configuration_data: with open(conf_filename, 'r') as conf:
configuration: dict[str, Union[str, int, list, dict]] = load(configuration_data) configuration: dict[str, dict[str, Any]] = yaml.safe_load(conf)
return configuration
return configuration.get('configuration', None)
except FileNotFoundError as fe: except FileNotFoundError as fe:
self.Logs.error(f'FileNotFound: {fe}') self.Logs.error(f'FileNotFound: {fe}')
self.Logs.error('Configuration file not found please create config/configuration.json') self.Logs.error('Configuration file not found please create config/configuration.yaml')
exit(0) exit("Configuration file not found please create config/configuration.yaml")
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
self.Logs.error('The key must be defined in core/configuration.json')
def __load_service_configuration(self) -> MConfig: def __load_service_configuration(self) -> MConfig:
try: try:
import_config = self.__load_json_service_configuration() import_config = self.__load_config_file()
if import_config is None:
self.Logs.error("Error While importing configuration file!", exc_info=True)
raise Exception("Error While importing yaml configuration")
Model_keys = MConfig().to_dict() list_key_to_remove: list[str] = [key_to_del for key_to_del in import_config if key_to_del not in MConfig().get_attributes()]
model_key_list: list = [] for key_to_remove in list_key_to_remove:
json_config_key_list: list = [] import_config.pop(key_to_remove, None)
self.Logs.warning(f"[!] The key {key_to_remove} is not expected, it has been removed from the system ! please remove it from configuration.json file [!]")
for key in Model_keys:
model_key_list.append(key)
for key in import_config:
json_config_key_list.append(key)
for json_conf in json_config_key_list:
if not json_conf in model_key_list:
import_config.pop(json_conf, None)
self.Logs.warning(f"[!] The key {json_conf} is not expected, it has been removed from the system ! please remove it from configuration.json file [!]")
self.Logs.debug(f"[LOADING CONFIGURATION]: Loading configuration with {len(import_config)} parameters!") self.Logs.debug(f"[LOADING CONFIGURATION]: Loading configuration with {len(import_config)} parameters!")
return MConfig(**import_config) return MConfig(**import_config)

View File

@@ -288,46 +288,46 @@ class IProtocol(ABC):
# ------------------------------------------------------------------------ # ------------------------------------------------------------------------
@abstractmethod @abstractmethod
def parse_uid(self, server_msg: list[str]) -> dict[str, str]: def parse_uid(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse UID and return dictionary. """Parse UID and return dictionary.
Args: Args:
server_msg (list[str]): The UID IRCD message serverMsg (list[str]): The UID IRCD message
Returns: Returns:
dict[str, str]: The response as dictionary. dict[str, str]: The response as dictionary.
""" """
@abstractmethod @abstractmethod
def parse_quit(self, server_msg: list[str]) -> dict[str, str]: def parse_quit(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse quit and return dictionary. """Parse quit and return dictionary.
>>> [':97KAAAAAB', 'QUIT', ':Quit:', 'this', 'is', 'my', 'reason', 'to', 'quit'] >>> [':97KAAAAAB', 'QUIT', ':Quit:', 'this', 'is', 'my', 'reason', 'to', 'quit']
Args: Args:
server_msg (list[str]): The server message to parse serverMsg (list[str]): The server message to parse
Returns: Returns:
dict[str, str]: The response as dictionary. dict[str, str]: The response as dictionary.
""" """
@abstractmethod @abstractmethod
def parse_nick(self, server_msg: list[str]) -> dict[str, str]: def parse_nick(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse nick changes and return dictionary. """Parse nick changes and return dictionary.
>>> [':97KAAAAAC', 'NICK', 'testinspir', '1757360740'] >>> [':97KAAAAAC', 'NICK', 'testinspir', '1757360740']
Args: Args:
server_msg (list[str]): The server message to parse serverMsg (list[str]): The server message to parse
Returns: Returns:
dict[str, str]: The response as dictionary. dict[str, str]: The response as dictionary.
""" """
@abstractmethod @abstractmethod
def parse_privmsg(self, server_msg: list[str]) -> dict[str, str]: def parse_privmsg(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse PRIVMSG message. """Parse PRIVMSG message.
>>> [':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message'] >>> [':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message']
Args: Args:
server_msg (list[str]): The server message to parse serverMsg (list[str]): The server message to parse
Returns: Returns:
dict[str, str]: The response as dictionary. dict[str, str]: The response as dictionary.
@@ -345,174 +345,174 @@ class IProtocol(ABC):
# ------------------------------------------------------------------------ # ------------------------------------------------------------------------
@abstractmethod @abstractmethod
def on_svs2mode(self, server_msg: list[str]) -> None: def on_svs2mode(self, serverMsg: list[str]) -> None:
"""Handle svs2mode coming from a server """Handle svs2mode coming from a server
>>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r'] >>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_mode(self, server_msg: list[str]) -> None: def on_mode(self, serverMsg: list[str]) -> None:
"""Handle mode coming from a server """Handle mode coming from a server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_umode2(self, server_msg: list[str]) -> None: def on_umode2(self, serverMsg: list[str]) -> None:
"""Handle umode2 coming from a server """Handle umode2 coming from a server
>>> [':adator_', 'UMODE2', '-i'] >>> [':adator_', 'UMODE2', '-i']
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_quit(self, server_msg: list[str]) -> None: def on_quit(self, serverMsg: list[str]) -> None:
"""Handle quit coming from a server """Handle quit coming from a server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_squit(self, server_msg: list[str]) -> None: def on_squit(self, serverMsg: list[str]) -> None:
"""Handle squit coming from a server """Handle squit coming from a server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_protoctl(self, server_msg: list[str]) -> None: def on_protoctl(self, serverMsg: list[str]) -> None:
"""Handle protoctl coming from a server """Handle protoctl coming from a server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_nick(self, server_msg: list[str]) -> None: def on_nick(self, serverMsg: list[str]) -> None:
"""Handle nick coming from a server """Handle nick coming from a server
new nickname new nickname
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_sjoin(self, server_msg: list[str]) -> None: def on_sjoin(self, serverMsg: list[str]) -> None:
"""Handle sjoin coming from a server """Handle sjoin coming from a server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_part(self, server_msg: list[str]) -> None: def on_part(self, serverMsg: list[str]) -> None:
"""Handle part coming from a server """Handle part coming from a server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_eos(self, server_msg: list[str]) -> None: def on_eos(self, serverMsg: list[str]) -> None:
"""Handle EOS coming from a server """Handle EOS coming from a server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_reputation(self, server_msg: list[str]) -> None: def on_reputation(self, serverMsg: list[str]) -> None:
"""Handle REPUTATION coming from a server """Handle REPUTATION coming from a server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_uid(self, server_msg: list[str]) -> None: def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server """Handle uid message coming from the server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_privmsg(self, server_msg: list[str]) -> None: def on_privmsg(self, serverMsg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server """Handle PRIVMSG message coming from the server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_server_ping(self, server_msg: list[str]) -> None: def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server """Send a PONG message to the server
Args: Args:
server_msg (list[str]): List of str coming from the server serverMsg (list[str]): List of str coming from the server
""" """
@abstractmethod @abstractmethod
def on_server(self, server_msg: list[str]) -> None: def on_server(self, serverMsg: list[str]) -> None:
"""_summary_ """_summary_
Args: Args:
server_msg (list[str]): _description_ serverMsg (list[str]): _description_
""" """
@abstractmethod @abstractmethod
def on_version(self, server_msg: list[str]) -> None: def on_version(self, serverMsg: list[str]) -> None:
"""Sending Server Version to the server """Sending Server Version to the server
Args: Args:
server_msg (list[str]): List of str coming from the server serverMsg (list[str]): List of str coming from the server
""" """
@abstractmethod @abstractmethod
def on_time(self, server_msg: list[str]) -> None: def on_time(self, serverMsg: list[str]) -> None:
"""Sending TIME answer to a requestor """Sending TIME answer to a requestor
Args: Args:
server_msg (list[str]): List of str coming from the server serverMsg (list[str]): List of str coming from the server
""" """
@abstractmethod @abstractmethod
def on_ping(self, server_msg: list[str]) -> None: def on_ping(self, serverMsg: list[str]) -> None:
"""Sending a PING answer to requestor """Sending a PING answer to requestor
Args: Args:
server_msg (list[str]): List of str coming from the server serverMsg (list[str]): List of str coming from the server
""" """
@abstractmethod @abstractmethod
def on_version_msg(self, server_msg: list[str]) -> None: def on_version_msg(self, serverMsg: list[str]) -> None:
"""Handle version coming from the server """Handle version coming from the server
\n ex. /version Defender \n ex. /version Defender
Args: Args:
server_msg (list[str]): Original message from the server serverMsg (list[str]): Original message from the server
""" """
@abstractmethod @abstractmethod
def on_smod(self, server_msg: list[str]) -> None: def on_smod(self, serverMsg: list[str]) -> None:
"""Handle SMOD message coming from the server """Handle SMOD message coming from the server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
""" """
@abstractmethod @abstractmethod
def on_sasl(self, server_msg: list[str]) -> Optional['MSasl']: def on_sasl(self, serverMsg: list[str]) -> Optional['MSasl']:
"""Handle SASL coming from a server """Handle SASL coming from a server
Args: Args:
server_msg (list[str]): Original server message serverMsg (list[str]): Original server message
Returns: Returns:
@@ -530,9 +530,18 @@ class IProtocol(ABC):
""" """
@abstractmethod @abstractmethod
def on_md(self, server_msg: list[str]) -> None: def on_md(self, serverMsg: list[str]) -> None:
"""Handle MD responses """Handle MD responses
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...'] [':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']
Args: Args:
server_msg (list[str]): The server reply serverMsg (list[str]): The server reply
"""
@abstractmethod
def on_kick(self, serverMsg: list[str]) -> None:
"""When a user is kicked out from a channel
Eg. ['@unrealircd.org...', ':001', 'KICK', '#jsonrpc', '001ELW13T', ':Kicked', 'from', 'JSONRPC', 'User']
Args:
serverMsg (list[str]): The server message
""" """

View File

@@ -76,6 +76,7 @@ class Unrealircd6(IProtocol):
self.Handler.register(m(command_name="SASL", func=self.on_sasl)) self.Handler.register(m(command_name="SASL", func=self.on_sasl))
self.Handler.register(m(command_name="MD", func=self.on_md)) self.Handler.register(m(command_name="MD", func=self.on_md))
self.Handler.register(m(command_name="PRIVMSG", func=self.on_privmsg)) self.Handler.register(m(command_name="PRIVMSG", func=self.on_privmsg))
self.Handler.register(m(command_name="KICK", func=self.on_kick))
return None return None
@@ -699,7 +700,9 @@ class Unrealircd6(IProtocol):
serverMsg (list[str]): The server message to parse serverMsg (list[str]): The server message to parse
Returns: Returns:
dict[str, str]: The response as dictionary. dict: The response as dictionary.
>>> {"uid": "", "newnickname": "", "timestamp": ""}
""" """
scopy = serverMsg.copy() scopy = serverMsg.copy()
if scopy[0].startswith('@'): if scopy[0].startswith('@'):
@@ -1199,7 +1202,7 @@ class Unrealircd6(IProtocol):
admin = self.__Irc.Admin.get_admin(uid) admin = self.__Irc.Admin.get_admin(uid)
account = admin.account if admin else '' account = admin.account if admin else ''
self.send_priv_msg(nick_from=dnickname, self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTO AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, account, dnickname), msg=tr("[ %sFINGERPRINT AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, account, dnickname),
channel=dchanlog) channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname)) self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
@@ -1612,3 +1615,18 @@ class Unrealircd6(IProtocol):
... ...
except Exception as e: except Exception as e:
self.__Logs.error(f"General Error: {e}") self.__Logs.error(f"General Error: {e}")
def on_kick(self, serverMsg: list[str]) -> None:
"""When a user is kicked out from a channel
['@unrealircd.org/issued-by=RPC:admin-for-test@...', ':001', 'KICK', '#jsonrpc', '001ELW13T', ':Kicked', 'from', 'JSONRPC', 'User']
Args:
serverMsg (list[str]): The server message
"""
scopy = serverMsg.copy()
uid = scopy[4]
channel = scopy[3]
# Delete the user from the channel.
self.__Irc.Channel.delete_user_from_channel(channel, uid)
return None

View File

@@ -49,7 +49,7 @@ def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -
uplink.Logs.warning('-- Waiting for socket to close ...') uplink.Logs.warning('-- Waiting for socket to close ...')
# Reload configuration # Reload configuration
uplink.Loader.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).get_config_model() uplink.Loader.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).configuration_model
uplink.Loader.Base = uplink.Loader.BaseModule.Base(uplink.Loader) uplink.Loader.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
for mod in REHASH_MODULES: for mod in REHASH_MODULES:
@@ -77,7 +77,7 @@ def rehash_service(uplink: 'Irc', nickname: str) -> None:
channel=uplink.Config.SERVICE_CHANLOG channel=uplink.Config.SERVICE_CHANLOG
) )
uplink.Utils = sys.modules['core.utils'] uplink.Utils = sys.modules['core.utils']
uplink.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).get_config_model() uplink.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).configuration_model
uplink.Config.HSID = config_model_bakcup.HSID uplink.Config.HSID = config_model_bakcup.HSID
uplink.Config.DEFENDER_INIT = config_model_bakcup.DEFENDER_INIT uplink.Config.DEFENDER_INIT = config_model_bakcup.DEFENDER_INIT
uplink.Config.DEFENDER_RESTART = config_model_bakcup.DEFENDER_RESTART uplink.Config.DEFENDER_RESTART = config_model_bakcup.DEFENDER_RESTART

View File

@@ -847,12 +847,15 @@ class Irc:
try: try:
admin_obj = self.Admin.get_admin(fromuser) admin_obj = self.Admin.get_admin(fromuser)
if admin_obj: if admin_obj:
if admin_obj.fingerprint is not None:
query = f'UPDATE {self.Config.TABLE_ADMIN} SET fingerprint = :fingerprint WHERE user = :user' query = f'UPDATE {self.Config.TABLE_ADMIN} SET fingerprint = :fingerprint WHERE user = :user'
r = self.Base.db_execute_query(query, {'fingerprint': admin_obj.fingerprint, 'user': admin_obj.account}) r = self.Base.db_execute_query(query, {'fingerprint': admin_obj.fingerprint, 'user': admin_obj.account})
if r.rowcount > 0: if r.rowcount > 0:
self.Protocol.send_notice(dnickname, fromuser, f'[ {GREEN}CERT{NOGC} ] Your new fingerprint has been attached to your account. {admin_obj.fingerprint}') self.Protocol.send_notice(dnickname, fromuser, f'[ {GREEN}CERT{NOGC} ] Your new fingerprint has been attached to your account. {admin_obj.fingerprint}')
else: else:
self.Protocol.send_notice(dnickname, fromuser, f'[ {RED}CERT{NOGC} ] Impossible to add your fingerprint.{admin_obj.fingerprint}') self.Protocol.send_notice(dnickname, fromuser, f'[ {RED}CERT{NOGC} ] Impossible to add your fingerprint.{admin_obj.fingerprint}')
else:
self.Protocol.send_notice(dnickname, fromuser, f'[ {RED}CERT{NOGC} ] There is no fingerprint to add.')
except Exception as e: except Exception as e:
self.Logs.error(e) self.Logs.error(e)

View File

@@ -35,7 +35,7 @@ class Loader:
self.Logs: Logger = self.ServiceLogging.get_logger() self.Logs: Logger = self.ServiceLogging.get_logger()
self.Config: df.MConfig = self.ConfModule.Configuration(self).get_config_model() self.Config: df.MConfig = self.ConfModule.Configuration(self).configuration_model
self.Settings.global_lang = self.Config.LANG if self.Config.LANG else "EN" self.Settings.global_lang = self.Config.LANG if self.Config.LANG else "EN"

View File

@@ -10,15 +10,10 @@ from core import installation
############################################# #############################################
try: try:
installation.Install() installation.Install()
from core.loader import Loader from core.loader import Loader
loader = Loader() loader = Loader()
loader.Irc.init_irc() loader.Irc.init_irc()
# from core.irc import Irc
# ircInstance = Irc(Loader())
# ircInstance.init_irc(ircInstance)
except AssertionError as ae: except AssertionError as ae:
print(f'Assertion Error -> {ae}') print(f'Assertion Error -> {ae}')

View File

@@ -217,7 +217,7 @@ class Defender:
if response is not None: if response is not None:
q_insert = "INSERT INTO def_trusted (datetime, user, host, vhost) VALUES (?, ?, ?, ?)" q_insert = "INSERT INTO def_trusted (datetime, user, host, vhost) VALUES (?, ?, ?, ?)"
mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'host': '*', 'vhost': '*'} mes_donnees = {'datetime': self.Loader.Utils.get_datetime(), 'user': nickname, 'host': '*', 'vhost': '*'}
exec_query = self.Base.db_execute_query(q_insert, mes_donnees) exec_query = self.Base.db_execute_query(q_insert, mes_donnees)
pass pass

View File

@@ -383,14 +383,11 @@ def action_apply_reputation_santions(uplink: 'Defender') -> None:
color_red = gconfig.COLORS.red color_red = gconfig.COLORS.red
nogc = gconfig.COLORS.nogc nogc = gconfig.COLORS.nogc
salon_jail = gconfig.SALON_JAIL salon_jail = gconfig.SALON_JAIL
if reputation_flag == 0:
return None
elif reputation_timer == 0:
return None
uid_to_clean = [] uid_to_clean = []
if reputation_flag == 0 or reputation_timer == 0:
return None
for user in irc.Reputation.UID_REPUTATION_DB: for user in irc.Reputation.UID_REPUTATION_DB:
if not user.isWebirc: # Si il ne vient pas de WebIRC if not user.isWebirc: # Si il ne vient pas de WebIRC
if irc.User.get_user_uptime_in_minutes(user.uid) >= reputation_timer and int(user.score_connexion) <= int(reputation_seuil): if irc.User.get_user_uptime_in_minutes(user.uid) >= reputation_timer and int(user.score_connexion) <= int(reputation_seuil):

View File

@@ -1,5 +1,5 @@
{ {
"version": "6.3.0", "version": "6.3.2",
"requests": "2.32.3", "requests": "2.32.3",
"psutil": "6.0.0", "psutil": "6.0.0",