New features on branch v6.2.5:

- New capability in base.py to patch the database
    - Some minor updates on installation.py.
    - Translation feature:
        - New library requirement (pyyaml)
        - New translation systeme implemented.
        - New class translation.py added.
        - Module folder updated by adding new folder language.
        - Core module updated as well with new language folder.
This commit is contained in:
adator
2025-08-25 22:31:07 +02:00
parent 0a2e3f724b
commit 0c6fcb7710
22 changed files with 346 additions and 92 deletions

View File

@@ -564,6 +564,9 @@ class Base:
self.db_execute_query(table_core_channel) self.db_execute_query(table_core_channel)
self.db_execute_query(table_core_config) self.db_execute_query(table_core_config)
# Patch database
self.db_patch(self.Config.TABLE_ADMIN, "language", "TEXT")
if self.install: if self.install:
self.Loader.ModuleUtils.db_register_module('mod_command', 'sys', True) self.Loader.ModuleUtils.db_register_module('mod_command', 'sys', True)
self.Loader.ModuleUtils.db_register_module('mod_defender', 'sys', True) self.Loader.ModuleUtils.db_register_module('mod_defender', 'sys', True)
@@ -584,6 +587,25 @@ class Base:
return response return response
def db_is_column_exist(self, table_name: str, column_name: str) -> bool:
q = self.db_execute_query(f"PRAGMA table_info({table_name})")
existing_columns = [col[1] for col in q.fetchall()]
if column_name in existing_columns:
return True
else:
return False
def db_patch(self, table_name: str, column_name: str, column_type: str) -> bool:
if not self.db_is_column_exist(table_name, column_name):
patch = f"ALTER TABLE {self.Config.TABLE_ADMIN} ADD COLUMN {column_name} {column_type}"
self.db_execute_query(patch)
self.logs.debug(f"The patch has been applied")
self.logs.debug(f"Table name: {table_name}, Column name: {column_name}, Column type: {column_type}")
return True
else:
return False
def db_close(self) -> None: def db_close(self) -> None:
try: try:

View File

@@ -211,15 +211,6 @@ class Channel:
return None return None
def get_channel_asdict(self, channel_name: str) -> Optional[dict[str, Any]]:
channel_obj: Optional['MChannel'] = self.get_channel(channel_name)
if channel_obj is None:
return None
return channel_obj.to_dict()
def is_valid_channel(self, channel_to_check: str) -> bool: def is_valid_channel(self, channel_to_check: str) -> bool:
"""Check if the string has the # caractere and return True if this is a valid channel """Check if the string has the # caractere and return True if this is a valid channel
@@ -276,7 +267,7 @@ class Channel:
mes_donnees = {'datetime': self.Utils.get_sdatetime(), 'channel_name': channel_name, 'module_name': module_name} mes_donnees = {'datetime': self.Utils.get_sdatetime(), 'channel_name': channel_name, 'module_name': module_name}
insert = self.Base.db_execute_query(f"INSERT INTO {core_table} (datetime, channel_name, module_name) VALUES (:datetime, :channel_name, :module_name)", mes_donnees) insert = self.Base.db_execute_query(f"INSERT INTO {core_table} (datetime, channel_name, module_name) VALUES (:datetime, :channel_name, :module_name)", mes_donnees)
if insert.rowcount: if insert.rowcount:
self.Logs.debug(f'New channel added: channel={channel_name} / module_name={module_name}') self.Logs.debug(f'Channel added to DB: channel={channel_name} / module_name={module_name}')
return True return True
else: else:
return False return False
@@ -286,7 +277,7 @@ class Channel:
response = self.Base.db_execute_query(f"DELETE FROM {core_table} WHERE channel_name = :channel_name AND module_name = :module_name", mes_donnes) response = self.Base.db_execute_query(f"DELETE FROM {core_table} WHERE channel_name = :channel_name AND module_name = :module_name", mes_donnes)
if response.rowcount > 0: if response.rowcount > 0:
self.Logs.debug(f'Channel deleted: channel={channel_name} / module: {module_name}') self.Logs.debug(f'Channel deleted from DB: channel={channel_name} / module: {module_name}')
return True return True
else: else:
return False return False

View File

