Merge pull request #89 from adator85/v6.2.5

V6.2.5
This commit is contained in:
adator
2025-08-29 21:45:20 +02:00
committed by GitHub
27 changed files with 527 additions and 143 deletions

View File

@@ -53,7 +53,7 @@ Pour Les prochains lancement de defender vous devez utiliser la commande suivant
$ cd IRC_DEFENDER_MODULES
$ python3 -m venv .pyenv
$ source .pyenv/bin/activate
(pyenv)$ pip install sqlalchemy, psutil, requests, faker, unrealircd_rpc_py
(pyenv)$ pip install sqlalchemy, psutil, requests, faker, unrealircd_rpc_py, pyyaml
# Créer un service nommé "defender.service"
# pour votre service et placer le dans "/PATH/TO/USER/.config/systemd/user/"

View File

@@ -296,13 +296,14 @@ class Base:
'password': password,
'hostname': '*',
'vhost': '*',
'language': 'EN',
'level': 5
}
self.db_execute_query(f"""
INSERT INTO {self.Config.TABLE_ADMIN}
(createdOn, user, password, hostname, vhost, level)
(createdOn, user, password, hostname, vhost, language, level)
VALUES
(:createdOn, :user, :password, :hostname, :vhost, :level)"""
(:createdOn, :user, :password, :hostname, :vhost, :language, :level)"""
, mes_donnees)
return None
@@ -564,6 +565,9 @@ class Base:
self.db_execute_query(table_core_channel)
self.db_execute_query(table_core_config)
# Patch database
self.db_patch(self.Config.TABLE_ADMIN, "language", "TEXT")
if self.install:
self.Loader.ModuleUtils.db_register_module('mod_command', 'sys', True)
self.Loader.ModuleUtils.db_register_module('mod_defender', 'sys', True)
@@ -584,6 +588,25 @@ class Base:
return response
def db_is_column_exist(self, table_name: str, column_name: str) -> bool:
q = self.db_execute_query(f"PRAGMA table_info({table_name})")
existing_columns = [col[1] for col in q.fetchall()]
if column_name in existing_columns:
return True
else:
return False
def db_patch(self, table_name: str, column_name: str, column_type: str) -> bool:
if not self.db_is_column_exist(table_name, column_name):
patch = f"ALTER TABLE {self.Config.TABLE_ADMIN} ADD COLUMN {column_name} {column_type}"
self.db_execute_query(patch)
self.logs.debug(f"The patch has been applied")
self.logs.debug(f"Table name: {table_name}, Column name: {column_name}, Column type: {column_type}")
return True
else:
return False
def db_close(self) -> None:
try:

View File

@@ -10,6 +10,11 @@ class Admin:
def __init__(self, loader: 'Loader') -> None:
self.Logs = loader.Logs
self.Base = loader.Base
self.Setting = loader.Settings
self.Config = loader.Config
self.User = loader.User
self.Definition = loader.Definition
def insert(self, new_admin: MAdmin) -> bool:
"""Insert a new admin object model
@@ -153,3 +158,46 @@ class Admin:
return record.nickname
return None
def get_language(self, uidornickname: str) -> Optional[str]:
"""Get the language of the admin
Args:
uidornickname (str): The user ID or the Nickname of the admin
Returns:
Optional[str]: The language selected by the admin.
"""
admin = self.get_admin(uidornickname)
if admin is None:
return None
return admin.language
def db_auth_admin_via_fingerprint(self, fp: str, uidornickname: str) -> bool:
"""Check the fingerprint
Args:
fp (str): The unique fingerprint of the user
uidornickname (str): The UID or the Nickname of the user
Returns:
bool: True if found
"""
query = f"SELECT user, level, language FROM {self.Config.TABLE_ADMIN} WHERE fingerprint = :fp"
data = {'fp': fp}
exe = self.Base.db_execute_query(query, data)
result = exe.fetchone()
if result:
account = result[0]
level = result[1]
language = result[2]
user_obj = self.User.get_user(uidornickname)
if user_obj:
admin_obj = self.Definition.MAdmin(**user_obj.to_dict(),account=account, level=level, language=language)
if self.insert(admin_obj):
self.Setting.current_admin = admin_obj
return True
return False

View File

@@ -211,15 +211,6 @@ class Channel:
return None
def get_channel_asdict(self, channel_name: str) -> Optional[dict[str, Any]]:
channel_obj: Optional['MChannel'] = self.get_channel(channel_name)
if channel_obj is None:
return None
return channel_obj.to_dict()
def is_valid_channel(self, channel_to_check: str) -> bool:
"""Check if the string has the # caractere and return True if this is a valid channel
@@ -276,7 +267,7 @@ class Channel:
mes_donnees = {'datetime': self.Utils.get_sdatetime(), 'channel_name': channel_name, 'module_name': module_name}
insert = self.Base.db_execute_query(f"INSERT INTO {core_table} (datetime, channel_name, module_name) VALUES (:datetime, :channel_name, :module_name)", mes_donnees)
if insert.rowcount:
self.Logs.debug(f'New channel added: channel={channel_name} / module_name={module_name}')
self.Logs.debug(f'Channel added to DB: channel={channel_name} / module_name={module_name}')
return True
else:
return False
@@ -286,7 +277,7 @@ class Channel:
response = self.Base.db_execute_query(f"DELETE FROM {core_table} WHERE channel_name = :channel_name AND module_name = :module_name", mes_donnes)
if response.rowcount > 0:
self.Logs.debug(f'Channel deleted: channel={channel_name} / module: {module_name}')
self.Logs.debug(f'Channel deleted from DB: channel={channel_name} / module: {module_name}')
return True
else:
return False

View File

@@ -11,6 +11,7 @@ class Command:
def __init__(self, loader: 'Loader'):
self.Loader = loader
self.Base = loader.Base
self.Logs = loader.Logs
def build(self, new_command_obj: MCommand) -> bool:
@@ -45,6 +46,27 @@ class Command:
return False
def drop_command_by_module(self, module_name: str) -> bool:
"""Drop all command by module
Args:
module_name (str): The module name
Returns:
bool: True
"""
tmp_model: list[MCommand] = []
for command in self.DB_COMMANDS:
if command.module_name.lower() == module_name.lower():
tmp_model.append(command)
for c in tmp_model:
self.DB_COMMANDS.remove(c)
self.Logs.debug(f"[COMMAND] Drop command for module {module_name}")
return True
def get_ordered_commands(self) -> list[MCommand]:
return sorted(self.DB_COMMANDS, key=lambda c: (c.command_level, c.module_name))

View File

@@ -4,6 +4,8 @@ from datetime import datetime
from typing import TYPE_CHECKING, Optional
from ssl import SSLEOFError, SSLError
from core.utils import tr
if TYPE_CHECKING:
from core.irc import Irc
from core.classes.sasl import Sasl
@@ -25,7 +27,7 @@ class Unrealircd6:
self.known_protocol: set[str] = {'SJOIN', 'UID', 'MD', 'QUIT', 'SQUIT',
'EOS', 'PRIVMSG', 'MODE', 'UMODE2',
'VERSION', 'REPUTATION', 'SVS2MODE',
'SLOG', 'NICK', 'PART', 'PONG', 'SASL',
'SLOG', 'NICK', 'PART', 'PONG', 'SASL', 'PING',
'PROTOCTL', 'SERVER', 'SMOD', 'TKL', 'NETINFO',
'006', '007', '018'}
@@ -567,6 +569,7 @@ class Unrealircd6:
def on_svs2mode(self, serverMsg: list[str]) -> None:
"""Handle svs2mode coming from a server
>>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
Args:
serverMsg (list[str]): Original server message
@@ -604,6 +607,7 @@ class Unrealircd6:
def on_umode2(self, serverMsg: list[str]) -> None:
"""Handle umode2 coming from a server
>>> [':adator_', 'UMODE2', '-i']
Args:
serverMsg (list[str]): Original server message
@@ -860,7 +864,7 @@ class Unrealircd6:
# Initialisation terminé aprés le premier PING
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f"[{self.__Config.COLORS.green}INFORMATION{self.__Config.COLORS.nogc}] >> Defender is ready",
msg=tr("[ %sINFORMATION%s ] >> %s is ready!", self.__Config.COLORS.green, self.__Config.COLORS.nogc, self.__Config.SERVICE_NICKNAME),
channel=self.__Config.SERVICE_CHANLOG
)
self.__Config.DEFENDER_INIT = 0
@@ -946,6 +950,11 @@ class Unrealircd6:
fp_match = match(pattern, serverMsg[0])
fingerprint = fp_match.group(1) if fp_match else None
# Extract tls_cipher information
pattern = r'^.*tls_cipher=([^;]+).*$'
tlsc_match = match(pattern, serverMsg[0])
tls_cipher = tlsc_match.group(1) if tlsc_match else None
if geoip_match:
geoip = geoip_match.group(1)
else:
@@ -963,6 +972,7 @@ class Unrealircd6:
umodes=umodes,
vhost=vhost,
fingerprint=fingerprint,
tls_cipher=tls_cipher,
isWebirc=isWebirc,
isWebsocket=isWebsocket,
remote_ip=remote_ip,
@@ -971,6 +981,21 @@ class Unrealircd6:
connexion_datetime=datetime.now()
)
)
# Auto Auth admin via fingerprint
dnickname = self.__Config.SERVICE_NICKNAME
dchanlog = self.__Config.SERVICE_CHANLOG
GREEN = self.__Config.COLORS.green
NOGC = self.__Config.COLORS.nogc
if self.__Irc.Admin.db_auth_admin_via_fingerprint(fingerprint, uid):
admin = self.__Irc.Admin.get_admin(uid)
account = admin.account if admin else ''
self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTO AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, account, dnickname),
channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
return None
except IndexError as ie:
self.__Logs.error(f"{__name__} - Index Error: {ie}")
@@ -1302,3 +1327,33 @@ class Unrealircd6:
except Exception as err:
self.__Logs.error(f'General Error: {err}', exc_info=True)
def on_md(self, serverMsg: list[str]) -> None:
"""Handle MD responses
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']
Args:
serverMsg (list[str]): The server reply
"""
try:
scopy = serverMsg.copy()
available_vars = ['creationtime', 'certfp', 'tls_cipher']
uid = str(scopy[3])
var = str(scopy[4]).lower()
value = str(scopy[5]).replace(':', '')
user_obj = self.__Irc.User.get_user(uid)
if user_obj is None:
return None
match var:
case 'certfp':
user_obj.fingerprint = value
case 'tls_cipher':
user_obj.tls_cipher = value
case _:
return None
...
except Exception as e:
self.__Logs.error(f"General Error: {e}")

View File

@@ -69,7 +69,7 @@ def rehash_service(uplink: 'Irc', nickname: str) -> None:
msg=f'[REHASH] Module [{mod}] reloaded',
channel=uplink.Config.SERVICE_CHANLOG
)
uplink.Utils = sys.modules['core.utils']
uplink.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).get_config_model()
uplink.Config.HSID = config_model_bakcup.HSID
uplink.Config.DEFENDER_INIT = config_model_bakcup.DEFENDER_INIT

View File

@@ -1,9 +1,14 @@
'''This class should never be reloaded.
'''
from logging import Logger
from threading import Timer, Thread, RLock
from socket import socket
from typing import Any, Optional
from core.definition import MSModule
from typing import Any, Optional, TYPE_CHECKING
from core.definition import MSModule, MAdmin
if TYPE_CHECKING:
from core.classes.user import User
from core.classes.admin import Admin
class Settings:
"""This Class will never be reloaded.
@@ -29,6 +34,20 @@ class Settings:
__CACHE: dict[str, Any] = {}
"""Use set_cache or get_cache instead"""
__TRANSLATION: dict[str, list[list[str]]] = dict()
"""Translation Varibale"""
__LANG: str = "EN"
__INSTANCE_OF_USER_UTILS: Optional['User'] = None
"""Instance of the User Utils class"""
__CURRENT_ADMIN: Optional['MAdmin'] = None
"""The Current Admin Object Model"""
__LOGGER: Optional[Logger] = None
"""Instance of the logger"""
def set_cache(self, key: str, value_to_cache: Any):
"""When you want to store a variable
@@ -57,3 +76,45 @@ class Settings:
def show_cache(self) -> dict[str, Any]:
return self.__CACHE.copy()
@property
def global_translation(self) -> dict[str, list[list[str]]]:
return self.__TRANSLATION
@global_translation.setter
def global_translation(self, translation_var: dict) -> None:
self.__TRANSLATION = translation_var
@property
def global_lang(self) -> str:
return self.__LANG
@global_lang.setter
def global_lang(self, lang: str) -> None:
self.__LANG = lang
@property
def global_user(self) -> 'User':
return self.__INSTANCE_OF_USER_UTILS
@global_user.setter
def global_user(self, user_utils_instance: 'User') -> None:
self.__INSTANCE_OF_USER_UTILS = user_utils_instance
@property
def current_admin(self) -> MAdmin:
return self.__CURRENT_ADMIN
@current_admin.setter
def current_admin(self, current_admin: MAdmin) -> None:
self.__CURRENT_ADMIN = current_admin
@property
def global_logger(self) -> Logger:
return self.__LOGGER
@global_logger.setter
def global_logger(self, logger: Logger) -> None:
self.__LOGGER = logger
global_settings = Settings()

View File

@@ -0,0 +1,92 @@
import yaml
import yaml.scanner
from os import sep
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.loader import Loader
class Translation:
def __init__(self, loader: 'Loader') -> None:
self.Logs = loader.Logs
self.Settings = loader.Settings
return None
def get_translation(self) -> dict[str, list[list[str]]]:
try:
translation: dict[str, list[list[str]]] = dict()
sfs: dict[str, list[list[str]]] = {}
module_translation_directory = Path("mods")
core_translation_directory = Path("core")
sfs_core = self.get_subfolders_name(core_translation_directory.__str__())
sfs_module = self.get_subfolders_name(module_translation_directory.__str__())
# Combine the 2 dict
for d in (sfs_core, sfs_module):
for k, v in d.items():
sfs.setdefault(k, []).extend(v)
loaded_files: list[str] = []
for module, filenames in sfs.items():
translation[module] = []
for filename in filenames:
with open(f"{filename}", "r", encoding="utf-8") as fyaml:
data: dict[str, list[dict[str, str]]] = yaml.safe_load(fyaml)
if not isinstance(data, dict):
continue
for key, list_trad in data.items():
for vlist in list_trad:
translation[module].append([vlist["orig"], vlist["trad"]])
loaded_files.append(f"{filename}")
return translation
except yaml.scanner.ScannerError as se:
self.Logs.error(f"[!] {se} [!]")
return {}
except yaml.YAMLError as ye:
if hasattr(ye, 'problem_mark'):
mark = ye.problem_mark
self.Logs.error(f"Error YAML: {ye.with_traceback(None)}")
self.Logs.error("Error position: (%s:%s)" % (mark.line+1, mark.column+1))
return {}
except yaml.error.MarkedYAMLError as me:
self.Logs.error(f"[!] {me} [!]")
return {}
except Exception as err:
self.Logs.error(f'General Error: {err}', exc_info=True)
return {}
finally:
self.Logs.debug("Translation files loaded")
for f in loaded_files:
self.Logs.debug(f" - {f}")
def get_subfolders_name(self, directory: str) -> dict[str, list[str]]:
try:
translation_information: dict[str, list[str]] = dict()
main_directory = Path(directory)
# Init the dictionnary
for subfolder in main_directory.rglob(f'*language{sep}*{sep}*.yaml'):
if subfolder.name != '__pycache__':
translation_information[subfolder.parent.name.lower()] = []
for subfolder in main_directory.rglob(f'*language{sep}*{sep}*.yaml'):
if subfolder.name != '__pycache__':
translation_information[subfolder.parent.name.lower()].append(subfolder)
return translation_information
except Exception as err:
self.Logs.error(f'General Error: {err}')
return {}

View File

@@ -10,10 +10,15 @@ class User:
UID_DB: list['MUser'] = []
@property
def get_current_user(self) -> 'MUser':
return self.current_user
def __init__(self, loader: 'Loader'):
self.Logs = loader.Logs
self.Base = loader.Base
self.current_user: Optional['MUser'] = None
def insert(self, new_user: 'MUser') -> bool:
"""Insert a new User object
@@ -126,8 +131,10 @@ class User:
"""
for record in self.UID_DB:
if record.uid == uidornickname:
self.current_user = record
return record
elif record.nickname == uidornickname:
self.current_user = record
return record
return None
@@ -147,6 +154,7 @@ class User:
if user_obj is None:
return None
self.current_user = user_obj
return user_obj.uid
def get_nickname(self, uidornickname:str) -> Optional[str]:
@@ -163,6 +171,7 @@ class User:
if user_obj is None:
return None
self.current_user = user_obj
return user_obj.nickname
def get_user_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]:

View File

@@ -31,6 +31,7 @@ class MClient(MainModel):
umodes: str = None
vhost: str = None
fingerprint: str = None
tls_cipher: str = None
isWebirc: bool = False
isWebsocket: bool = False
remote_ip: str = None
@@ -50,6 +51,7 @@ class MUser(MainModel):
umodes: str = None
vhost: str = None
fingerprint: str = None
tls_cipher: str = None
isWebirc: bool = False
isWebsocket: bool = False
remote_ip: str = None
@@ -70,12 +72,14 @@ class MAdmin(MainModel):
umodes: str = None
vhost: str = None
fingerprint: str = None
tls_cipher: str = None
isWebirc: bool = False
isWebsocket: bool = False
remote_ip: str = None
score_connexion: int = 0
geoip: str = None
connexion_datetime: datetime = field(default=datetime.now())
language: str = "EN"
level: int = 0
@dataclass
@@ -191,6 +195,9 @@ class MConfig(MainModel):
SERVICE_ID: str = field(init=False)
"""The service unique ID"""
LANG: str = "EN"
"""The default language of Defender. default: EN"""
OWNER: str = "admin"
"""The nickname of the admin of the service"""
@@ -359,5 +366,6 @@ class MSasl(MainModel):
username: Optional[str] = None
password: Optional[str] = None
fingerprint: Optional[str] = None
language: str = "EN"
auth_success: bool = False
level: int = 0

View File

@@ -16,12 +16,12 @@ class Install:
service_cmd_daemon_reload: list
defender_main_executable: str
python_min_version: str
python_current_version_tuple: tuple[str, str, str]
python_current_version: str
python_current_version_tuple: tuple[int, int, int]
python_current_version: tuple[int, int, int]
defender_install_folder: str
venv_folder: str
venv_cmd_installation: list
venv_cmd_requirements: list
venv_cmd_requirements: list[str]
venv_pip_executable: str
venv_python_executable: str
@@ -70,24 +70,24 @@ class Install:
service_cmd_executable=['systemctl', '--user', 'start', 'defender'],
service_cmd_daemon_reload=['systemctl', '--user', 'daemon-reload'],
defender_main_executable=defender_main_executable,
python_min_version='3.10',
python_current_version_tuple=python_version_tuple(),
python_min_version=(3, 10, 0),
python_current_version_tuple=tuple(map(int, python_version_tuple())),
python_current_version=python_version(),
defender_install_folder=defender_install_folder,
venv_folder=venv_folder,
venv_cmd_installation=['python3', '-m', 'venv', venv_folder],
venv_cmd_requirements=['sqlalchemy','psutil','requests','faker','unrealircd_rpc_py'],
venv_cmd_requirements=['sqlalchemy','psutil','requests','faker','pyyaml','unrealircd_rpc_py'],
venv_pip_executable=f'{os.path.join(defender_install_folder, venv_folder, "bin")}{os.sep}pip',
venv_python_executable=f'{os.path.join(defender_install_folder, venv_folder, "bin")}{os.sep}python'
)
if not self.check_python_version():
# If the Python version is not good then Exit
exit("/!\\ Python version error /!\\")
exit("[!] Python version error [!]")
if not os.path.exists(os.path.join(self.config.defender_install_folder, 'config', 'configuration.json')):
# If configuration file do not exist
exit("/!\\ Configuration file (core/configuration.json) doesn't exist! please create it /!\\")
exit("[!] Configuration file (core/configuration.json) doesn't exist! please create it [!]")
# Exclude Windows OS from the installation
if os.name == 'nt':
@@ -98,7 +98,7 @@ class Install:
return False
if self.is_root():
exit(f'/!\\ I highly not recommend running Defender as root /!\\')
exit(f'[!] I highly not recommend running Defender as root [!]')
self.skip_install = True
return False
@@ -108,7 +108,7 @@ class Install:
print('> User without privileges ==> OK')
return False
elif os.geteuid() == 0:
print('/!\\ Do not use root to install Defender /!\\')
print('[!] Do not use root to install Defender [!]')
exit("Do not use root to install Defender")
return True
@@ -117,13 +117,13 @@ class Install:
full_service_file_path = os.path.join(self.config.unix_systemd_folder, self.config.service_file_name)
if not os.path.exists(full_service_file_path):
print(f'/!\\ Service file does not exist /!\\')
print(f'[!] Service file does not exist [!]')
return True
# Check if virtual env exist
if not os.path.exists(f'{os.path.join(self.config.defender_install_folder, self.config.venv_folder)}'):
self.run_subprocess(self.config.venv_cmd_installation)
print(f'/!\\ Virtual env does not exist run the install /!\\')
print(f'[!] Virtual env does not exist run the install [!]')
return True
def run_subprocess(self, command:list) -> None:
@@ -173,25 +173,19 @@ class Install:
print(f"> Checking for dependencies versions ==> WAIT")
for package in self.DB_PACKAGES:
newVersion = False
required_version = package.version
installed_version = None
_required_version = package.version
_installed_version: str = None
output = check_output([self.config.venv_pip_executable, 'show', package.name])
for line in output.decode().splitlines():
if line.startswith('Version:'):
installed_version = line.split(':')[1].strip()
_installed_version = line.split(':')[1].strip()
break
required_major, required_minor, required_patch = required_version.split('.')
installed_major, installed_minor, installed_patch = installed_version.split('.')
required_version = tuple(map(int, _required_version.split('.')))
installed_version = tuple(map(int, _installed_version.split('.')))
if required_major > installed_major:
print(f'> New version of {package.name} is available {installed_version} ==> {required_version}')
newVersion = True
elif required_major == installed_major and required_minor > installed_minor:
print(f'> New version of {package.name} is available {installed_version} ==> {required_version}')
newVersion = True
elif required_major == installed_major and required_minor == installed_minor and required_patch > installed_patch:
if required_version > installed_version:
print(f'> New version of {package.name} is available {installed_version} ==> {required_version}')
newVersion = True
@@ -202,7 +196,7 @@ class Install:
return newVersion
except CalledProcessError:
print(f"/!\\ Package {package.name} not installed /!\\")
print(f"[!] Package {package.name} not installed [!]")
except Exception as err:
print(f"General Error: {err}")
@@ -212,23 +206,11 @@ class Install:
Returns:
bool: True si la version de python est autorisé sinon False
"""
# Current system version
sys_major, sys_minor, sys_patch = self.config.python_current_version_tuple
# min python version required
python_required_version = self.config.python_min_version.split('.')
min_major, min_minor = tuple((python_required_version[0], python_required_version[1]))
if int(sys_major) < int(min_major):
print(f"## Your python version must be greather than or equal to {self.config.python_min_version} ##")
return False
elif (int(sys_major) <= int(min_major)) and (int(sys_minor) < int(min_minor)):
if self.config.python_current_version_tuple < self.config.python_min_version:
print(f"## Your python version must be greather than or equal to {self.config.python_min_version} ##")
return False
print(f"> Version of python : {self.config.python_current_version} ==> OK")
return True
def check_package(self, package_name) -> bool:
@@ -255,6 +237,7 @@ class Install:
do_install = True
for module in self.config.venv_cmd_requirements:
module = module.replace('pyyaml', 'yaml')
if not self.check_package(module):
do_install = True
@@ -284,7 +267,7 @@ class Install:
full_service_file_path = os.path.join(self.config.unix_systemd_folder, self.config.service_file_name)
if os.path.exists(full_service_file_path):
print(f'/!\\ Service file already exist /!\\')
print(f'[!] Service file already exist [!]')
self.run_subprocess(self.config.service_cmd_executable)
return None

View File

@@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any, Optional, Union
from core.classes import rehash
from core.loader import Loader
from core.classes.protocol import Protocol
from core.classes.commands import Command
from core.utils import tr
if TYPE_CHECKING:
from core.definition import MSasl
@@ -313,15 +313,15 @@ class Irc:
def db_get_admin_info(*, username: Optional[str] = None, password: Optional[str] = None, fingerprint: Optional[str] = None) -> Optional[dict[str, Any]]:
if fingerprint:
mes_donnees = {'fingerprint': fingerprint}
query = f"SELECT user, level FROM {self.Config.TABLE_ADMIN} WHERE fingerprint = :fingerprint"
query = f"SELECT user, level, language FROM {self.Config.TABLE_ADMIN} WHERE fingerprint = :fingerprint"
else:
mes_donnees = {'user': username, 'password': self.Utils.hash_password(password)}
query = f"SELECT user, level FROM {self.Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
query = f"SELECT user, level, language FROM {self.Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
result = self.Base.db_execute_query(query, mes_donnees)
user_from_db = result.fetchone()
if user_from_db:
return {'user': user_from_db[0], 'level': user_from_db[1]}
return {'user': user_from_db[0], 'level': user_from_db[1], 'language': user_from_db[2]}
else:
return None
@@ -331,6 +331,7 @@ class Irc:
if admin_info is not None:
s.auth_success = True
s.level = admin_info.get('level', 0)
s.language = admin_info.get('language', 'EN')
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} SASL {self.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
else:
@@ -345,6 +346,7 @@ class Irc:
s.auth_success = True
s.level = admin_info.get('level', 0)
s.username = admin_info.get('user', None)
s.language = admin_info.get('language', 'EN')
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} SASL {self.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
else:
@@ -380,14 +382,16 @@ class Irc:
time.sleep(beat)
self.Base.execute_periodic_action()
def insert_db_admin(self, uid: str, account: str, level: int) -> None:
def insert_db_admin(self, uid: str, account: str, level: int, language: str) -> None:
user_obj = self.User.get_user(uid)
if user_obj is None:
return None
self.Admin.insert(
self.Loader.Definition.MAdmin(
**user_obj.to_dict(),
language=language,
account=account,
level=int(level)
)
@@ -476,13 +480,17 @@ class Irc:
"""
try:
original_response: list[str] = data.copy()
RED = self.Config.COLORS.red
GREEN = self.Config.COLORS.green
NOGC = self.Config.COLORS.nogc
if len(original_response) < 2:
self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}')
return None
self.Logs.debug(f">> {self.Utils.hide_sensitive_data(original_response)}")
parsed_protocol = self.Protocol.parse_server_msg(original_response.copy())
# parsed_protocol = self.Protocol.parse_server_msg(original_response.copy())
pos, parsed_protocol = self.Protocol.get_ircd_protocol_poisition(cmd=original_response)
match parsed_protocol:
case 'PING':
@@ -512,16 +520,16 @@ class Irc:
sasl_obj = self.Sasl.get_sasl_obj(uid)
if sasl_obj:
if sasl_obj.auth_success:
self.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level)
self.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}SASL AUTH{self.Config.COLORS.nogc} ] - {nickname} ({sasl_obj.username}) est désormais connecté a {dnickname}",
msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, sasl_obj.username, dnickname),
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=f"Connexion a {dnickname} réussie!")
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
else:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.red}SASL AUTH{self.Config.COLORS.nogc} ] - {nickname} a tapé un mauvais mot de pass pour le username ({sasl_obj.username})",
msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", RED, NOGC, nickname, sasl_obj.username),
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=f"Mot de passe incorrecte")
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!"))
# Delete sasl object!
self.Sasl.delete_sasl_client(uid)
@@ -537,7 +545,6 @@ class Irc:
self.Protocol.on_protoctl(serverMsg=original_response)
case 'SVS2MODE':
# >> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
self.Protocol.on_svs2mode(serverMsg=original_response)
case 'SQUIT':
@@ -550,7 +557,6 @@ class Irc:
self.Protocol.on_version_msg(serverMsg=original_response)
case 'UMODE2':
# [':adator_', 'UMODE2', '-i']
self.Protocol.on_umode2(serverMsg=original_response)
case 'NICK':
@@ -566,15 +572,15 @@ class Irc:
sasl_response = self.Protocol.on_sasl(original_response, self.Sasl)
self.on_sasl_authentication_process(sasl_response)
case 'SLOG': # TODO
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case 'MD': # TODO
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case 'MD':
self.Protocol.on_md(serverMsg=original_response)
case 'PRIVMSG':
self.Protocol.on_privmsg(serverMsg=original_response)
case 'SLOG': # TODO
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case 'PONG': # TODO
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
@@ -619,7 +625,8 @@ class Irc:
"""
fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande
uid = self.User.get_uid(fromuser) # Récuperer le uid de l'utilisateur
uid = self.User.get_uid(user) # Récuperer le uid de l'utilisateur
self.Settings.current_admin = self.Admin.get_admin(user) # set Current admin if any.
RED = self.Config.COLORS.red
GREEN = self.Config.COLORS.green
@@ -646,9 +653,9 @@ class Irc:
case 'notallowed':
try:
current_command = cmd[0]
current_command = str(cmd[0])
self.Protocol.send_priv_msg(
msg=f'[ {RED}{current_command}{NOGC} ] - Accès Refusé à {self.User.get_nickname(fromuser)}',
msg=tr('[ %s%s%s ] - Access denied to %s', RED, current_command.upper(), NOGC, fromuser),
nick_from=dnickname,
channel=dchanlog
)
@@ -656,7 +663,7 @@ class Irc:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f'Accès Refusé'
msg=tr('Access denied!')
)
except IndexError as ie:
@@ -664,12 +671,12 @@ class Irc:
case 'deauth':
current_command = cmd[0]
current_command = str(cmd[0]).upper()
uid_to_deauth = self.User.get_uid(fromuser)
self.delete_db_admin(uid_to_deauth)
self.Protocol.send_priv_msg(
msg=f"[ {RED}{str(current_command).upper()}{NOGC} ] - {self.User.get_nickname(fromuser)} est désormais déconnecter de {dnickname}",
msg=tr("[ %s%s%s ] - %s has been disconnected from %s", RED, current_command, NOGC, fromuser, dnickname),
nick_from=dnickname,
channel=dchanlog
)
@@ -688,7 +695,7 @@ class Irc:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"You can't use this command anymore ! Please use [{self.Config.SERVICE_PREFIX}auth] instead"
msg=tr("You can't use this command anymore ! Please use [%sauth] instead", self.Config.SERVICE_PREFIX)
)
return False
@@ -733,7 +740,7 @@ class Irc:
if cmd_owner == config_owner and cmd_password == config_password:
self.Base.db_create_first_admin()
self.insert_db_admin(current_uid, cmd_owner, 5)
self.insert_db_admin(current_uid, cmd_owner, 5, self.Config.LANG)
self.Protocol.send_priv_msg(
msg=f"[ {self.Config.COLORS.green}{str(current_command).upper()} ]{self.Config.COLORS.black} - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}",
nick_from=dnickname,
@@ -784,14 +791,15 @@ class Irc:
return None
mes_donnees = {'user': user_to_log, 'password': self.Loader.Utils.hash_password(password)}
query = f"SELECT id, user, level FROM {self.Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
query = f"SELECT id, user, level, language FROM {self.Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
result = self.Base.db_execute_query(query, mes_donnees)
user_from_db = result.fetchone()
if user_from_db:
account = user_from_db[1]
level = user_from_db[2]
self.insert_db_admin(current_client.uid, account, level)
account = str(user_from_db[1])
level = int(user_from_db[2])
language = str(user_from_db[3])
self.insert_db_admin(current_client.uid, account, level, language)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - {current_client.nickname} ({account}) est désormais connecté a {dnickname}",
channel=dchanlog)
@@ -1189,14 +1197,14 @@ class Irc:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"{module} - {GREEN}Loaded{NOGC} by {loaded_user} on {loaded_datetime}"
msg=tr('%s - %sLoaded%s by %s on %s', module, GREEN, NOGC, loaded_user, loaded_datetime)
)
loaded = False
else:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"{module} - {RED}Not Loaded{NOGC}"
msg=tr('%s - %sNot Loaded%s', module, GREEN, NOGC)
)
case 'show_timers':
@@ -1266,7 +1274,7 @@ class Irc:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"UID : {db_admin.uid} - Nickname: {db_admin.nickname} - Account: {db_admin.account} - Level: {db_admin.level} - Connection: {db_admin.connexion_datetime}"
msg=f"UID : {db_admin.uid} - Nickname: {db_admin.nickname} - Account: {db_admin.account} - Level: {db_admin.level} - Language: {db_admin.language} - Connection: {db_admin.connexion_datetime}"
)
return None