@@ -11,6 +11,7 @@ class Command:
def __init__(self, loader: 'Loader'): def __init__(self, loader: 'Loader'):
self.Loader = loader self.Loader = loader
self.Base = loader.Base self.Base = loader.Base
self.Logs = loader.Logs
def build(self, new_command_obj: MCommand) -> bool: def build(self, new_command_obj: MCommand) -> bool:
@@ -45,6 +46,27 @@ class Command:
return False return False
def drop_command_by_module(self, module_name: str) -> bool:
"""Drop all command by module
Args:
module_name (str): The module name
Returns:
bool: True
"""
tmp_model: list[MCommand] = []
for command in self.DB_COMMANDS:
if command.module_name.lower() == module_name.lower():
tmp_model.append(command)
for c in tmp_model:
self.DB_COMMANDS.remove(c)
self.Logs.debug(f"[COMMAND] Drop command for module {module_name}")
return True
def get_ordered_commands(self) -> list[MCommand]: def get_ordered_commands(self) -> list[MCommand]:
return sorted(self.DB_COMMANDS, key=lambda c: (c.command_level, c.module_name)) return sorted(self.DB_COMMANDS, key=lambda c: (c.command_level, c.module_name))

View File

@@ -1,10 +1,14 @@
'''This class should never be reloaded. '''This class should never be reloaded.
''' '''
from logging import Logger
from threading import Timer, Thread, RLock from threading import Timer, Thread, RLock
from socket import socket from socket import socket
from typing import Any, Optional from typing import Any, Optional, TYPE_CHECKING
from core.definition import MSModule from core.definition import MSModule
if TYPE_CHECKING:
from core.classes.user import User
class Settings: class Settings:
"""This Class will never be reloaded. """This Class will never be reloaded.
Means that the variables are available during Means that the variables are available during
@@ -29,6 +33,17 @@ class Settings:
__CACHE: dict[str, Any] = {} __CACHE: dict[str, Any] = {}
"""Use set_cache or get_cache instead""" """Use set_cache or get_cache instead"""
__TRANSLATION: dict[str, list[list[str]]] = dict()
"""Translation Varibale"""
__LANG: str = "EN"
__INSTANCE_OF_USER_UTILS: Optional['User'] = None
"""Instance of the User Utils class"""
__LOGGER: Optional[Logger] = None
"""Instance of the logger"""
def set_cache(self, key: str, value_to_cache: Any): def set_cache(self, key: str, value_to_cache: Any):
"""When you want to store a variable """When you want to store a variable
@@ -57,3 +72,37 @@ class Settings:
def show_cache(self) -> dict[str, Any]: def show_cache(self) -> dict[str, Any]:
return self.__CACHE.copy() return self.__CACHE.copy()
@property
def global_translation(self) -> dict[str, list[list[str]]]:
return self.__TRANSLATION
@global_translation.setter
def global_translation(self, translation_var: dict) -> None:
self.__TRANSLATION = translation_var
@property
def global_lang(self) -> str:
return self.__LANG
@global_lang.setter
def global_lang(self, lang: str) -> None:
self.__LANG = lang
@property
def global_user(self) -> 'User':
return self.__INSTANCE_OF_USER_UTILS
@global_user.setter
def global_user(self, user_utils_instance: 'User') -> None:
self.__INSTANCE_OF_USER_UTILS = user_utils_instance
@property
def global_logger(self) -> Logger:
return self.__LOGGER
@global_logger.setter
def global_logger(self, logger: Logger) -> None:
self.__LOGGER = logger
global_settings = Settings()

View File

@@ -0,0 +1,92 @@
import yaml
import yaml.scanner
from os import sep
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.loader import Loader
class Translation:
def __init__(self, loader: 'Loader') -> None:
self.Logs = loader.Logs
self.Settings = loader.Settings
return None
def get_translation(self) -> dict[str, list[list[str]]]:
try:
translation: dict[str, list[list[str]]] = dict()
sfs: dict[str, list[list[str]]] = {}
module_translation_directory = Path("mods")
core_translation_directory = Path("core")
sfs_core = self.get_subfolders_name(core_translation_directory.__str__())
sfs_module = self.get_subfolders_name(module_translation_directory.__str__())
# Combine the 2 dict
for d in (sfs_core, sfs_module):
for k, v in d.items():
sfs.setdefault(k, []).extend(v)
loaded_files: list[str] = []
for module, filenames in sfs.items():
translation[module] = []
for filename in filenames:
with open(f"{filename}", "r", encoding="utf-8") as fyaml:
data: dict[str, list[dict[str, str]]] = yaml.safe_load(fyaml)
if not isinstance(data, dict):
continue
for key, list_trad in data.items():
for vlist in list_trad:
translation[module].append([vlist["orig"], vlist["trad"]])
loaded_files.append(f"{filename}")
return translation
except yaml.scanner.ScannerError as se:
self.Logs.error(f"[!] {se} [!]")
return {}
except yaml.YAMLError as ye:
if hasattr(ye, 'problem_mark'):
mark = ye.problem_mark
self.Logs.error(f"Error YAML: {ye.with_traceback(None)}")
self.Logs.error("Error position: (%s:%s)" % (mark.line+1, mark.column+1))
return {}
except yaml.error.MarkedYAMLError as me:
self.Logs.error(f"[!] {me} [!]")
return {}
except Exception as err:
self.Logs.error(f'General Error: {err}', exc_info=True)
return {}
finally:
self.Logs.debug("Translation files loaded")
for f in loaded_files:
self.Logs.debug(f" - {f}")
def get_subfolders_name(self, directory: str) -> dict[str, list[str]]:
try:
translation_information: dict[str, list[str]] = dict()
main_directory = Path(directory)
# Init the dictionnary
for subfolder in main_directory.rglob(f'*language{sep}*{sep}*.yaml'):
if subfolder.name != '__pycache__':
translation_information[subfolder.parent.name.lower()] = []
for subfolder in main_directory.rglob(f'*language{sep}*{sep}*.yaml'):
if subfolder.name != '__pycache__':
translation_information[subfolder.parent.name.lower()].append(subfolder)
return translation_information
except Exception as err:
self.Logs.error(f'General Error: {err}')
return {}

View File

@@ -10,10 +10,15 @@ class User:
UID_DB: list['MUser'] = [] UID_DB: list['MUser'] = []
@property
def get_current_user(self) -> 'MUser':
return self.current_user
def __init__(self, loader: 'Loader'): def __init__(self, loader: 'Loader'):
self.Logs = loader.Logs self.Logs = loader.Logs
self.Base = loader.Base self.Base = loader.Base
self.current_user: Optional['MUser'] = None
def insert(self, new_user: 'MUser') -> bool: def insert(self, new_user: 'MUser') -> bool:
"""Insert a new User object """Insert a new User object
@@ -126,8 +131,10 @@ class User:
""" """
for record in self.UID_DB: for record in self.UID_DB:
if record.uid == uidornickname: if record.uid == uidornickname:
self.current_user = record
return record return record
elif record.nickname == uidornickname: elif record.nickname == uidornickname:
self.current_user = record
return record return record
return None return None
@@ -147,6 +154,7 @@ class User:
if user_obj is None: if user_obj is None:
return None return None
self.current_user = user_obj
return user_obj.uid return user_obj.uid
def get_nickname(self, uidornickname:str) -> Optional[str]: def get_nickname(self, uidornickname:str) -> Optional[str]:
@@ -163,6 +171,7 @@ class User:
if user_obj is None: if user_obj is None:
return None return None
self.current_user = user_obj
return user_obj.nickname return user_obj.nickname
def get_user_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]: def get_user_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]:

View File

@@ -191,6 +191,9 @@ class MConfig(MainModel):
SERVICE_ID: str = field(init=False) SERVICE_ID: str = field(init=False)
"""The service unique ID""" """The service unique ID"""
LANG: str = "EN"
"""The default language of Defender. default: EN"""
OWNER: str = "admin" OWNER: str = "admin"
"""The nickname of the admin of the service""" """The nickname of the admin of the service"""

View File