View File

@@ -0,0 +1,13 @@
traduction:
# Message help
- orig: "Access denied!"
trad: "Accès refusé."
- orig: "%s - %sLoaded%s by %s on %s"
trad: "%s - %sChargé%s par %s le %s"
- orig: "%s - %sNot Loaded%s"
trad: "%s - %sNon chargé%s"
- orig: "Successfuly connected to %s"
trad: "Connecter a %s avec succés"
- orig: "[ %sINFORMATION%s ] >> %s is ready!"
trad: "[ %sINFORMATION%s ] >> %s est prêt!"

View File

@@ -1,5 +1,6 @@
from logging import Logger
from core.classes import user, admin, client, channel, reputation, settings, sasl
from core.classes.settings import global_settings
from core.classes import translation, user, admin, client, channel, reputation, settings, sasl
import core.logs as logs
import core.definition as df
import core.utils as utils
@@ -26,18 +27,28 @@ class Loader:
self.Utils: utils = utils
# Load Classes
self.Settings: settings.Settings = global_settings
self.ServiceLogging: logs.ServiceLogging = self.LoggingModule.ServiceLogging()
self.Logs: Logger = self.ServiceLogging.get_logger()
self.Settings: settings.Settings = settings.Settings()
self.Config: df.MConfig = self.ConfModule.Configuration(self).get_config_model()
self.Settings.global_lang = self.Config.LANG if self.Config.LANG else "EN"
self.Settings.global_logger = self.Logs
self.Translation: translation.Translation = translation.Translation(self)
self.Settings.global_translation = self.Translation.get_translation()
self.Base: base_mod.Base = self.BaseModule.Base(self)
self.User: user.User = user.User(self)
self.Settings.global_user = self.User
self.Client: client.Client = client.Client(self)
self.Admin: admin.Admin = admin.Admin(self)
@@ -52,4 +63,4 @@ class Loader:
self.Sasl: sasl.Sasl = sasl.Sasl(self)
self.Logs.debug("LOADER Success!")
self.Logs.debug(self.Utils.tr("Loader %s success", __name__))