@@ -16,12 +16,12 @@ class Install:
service_cmd_daemon_reload: list service_cmd_daemon_reload: list
defender_main_executable: str defender_main_executable: str
python_min_version: str python_min_version: str
python_current_version_tuple: tuple[str, str, str] python_current_version_tuple: tuple[int, int, int]
python_current_version: str python_current_version: tuple[int, int, int]
defender_install_folder: str defender_install_folder: str
venv_folder: str venv_folder: str
venv_cmd_installation: list venv_cmd_installation: list
venv_cmd_requirements: list venv_cmd_requirements: list[str]
venv_pip_executable: str venv_pip_executable: str
venv_python_executable: str venv_python_executable: str
@@ -70,24 +70,24 @@ class Install:
service_cmd_executable=['systemctl', '--user', 'start', 'defender'], service_cmd_executable=['systemctl', '--user', 'start', 'defender'],
service_cmd_daemon_reload=['systemctl', '--user', 'daemon-reload'], service_cmd_daemon_reload=['systemctl', '--user', 'daemon-reload'],
defender_main_executable=defender_main_executable, defender_main_executable=defender_main_executable,
python_min_version='3.10', python_min_version=(3, 10, 0),
python_current_version_tuple=python_version_tuple(), python_current_version_tuple=tuple(map(int, python_version_tuple())),
python_current_version=python_version(), python_current_version=python_version(),
defender_install_folder=defender_install_folder, defender_install_folder=defender_install_folder,
venv_folder=venv_folder, venv_folder=venv_folder,
venv_cmd_installation=['python3', '-m', 'venv', venv_folder], venv_cmd_installation=['python3', '-m', 'venv', venv_folder],
venv_cmd_requirements=['sqlalchemy','psutil','requests','faker','unrealircd_rpc_py'], venv_cmd_requirements=['sqlalchemy','psutil','requests','faker','pyyaml','unrealircd_rpc_py'],
venv_pip_executable=f'{os.path.join(defender_install_folder, venv_folder, "bin")}{os.sep}pip', venv_pip_executable=f'{os.path.join(defender_install_folder, venv_folder, "bin")}{os.sep}pip',
venv_python_executable=f'{os.path.join(defender_install_folder, venv_folder, "bin")}{os.sep}python' venv_python_executable=f'{os.path.join(defender_install_folder, venv_folder, "bin")}{os.sep}python'
) )
if not self.check_python_version(): if not self.check_python_version():
# If the Python version is not good then Exit # If the Python version is not good then Exit
exit("/!\\ Python version error /!\\") exit("[!] Python version error [!]")
if not os.path.exists(os.path.join(self.config.defender_install_folder, 'config', 'configuration.json')): if not os.path.exists(os.path.join(self.config.defender_install_folder, 'config', 'configuration.json')):
# If configuration file do not exist # If configuration file do not exist
exit("/!\\ Configuration file (core/configuration.json) doesn't exist! please create it /!\\") exit("[!] Configuration file (core/configuration.json) doesn't exist! please create it [!]")
# Exclude Windows OS from the installation # Exclude Windows OS from the installation
if os.name == 'nt': if os.name == 'nt':
@@ -98,7 +98,7 @@ class Install:
return False return False
if self.is_root(): if self.is_root():
exit(f'/!\\ I highly not recommend running Defender as root /!\\') exit(f'[!] I highly not recommend running Defender as root [!]')
self.skip_install = True self.skip_install = True
return False return False
@@ -108,7 +108,7 @@ class Install:
print('> User without privileges ==> OK') print('> User without privileges ==> OK')
return False return False
elif os.geteuid() == 0: elif os.geteuid() == 0:
print('/!\\ Do not use root to install Defender /!\\') print('[!] Do not use root to install Defender [!]')
exit("Do not use root to install Defender") exit("Do not use root to install Defender")
return True return True
@@ -117,13 +117,13 @@ class Install:
full_service_file_path = os.path.join(self.config.unix_systemd_folder, self.config.service_file_name) full_service_file_path = os.path.join(self.config.unix_systemd_folder, self.config.service_file_name)
if not os.path.exists(full_service_file_path): if not os.path.exists(full_service_file_path):
print(f'/!\\ Service file does not exist /!\\') print(f'[!] Service file does not exist [!]')
return True return True
# Check if virtual env exist # Check if virtual env exist
if not os.path.exists(f'{os.path.join(self.config.defender_install_folder, self.config.venv_folder)}'): if not os.path.exists(f'{os.path.join(self.config.defender_install_folder, self.config.venv_folder)}'):
self.run_subprocess(self.config.venv_cmd_installation) self.run_subprocess(self.config.venv_cmd_installation)
print(f'/!\\ Virtual env does not exist run the install /!\\') print(f'[!] Virtual env does not exist run the install [!]')
return True return True
def run_subprocess(self, command:list) -> None: def run_subprocess(self, command:list) -> None:
@@ -173,25 +173,19 @@ class Install:
print(f"> Checking for dependencies versions ==> WAIT") print(f"> Checking for dependencies versions ==> WAIT")
for package in self.DB_PACKAGES: for package in self.DB_PACKAGES:
newVersion = False newVersion = False
required_version = package.version _required_version = package.version
installed_version = None _installed_version: str = None
output = check_output([self.config.venv_pip_executable, 'show', package.name]) output = check_output([self.config.venv_pip_executable, 'show', package.name])
for line in output.decode().splitlines(): for line in output.decode().splitlines():
if line.startswith('Version:'): if line.startswith('Version:'):
installed_version = line.split(':')[1].strip() _installed_version = line.split(':')[1].strip()
break break
required_major, required_minor, required_patch = required_version.split('.') required_version = tuple(map(int, _required_version.split('.')))
installed_major, installed_minor, installed_patch = installed_version.split('.') installed_version = tuple(map(int, _installed_version.split('.')))
if required_major > installed_major: if required_version > installed_version:
print(f'> New version of {package.name} is available {installed_version} ==> {required_version}')
newVersion = True
elif required_major == installed_major and required_minor > installed_minor:
print(f'> New version of {package.name} is available {installed_version} ==> {required_version}')
newVersion = True
elif required_major == installed_major and required_minor == installed_minor and required_patch > installed_patch:
print(f'> New version of {package.name} is available {installed_version} ==> {required_version}') print(f'> New version of {package.name} is available {installed_version} ==> {required_version}')
newVersion = True newVersion = True
@@ -202,7 +196,7 @@ class Install:
return newVersion return newVersion
except CalledProcessError: except CalledProcessError:
print(f"/!\\ Package {package.name} not installed /!\\") print(f"[!] Package {package.name} not installed [!]")
except Exception as err: except Exception as err:
print(f"General Error: {err}") print(f"General Error: {err}")
@@ -212,23 +206,11 @@ class Install:
Returns: Returns:
bool: True si la version de python est autorisé sinon False bool: True si la version de python est autorisé sinon False
""" """
# Current system version if self.config.python_current_version_tuple < self.config.python_min_version:
sys_major, sys_minor, sys_patch = self.config.python_current_version_tuple
# min python version required
python_required_version = self.config.python_min_version.split('.')
min_major, min_minor = tuple((python_required_version[0], python_required_version[1]))
if int(sys_major) < int(min_major):
print(f"## Your python version must be greather than or equal to {self.config.python_min_version} ##")
return False
elif (int(sys_major) <= int(min_major)) and (int(sys_minor) < int(min_minor)):
print(f"## Your python version must be greather than or equal to {self.config.python_min_version} ##") print(f"## Your python version must be greather than or equal to {self.config.python_min_version} ##")
return False return False
print(f"> Version of python : {self.config.python_current_version} ==> OK") print(f"> Version of python : {self.config.python_current_version} ==> OK")
return True return True
def check_package(self, package_name) -> bool: def check_package(self, package_name) -> bool:
@@ -255,6 +237,7 @@ class Install:
do_install = True do_install = True
for module in self.config.venv_cmd_requirements: for module in self.config.venv_cmd_requirements:
module = module.replace('pyyaml', 'yaml')
if not self.check_package(module): if not self.check_package(module):
do_install = True do_install = True
@@ -284,7 +267,7 @@ class Install:
full_service_file_path = os.path.join(self.config.unix_systemd_folder, self.config.service_file_name) full_service_file_path = os.path.join(self.config.unix_systemd_folder, self.config.service_file_name)
if os.path.exists(full_service_file_path): if os.path.exists(full_service_file_path):
print(f'/!\\ Service file already exist /!\\') print(f'[!] Service file already exist [!]')
self.run_subprocess(self.config.service_cmd_executable) self.run_subprocess(self.config.service_cmd_executable)
return None return None

View File

@@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any, Optional, Union
from core.classes import rehash from core.classes import rehash
from core.loader import Loader from core.loader import Loader
from core.classes.protocol import Protocol from core.classes.protocol import Protocol
from core.classes.commands import Command from core.utils import tr
if TYPE_CHECKING: if TYPE_CHECKING:
from core.definition import MSasl from core.definition import MSasl
@@ -476,6 +476,9 @@ class Irc:
""" """
try: try:
original_response: list[str] = data.copy() original_response: list[str] = data.copy()
RED = self.Config.COLORS.red
GREEN = self.Config.COLORS.green
NOGC = self.Config.COLORS.nogc
if len(original_response) < 2: if len(original_response) < 2:
self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}') self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}')
@@ -514,14 +517,14 @@ class Irc:
if sasl_obj.auth_success: if sasl_obj.auth_success:
self.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level) self.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level)
self.Protocol.send_priv_msg(nick_from=dnickname, self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}SASL AUTH{self.Config.COLORS.nogc} ] - {nickname} ({sasl_obj.username}) est désormais connecté a {dnickname}", msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, sasl_obj.username, dnickname),
channel=dchanlog) channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=f"Connexion a {dnickname} réussie!") self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
else: else:
self.Protocol.send_priv_msg(nick_from=dnickname, self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.red}SASL AUTH{self.Config.COLORS.nogc} ] - {nickname} a tapé un mauvais mot de pass pour le username ({sasl_obj.username})", msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", RED, NOGC, nickname, sasl_obj.username),
channel=dchanlog) channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=f"Mot de passe incorrecte") self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!"))
# Delete sasl object! # Delete sasl object!
self.Sasl.delete_sasl_client(uid) self.Sasl.delete_sasl_client(uid)
@@ -646,9 +649,9 @@ class Irc:
case 'notallowed': case 'notallowed':
try: try:
current_command = cmd[0] current_command = str(cmd[0])
self.Protocol.send_priv_msg( self.Protocol.send_priv_msg(
msg=f'[ {RED}{current_command}{NOGC} ] - Accès Refusé à {self.User.get_nickname(fromuser)}', msg=tr('[ %s%s%s ] - Access denied to %s', RED, current_command.upper(), NOGC, fromuser),
nick_from=dnickname, nick_from=dnickname,
channel=dchanlog channel=dchanlog
) )
@@ -656,7 +659,7 @@ class Irc:
self.Protocol.send_notice( self.Protocol.send_notice(
nick_from=dnickname, nick_from=dnickname,
nick_to=fromuser, nick_to=fromuser,
msg=f'Accès Refusé' msg=tr('Access denied!')
) )
except IndexError as ie: except IndexError as ie:
@@ -664,12 +667,12 @@ class Irc:
case 'deauth': case 'deauth':
current_command = cmd[0] current_command = str(cmd[0]).upper()
uid_to_deauth = self.User.get_uid(fromuser) uid_to_deauth = self.User.get_uid(fromuser)
self.delete_db_admin(uid_to_deauth) self.delete_db_admin(uid_to_deauth)
self.Protocol.send_priv_msg( self.Protocol.send_priv_msg(
msg=f"[ {RED}{str(current_command).upper()}{NOGC} ] - {self.User.get_nickname(fromuser)} est désormais déconnecter de {dnickname}", msg=tr("[ %s%s%s ] - %s has been disconnected from %s", RED, current_command, NOGC, fromuser, dnickname),
nick_from=dnickname, nick_from=dnickname,
channel=dchanlog channel=dchanlog
) )
@@ -688,7 +691,7 @@ class Irc:
self.Protocol.send_notice( self.Protocol.send_notice(
nick_from=dnickname, nick_from=dnickname,
nick_to=fromuser, nick_to=fromuser,
msg=f"You can't use this command anymore ! Please use [{self.Config.SERVICE_PREFIX}auth] instead" msg=tr("You can't use this command anymore ! Please use [%sauth] instead", self.Config.SERVICE_PREFIX)
) )
return False return False