View File

@@ -13,10 +13,56 @@ from datetime import datetime, timedelta, timezone
from time import time
from random import choice
from hashlib import md5, sha3_512
from core.classes.settings import global_settings
if TYPE_CHECKING:
from core.irc import Irc
def tr(message: str, *args) -> str:
"""Translation Engine system
```python
example:
_('Hello my firstname is %s and my lastname is %s', firstname, lastname)
```
Args:
message (str): The message to translate
*args (any) : Whatever the variable you want to pass
Returns:
str: The translated message
"""
count_args = len(args) # Count number of args sent
count_placeholder = message.count('%s') # Count number of placeholder in the message
is_args_available = True if args else False
g = global_settings
try:
# Access to user object ==> global_instance.get_user_option
client_language = g.current_admin.language if g.current_admin else g.global_lang
if count_args != count_placeholder:
g.global_logger.error(f"Translation: Original message: {message} | Args: {count_args} - Placeholder: {count_placeholder}")
return message
if g.global_lang is None:
return message % args if is_args_available else message
if client_language.lower() == 'en':
return message % args if is_args_available else message
for trads in g.global_translation[client_language.lower()]:
if sub(r"\s+", "", message) == sub(r"\s+", "", trads[0]):
return trads[1] % args if is_args_available else trads[1]
return message % args if is_args_available else message
except KeyError as ke:
g.global_logger.error(f"Key Error: {ke}")
return message % args if is_args_available else message
except Exception as err:
global_settings.global_logger.error(f"General Error: {err} / {message}")
return message
def convert_to_int(value: Any) -> Optional[int]:
"""Convert a value to int

View File

@@ -5,7 +5,7 @@ from core import installation
# Requierements : #
# Python3.10 or higher #
# SQLAlchemy, requests, psutil #
# unrealircd-rpc-py #
# unrealircd-rpc-py, pyyaml #
# UnrealIRCD 6.2.2 or higher #
#############################################

View File

@@ -0,0 +1,4 @@
traduction:
# Message help
- orig: "Hi my name is clone-es"
trad: "Hola mi name is clone-es"

View File

@@ -0,0 +1,6 @@
traduction:
# Message help
- orig: "You are now logged in"
trad: "Vous étes désomais identifier"
- orig: "NSUser ==> nsuid: %s | cuid: %s | Account: %s | Nickname: %s | email: %s"
trad: "NSUser ==> nsuid: %s | cuid: %s | Compte: %s | Pseudo: %s | email: %s"

View File

View File

@@ -131,6 +131,8 @@ class Clone:
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -k {self.Config.CLONE_CHANNEL_PASSWORD}")
self.Protocol.send_part_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL)
self.Irc.Commands.drop_command_by_module(self.module_name)
return None
def cmd(self, data:list) -> None:
@@ -200,7 +202,7 @@ class Clone:
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone connect [number of clone you want to connect] [Group]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone connect [number of clone you want to connect] [Group] [freq]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Exemple /msg {dnickname} clone connect 6 Ambiance")
case 'kill':
@@ -230,8 +232,7 @@ class Clone:
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill all")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill clone_nickname")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill [all | group name | nickname]")
case 'join':
try:
@@ -260,8 +261,7 @@ class Clone:
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join all #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join clone_nickname #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | group name | nickname] #channel")
case 'part':
try:
@@ -291,8 +291,7 @@ class Clone:
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part all #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part clone_nickname #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | group name | nickname] #channel")
case 'list':
try:

View File

@@ -172,7 +172,7 @@ class Command:
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def unload(self) -> None:
self.Irc.Commands.drop_command_by_module(self.module_name)
return None
def cmd(self, data: list[str]) -> None:

View File

@@ -1,8 +1,8 @@
import traceback
from typing import TYPE_CHECKING
import mods.defender.schemas as schemas
import mods.defender.utils as utils
import mods.defender.threads as thds
from typing import TYPE_CHECKING
from core.utils import tr
if TYPE_CHECKING:
from core.irc import Irc
@@ -202,6 +202,8 @@ class Defender:
self.reputationTimer_isRunning:bool = False
self.autolimit_isRunning: bool = False
self.Irc.Commands.drop_command_by_module(self.module_name)
return None
def insert_db_trusted(self, uid: str, nickname:str) -> None:
@@ -320,8 +322,7 @@ class Defender:
except IndexError as ie:
self.Logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
except Exception as err:
self.Logs.error(f"General Error: {err}")
traceback.print_exc()
self.Logs.error(f"General Error: {err}", exc_info=True)
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
@@ -948,7 +949,7 @@ class Defender:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' WebWebsocket : {UserObject.isWebsocket}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' REPUTATION : {UserObject.score_connexion}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' MODES : {UserObject.umodes}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CHANNELS : {channels}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CHANNELS : {", ".join(channels)}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CONNECTION TIME : {UserObject.connexion_datetime}')
else:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {nickoruid} doesn't exist")

View File

@@ -220,6 +220,7 @@ class Jsonrpc():
)
self.Base.create_thread(func=self.Threads.thread_unsubscribe, func_args=(self, ), run_once=True)
self.update_configuration('jsonrpc', 0)
self.Irc.Commands.drop_command_by_module(self.module_name)
self.Logs.debug(f"Unloading {self.module_name}")
return None

View File

@@ -114,7 +114,7 @@ class Test():
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def unload(self) -> None:
self.Irc.Commands.drop_command_by_module(self.module_name)
return None
def cmd(self, data:list) -> None:

View File

@@ -126,6 +126,8 @@ class Votekick:
self.VoteKickManager.VOTE_CHANNEL_DB = []
self.Logs.debug(f'Delete memory DB VOTE_CHANNEL_DB: {self.VoteKickManager.VOTE_CHANNEL_DB}')
self.Irc.Commands.drop_command_by_module(self.module_name)
return None
except UnboundLocalError as ne:
self.Logs.error(f'{ne}')

View File

@@ -1,9 +1,10 @@
{
"version": "6.2.2",
"version": "6.2.5",
"requests": "2.32.3",
"psutil": "6.0.0",
"unrealircd_rpc_py": "2.0.5",
"sqlalchemy": "2.0.35",
"faker": "30.1.0"
"faker": "30.1.0",
"pyyaml": "6.0.2"
}