View File

@@ -0,0 +1,4 @@
traduction:
# Message help
- orig: "Access denied!"
trad: "Accès refusé."

View File

@@ -1,5 +1,6 @@
from logging import Logger from logging import Logger
from core.classes import user, admin, client, channel, reputation, settings, sasl from core.classes.settings import global_settings
from core.classes import translation, user, admin, client, channel, reputation, settings, sasl
import core.logs as logs import core.logs as logs
import core.definition as df import core.definition as df
import core.utils as utils import core.utils as utils
@@ -26,18 +27,28 @@ class Loader:
self.Utils: utils = utils self.Utils: utils = utils
# Load Classes # Load Classes
self.Settings: settings.Settings = global_settings
self.ServiceLogging: logs.ServiceLogging = self.LoggingModule.ServiceLogging() self.ServiceLogging: logs.ServiceLogging = self.LoggingModule.ServiceLogging()
self.Logs: Logger = self.ServiceLogging.get_logger() self.Logs: Logger = self.ServiceLogging.get_logger()
self.Settings: settings.Settings = settings.Settings()
self.Config: df.MConfig = self.ConfModule.Configuration(self).get_config_model() self.Config: df.MConfig = self.ConfModule.Configuration(self).get_config_model()
self.Settings.global_lang = self.Config.LANG if self.Config.LANG else "EN"
self.Settings.global_logger = self.Logs
self.Translation: translation.Translation = translation.Translation(self)
self.Settings.global_translation = self.Translation.get_translation()
self.Base: base_mod.Base = self.BaseModule.Base(self) self.Base: base_mod.Base = self.BaseModule.Base(self)
self.User: user.User = user.User(self) self.User: user.User = user.User(self)
self.Settings.global_user = self.User
self.Client: client.Client = client.Client(self) self.Client: client.Client = client.Client(self)
self.Admin: admin.Admin = admin.Admin(self) self.Admin: admin.Admin = admin.Admin(self)
@@ -52,4 +63,4 @@ class Loader:
self.Sasl: sasl.Sasl = sasl.Sasl(self) self.Sasl: sasl.Sasl = sasl.Sasl(self)
self.Logs.debug("LOADER Success!") self.Logs.debug(self.Utils.tr("Loader %s success", __name__))

View File

@@ -2,6 +2,7 @@
Main utils library. Main utils library.
''' '''
import gc import gc
import glob
import ssl import ssl
import socket import socket
import sys import sys
@@ -13,10 +14,57 @@ from datetime import datetime, timedelta, timezone
from time import time from time import time
from random import choice from random import choice
from hashlib import md5, sha3_512 from hashlib import md5, sha3_512
from core.classes.settings import global_settings
if TYPE_CHECKING: if TYPE_CHECKING:
from core.irc import Irc from core.irc import Irc
def tr(message: str, *args) -> str:
"""Translation Engine system
```python
example:
_('Hello my firstname is %s and my lastname is %s', firstname, lastname)
```
Args:
message (str): The message to translate
*args (any) : Whatever the variable you want to pass
Returns:
str: The translated message
"""
count_args = len(args) # Count number of args sent
count_placeholder = message.count('%s') # Count number of placeholder in the message
is_args_available = True if args else False
g = global_settings
try:
# Access to user object ==> global_instance.get_user_option
client_language = global_settings.global_user.current_user.geoip if global_settings.global_user.current_user else 'en'
client_language = client_language if client_language else 'en'
if count_args != count_placeholder:
global_settings.global_logger.error(f"Translation: Original message: {message} | Args: {count_args} - Placeholder: {count_placeholder}")
return message
if g.global_lang is None:
return message % args if is_args_available else message
if g.global_lang.lower() == 'en':
return message % args if is_args_available else message
for trads in global_settings.global_translation[global_settings.global_lang.lower()]:
if sub(r"\s+", "", message) == sub(r"\s+", "", trads[0]):
return trads[1] % args if is_args_available else trads[1]
return message % args if is_args_available else message
except KeyError as ke:
g.global_logger.error(f"Key Error: {ke}")
return message % args if is_args_available else message
except Exception as err:
global_settings.global_logger.error(f"General Error: {err} / {message}")
return message
def convert_to_int(value: Any) -> Optional[int]: def convert_to_int(value: Any) -> Optional[int]:
"""Convert a value to int """Convert a value to int

View File

@@ -0,0 +1,4 @@
traduction:
# Message help
- orig: "Hi my name is clone-es"
trad: "Hola mi name is clone-es"

View File

@@ -0,0 +1,6 @@
traduction:
# Message help
- orig: "You are now logged in"
trad: "Vous étes désomais identifier"
- orig: "NSUser ==> nsuid: %s | cuid: %s | Account: %s | Nickname: %s | email: %s"
trad: "NSUser ==> nsuid: %s | cuid: %s | Compte: %s | Pseudo: %s | email: %s"

View File

View File

@@ -131,6 +131,8 @@ class Clone:
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -k {self.Config.CLONE_CHANNEL_PASSWORD}") self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -k {self.Config.CLONE_CHANNEL_PASSWORD}")
self.Protocol.send_part_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL) self.Protocol.send_part_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL)
self.Irc.Commands.drop_command_by_module(self.module_name)
return None return None
def cmd(self, data:list) -> None: def cmd(self, data:list) -> None:

View File

@@ -172,7 +172,7 @@ class Command:
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value) self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def unload(self) -> None: def unload(self) -> None:
self.Irc.Commands.drop_command_by_module(self.module_name)
return None return None
def cmd(self, data: list[str]) -> None: def cmd(self, data: list[str]) -> None:

View File

@@ -1,8 +1,8 @@
import traceback from typing import TYPE_CHECKING
import mods.defender.schemas as schemas import mods.defender.schemas as schemas
import mods.defender.utils as utils import mods.defender.utils as utils
import mods.defender.threads as thds import mods.defender.threads as thds
from typing import TYPE_CHECKING from core.utils import tr
if TYPE_CHECKING: if TYPE_CHECKING:
from core.irc import Irc from core.irc import Irc
@@ -202,6 +202,8 @@ class Defender:
self.reputationTimer_isRunning:bool = False self.reputationTimer_isRunning:bool = False
self.autolimit_isRunning: bool = False self.autolimit_isRunning: bool = False
self.Irc.Commands.drop_command_by_module(self.module_name)
return None return None
def insert_db_trusted(self, uid: str, nickname:str) -> None: def insert_db_trusted(self, uid: str, nickname:str) -> None:
@@ -320,8 +322,7 @@ class Defender:
except IndexError as ie: except IndexError as ie:
self.Logs.error(f"{ie} / {cmd} / length {str(len(cmd))}") self.Logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
except Exception as err: except Exception as err:
self.Logs.error(f"General Error: {err}") self.Logs.error(f"General Error: {err}", exc_info=True)
traceback.print_exc()
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:

View File

@@ -220,6 +220,7 @@ class Jsonrpc():
) )
self.Base.create_thread(func=self.Threads.thread_unsubscribe, func_args=(self, ), run_once=True) self.Base.create_thread(func=self.Threads.thread_unsubscribe, func_args=(self, ), run_once=True)
self.update_configuration('jsonrpc', 0) self.update_configuration('jsonrpc', 0)
self.Irc.Commands.drop_command_by_module(self.module_name)
self.Logs.debug(f"Unloading {self.module_name}") self.Logs.debug(f"Unloading {self.module_name}")
return None return None

View File

@@ -114,7 +114,7 @@ class Test():
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value) self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def unload(self) -> None: def unload(self) -> None:
self.Irc.Commands.drop_command_by_module(self.module_name)
return None return None
def cmd(self, data:list) -> None: def cmd(self, data:list) -> None:

View File

@@ -126,6 +126,8 @@ class Votekick:
self.VoteKickManager.VOTE_CHANNEL_DB = [] self.VoteKickManager.VOTE_CHANNEL_DB = []
self.Logs.debug(f'Delete memory DB VOTE_CHANNEL_DB: {self.VoteKickManager.VOTE_CHANNEL_DB}') self.Logs.debug(f'Delete memory DB VOTE_CHANNEL_DB: {self.VoteKickManager.VOTE_CHANNEL_DB}')
self.Irc.Commands.drop_command_by_module(self.module_name)
return None return None
except UnboundLocalError as ne: except UnboundLocalError as ne:
self.Logs.error(f'{ne}') self.Logs.error(f'{ne}')

View File

@@ -1,9 +1,10 @@
{ {
"version": "6.2.2", "version": "6.2.5",
"requests": "2.32.3", "requests": "2.32.3",
"psutil": "6.0.0", "psutil": "6.0.0",
"unrealircd_rpc_py": "2.0.5", "unrealircd_rpc_py": "2.0.5",
"sqlalchemy": "2.0.35", "sqlalchemy": "2.0.35",
"faker": "30.1.0" "faker": "30.1.0",
"pyyaml": "6.0.2"
} }