Code refactoring on system modules.

This commit is contained in:
adator
2025-11-23 18:19:29 +01:00
parent 226340e1aa
commit 4c93f85008
6 changed files with 81 additions and 74 deletions

View File

@@ -14,12 +14,7 @@ class Admin:
Args: Args:
loader (Loader): The Loader Instance. loader (Loader): The Loader Instance.
""" """
self.Logs = loader.Logs self._ctx = loader
self.Base = loader.Base
self.Setting = loader.Settings
self.Config = loader.Config
self.User = loader.User
self.Definition = loader.Definition
def insert(self, new_admin: MAdmin) -> bool: def insert(self, new_admin: MAdmin) -> bool:
"""Insert a new admin object model """Insert a new admin object model
@@ -33,11 +28,11 @@ class Admin:
for record in self.UID_ADMIN_DB: for record in self.UID_ADMIN_DB:
if record.uid == new_admin.uid: if record.uid == new_admin.uid:
self.Logs.debug(f'{record.uid} already exist') self._ctx.Logs.debug(f'{record.uid} already exist')
return False return False
self.UID_ADMIN_DB.append(new_admin) self.UID_ADMIN_DB.append(new_admin)
self.Logs.debug(f'A new admin ({new_admin.nickname}) has been created') self._ctx.Logs.debug(f'A new admin ({new_admin.nickname}) has been created')
return True return True
def update_nickname(self, uid: str, new_admin_nickname: str) -> bool: def update_nickname(self, uid: str, new_admin_nickname: str) -> bool:
@@ -55,11 +50,11 @@ class Admin:
if record.uid == uid: if record.uid == uid:
# If the admin exist, update and do not go further # If the admin exist, update and do not go further
record.nickname = new_admin_nickname record.nickname = new_admin_nickname
self.Logs.debug(f'UID ({record.uid}) has been updated with new nickname {new_admin_nickname}') self._ctx.Logs.debug(f'UID ({record.uid}) has been updated with new nickname {new_admin_nickname}')
return True return True
self.Logs.debug(f'The new nickname {new_admin_nickname} was not updated, uid = {uid} - The Client is not an admin') self._ctx.Logs.debug(f'The new nickname {new_admin_nickname} was not updated, uid = {uid} - The Client is not an admin')
return False return False
def update_level(self, nickname: str, new_admin_level: int) -> bool: def update_level(self, nickname: str, new_admin_level: int) -> bool:
@@ -77,10 +72,10 @@ class Admin:
if record.nickname == nickname: if record.nickname == nickname:
# If the admin exist, update and do not go further # If the admin exist, update and do not go further
record.level = new_admin_level record.level = new_admin_level
self.Logs.debug(f'Admin ({record.nickname}) has been updated with new level {new_admin_level}') self._ctx.Logs.debug(f'Admin ({record.nickname}) has been updated with new level {new_admin_level}')
return True return True
self.Logs.debug(f'The new level {new_admin_level} was not updated, nickname = {nickname} - The Client is not an admin') self._ctx.Logs.debug(f'The new level {new_admin_level} was not updated, nickname = {nickname} - The Client is not an admin')
return False return False
@@ -96,10 +91,10 @@ class Admin:
admin_obj = self.get_admin(uidornickname) admin_obj = self.get_admin(uidornickname)
if admin_obj: if admin_obj:
self.UID_ADMIN_DB.remove(admin_obj) self.UID_ADMIN_DB.remove(admin_obj)
self.Logs.debug(f'UID ({admin_obj.uid}) has been deleted') self._ctx.Logs.debug(f'UID ({admin_obj.uid}) has been deleted')
return True return True
self.Logs.debug(f'The UID {uidornickname} was not deleted') self._ctx.Logs.debug(f'The UID {uidornickname} was not deleted')
return False return False
@@ -186,20 +181,20 @@ class Admin:
if fp is None: if fp is None:
return False return False
query = f"SELECT user, level, language FROM {self.Config.TABLE_ADMIN} WHERE fingerprint = :fp" query = f"SELECT user, level, language FROM {self._ctx.Config.TABLE_ADMIN} WHERE fingerprint = :fp"
data = {'fp': fp} data = {'fp': fp}
exe = await self.Base.db_execute_query(query, data) exe = await self._ctx.Base.db_execute_query(query, data)
result = exe.fetchone() result = exe.fetchone()
if result: if result:
account = result[0] account = result[0]
level = result[1] level = result[1]
language = result[2] language = result[2]
user_obj = self.User.get_user(uidornickname) user_obj = self._ctx.User.get_user(uidornickname)
if user_obj: if user_obj:
admin_obj = self.Definition.MAdmin(**user_obj.to_dict(), account=account, level=level, language=language) admin_obj = self._ctx.Definition.MAdmin(**user_obj.to_dict(), account=account, level=level, language=language)
if self.insert(admin_obj): if self.insert(admin_obj):
self.Setting.current_admin = admin_obj self._ctx.Settings.current_admin = admin_obj
self.Logs.debug(f"[Fingerprint login] {user_obj.nickname} ({admin_obj.account}) has been logged in successfully!") self._ctx.Logs.debug(f"[Fingerprint login] {user_obj.nickname} ({admin_obj.account}) has been logged in successfully!")
return True return True
return False return False
@@ -215,8 +210,8 @@ class Admin:
""" """
mes_donnees = {'admin': admin_nickname} mes_donnees = {'admin': admin_nickname}
query_search_user = f"SELECT id FROM {self.Config.TABLE_ADMIN} WHERE user = :admin" query_search_user = f"SELECT id FROM {self._ctx.Config.TABLE_ADMIN} WHERE user = :admin"
r = await self.Base.db_execute_query(query_search_user, mes_donnees) r = await self._ctx.Base.db_execute_query(query_search_user, mes_donnees)
exist_user = r.fetchone() exist_user = r.fetchone()
if exist_user: if exist_user:
return True return True

View File

@@ -15,8 +15,7 @@ class Client:
Args: Args:
loader (Loader): The Loader instance. loader (Loader): The Loader instance.
""" """
self.Logs = loader.Logs self._ctx = loader
self.Base = loader.Base
def insert(self, new_client: 'MClient') -> bool: def insert(self, new_client: 'MClient') -> bool:
"""Insert a new User object """Insert a new User object
@@ -28,7 +27,7 @@ class Client:
bool: True if inserted bool: True if inserted
""" """
client_obj = self.get_Client(new_client.uid) client_obj = self.get_client(new_client.uid)
if not client_obj is None: if not client_obj is None:
# User already created return False # User already created return False
@@ -48,7 +47,7 @@ class Client:
Returns: Returns:
bool: True if updated bool: True if updated
""" """
user_obj = self.get_Client(uidornickname=uid) user_obj = self.get_client(uidornickname=uid)
if user_obj is None: if user_obj is None:
return False return False
@@ -68,7 +67,7 @@ class Client:
bool: True if user mode has been updaed bool: True if user mode has been updaed
""" """
response = True response = True
user_obj = self.get_Client(uidornickname=uidornickname) user_obj = self.get_client(uidornickname=uidornickname)
if user_obj is None: if user_obj is None:
return False return False
@@ -93,7 +92,7 @@ class Client:
return False return False
liste_umodes = list(umodes) liste_umodes = list(umodes)
final_umodes_liste = [x for x in self.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes] final_umodes_liste = [x for x in self._ctx.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes]
final_umodes = ''.join(final_umodes_liste) final_umodes = ''.join(final_umodes_liste)
user_obj.umodes = f"+{final_umodes}" user_obj.umodes = f"+{final_umodes}"
@@ -110,7 +109,7 @@ class Client:
bool: True if deleted bool: True if deleted
""" """
user_obj = self.get_Client(uidornickname=uid) user_obj = self.get_client(uidornickname=uid)
if user_obj is None: if user_obj is None:
return False return False
@@ -119,7 +118,7 @@ class Client:
return True return True
def get_Client(self, uidornickname: str) -> Optional['MClient']: def get_client(self, uidornickname: str) -> Optional['MClient']:
"""Get The Client Object model """Get The Client Object model
Args: Args:
@@ -146,7 +145,7 @@ class Client:
str|None: Return the UID str|None: Return the UID
""" """
client_obj = self.get_Client(uidornickname=uidornickname) client_obj = self.get_client(uidornickname=uidornickname)
if client_obj is None: if client_obj is None:
return None return None
@@ -162,29 +161,13 @@ class Client:
Returns: Returns:
str|None: the nickname str|None: the nickname
""" """
client_obj = self.get_Client(uidornickname=uidornickname) client_obj = self.get_client(uidornickname=uidornickname)
if client_obj is None: if client_obj is None:
return None return None
return client_obj.nickname return client_obj.nickname
def get_client_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]:
"""Transform User Object to a dictionary
Args:
uidornickname (str): The UID or The nickname
Returns:
Union[dict[str, any], None]: User Object as a dictionary or None
"""
client_obj = self.get_Client(uidornickname=uidornickname)
if client_obj is None:
return None
return client_obj.to_dict()
def is_exist(self, uidornickname: str) -> bool: def is_exist(self, uidornickname: str) -> bool:
"""Check if the UID or the nickname exist in the USER DB """Check if the UID or the nickname exist in the USER DB
@@ -194,7 +177,7 @@ class Client:
Returns: Returns:
bool: True if exist bool: True if exist
""" """
user_obj = self.get_Client(uidornickname=uidornickname) user_obj = self.get_client(uidornickname=uidornickname)
if user_obj is None: if user_obj is None:
return False return False
@@ -211,15 +194,15 @@ class Client:
bool: True if exist bool: True if exist
""" """
table_client = self.Base.Config.TABLE_CLIENT table_client = self._ctx.Base.Config.TABLE_CLIENT
account_to_check = {'account': account.lower()} account_to_check = {'account': account.lower()}
account_to_check_query = await self.Base.db_execute_query(f""" account_to_check_query = await self._ctx.Base.db_execute_query(f"""
SELECT id FROM {table_client} WHERE LOWER(account) = :account SELECT id FROM {table_client} WHERE LOWER(account) = :account
""", account_to_check) """, account_to_check)
account_to_check_result = account_to_check_query.fetchone() account_to_check_result = account_to_check_query.fetchone()
if account_to_check_result: if account_to_check_result:
self.Logs.error(f"Account ({account}) already exist") self._ctx.Logs.error(f"Account ({account}) already exist")
return True return True
return False return False

View File

@@ -13,11 +13,21 @@ class Command:
Args: Args:
loader (Loader): The Loader instance. loader (Loader): The Loader instance.
""" """
self.Loader = loader self._ctx = loader
self.Base = loader.Base
self.Logs = loader.Logs
def build(self, new_command_obj: MCommand) -> bool: def build_command(self, level: int, module_name: str, command_name: str, command_description: str) -> bool:
"""This method build the commands variable
Args:
level (int): The Level of the command
module_name (str): The module name
command_name (str): The command name
command_description (str): The description of the command
"""
# Build Model.
return self._build(self._ctx.Definition.MCommand(module_name, command_name, command_description, level))
def _build(self, new_command_obj: MCommand) -> bool:
command = self.get_command(new_command_obj.command_name, new_command_obj.module_name) command = self.get_command(new_command_obj.command_name, new_command_obj.module_name)
if command is None: if command is None:
@@ -68,7 +78,7 @@ class Command:
for c in tmp_model: for c in tmp_model:
self.DB_COMMANDS.remove(c) self.DB_COMMANDS.remove(c)
self.Logs.debug(f"[COMMAND] Drop command for module {module_name}") self._ctx.Logs.debug(f"[COMMAND] Drop command for module {module_name}")
return True return True
def get_ordered_commands(self) -> list[MCommand]: def get_ordered_commands(self) -> list[MCommand]:
@@ -86,7 +96,7 @@ class Command:
return new_list return new_list
def is_client_allowed_to_run_command(self, nickname: str, command_name: str) -> bool: def is_client_allowed_to_run_command(self, nickname: str, command_name: str) -> bool:
admin = self.Loader.Admin.get_admin(nickname) admin = self._ctx.Admin.get_admin(nickname)
admin_level = admin.level if admin else 0 admin_level = admin.level if admin else 0
commands = self.get_commands_by_level(admin_level) commands = self.get_commands_by_level(admin_level)

View File

@@ -14,9 +14,7 @@ class Reputation:
Args: Args:
loader (Loader): The Loader instance. loader (Loader): The Loader instance.
""" """
self._ctx = loader
self.Logs = loader.Logs
self.MReputation: Optional[MReputation] = None
def insert(self, new_reputation_user: MReputation) -> bool: def insert(self, new_reputation_user: MReputation) -> bool:
"""Insert a new Reputation User object """Insert a new Reputation User object
@@ -34,16 +32,16 @@ class Reputation:
if record.uid == new_reputation_user.uid: if record.uid == new_reputation_user.uid:
# If the user exist then return False and do not go further # If the user exist then return False and do not go further
exist = True exist = True
self.Logs.debug(f'{record.uid} already exist') self._ctx.Logs.debug(f'{record.uid} already exist')
return result return result
if not exist: if not exist:
self.UID_REPUTATION_DB.append(new_reputation_user) self.UID_REPUTATION_DB.append(new_reputation_user)
result = True result = True
self.Logs.debug(f'New Reputation User Captured: ({new_reputation_user})') self._ctx.Logs.debug(f'New Reputation User Captured: ({new_reputation_user})')
if not result: if not result:
self.Logs.critical(f'The Reputation User Object was not inserted {new_reputation_user}') self._ctx.Logs.critical(f'The Reputation User Object was not inserted {new_reputation_user}')
return result return result
@@ -86,11 +84,11 @@ class Reputation:
# If the user exist then remove and return True and do not go further # If the user exist then remove and return True and do not go further
self.UID_REPUTATION_DB.remove(record) self.UID_REPUTATION_DB.remove(record)
result = True result = True
self.Logs.debug(f'UID ({record.uid}) has been deleted') self._ctx.Logs.debug(f'UID ({record.uid}) has been deleted')
return result return result
if not result: if not result:
self.Logs.critical(f'The UID {uid} was not deleted') self._ctx.Logs.critical(f'The UID {uid} was not deleted')
return result return result

View File

@@ -11,14 +11,16 @@ class User:
UID_DB: list['MUser'] = [] UID_DB: list['MUser'] = []
@property @property
def get_current_user(self) -> 'MUser': def current_user(self) -> 'MUser':
return self.current_user return self._current_user
@current_user.setter
def current_user(self, muser: 'MUser') -> None:
self._current_user = muser
def __init__(self, loader: 'Loader'): def __init__(self, loader: 'Loader'):
self._ctx = loader
self.Logs = loader.Logs self._current_user: Optional['MUser'] = None
self.Base = loader.Base
self.current_user: Optional['MUser'] = None
def insert(self, new_user: 'MUser') -> bool: def insert(self, new_user: 'MUser') -> bool:
"""Insert a new User object """Insert a new User object
@@ -55,7 +57,7 @@ class User:
return False return False
user_obj.nickname = new_nickname user_obj.nickname = new_nickname
self.Logs.debug(f"UID ({uid}) has benn update with new nickname ({new_nickname}).") self._ctx.Logs.debug(f"UID ({uid}) has benn update with new nickname ({new_nickname}).")
return True return True
def update_mode(self, uidornickname: str, modes: str) -> bool: def update_mode(self, uidornickname: str, modes: str) -> bool:
@@ -94,7 +96,7 @@ class User:
return False return False
liste_umodes = list(umodes) liste_umodes = list(umodes)
final_umodes_liste = [x for x in self.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes] final_umodes_liste = [x for x in self._ctx.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes]
final_umodes = ''.join(final_umodes_liste) final_umodes = ''.join(final_umodes_liste)
user_obj.umodes = f"+{final_umodes}" user_obj.umodes = f"+{final_umodes}"

View File

@@ -1,3 +1,7 @@
import asyncio
import concurrent
import concurrent.futures
import threading
from datetime import datetime from datetime import datetime
from json import dumps from json import dumps
from dataclasses import dataclass, field, asdict, fields, replace from dataclasses import dataclass, field, asdict, fields, replace
@@ -210,6 +214,12 @@ class MConfig(MainModel):
PASSWORD: str = "password" PASSWORD: str = "password"
"""The password of the admin of the service""" """The password of the admin of the service"""
RPC_HOST: str = "127.0.0.1"
"""The host to bind. Default: 127.0.0.1"""
RPC_PORT: int = 5000
"""The port of the defender json rpc. Default: 5000"""
RPC_USERS: list[dict] = field(default_factory=list) RPC_USERS: list[dict] = field(default_factory=list)
"""The Defender rpc users""" """The Defender rpc users"""
@@ -344,6 +354,15 @@ class MConfig(MainModel):
self.SERVEUR_CHARSET: list = ["utf-8", "iso-8859-1"] self.SERVEUR_CHARSET: list = ["utf-8", "iso-8859-1"]
"""0: utf-8 | 1: iso-8859-1""" """0: utf-8 | 1: iso-8859-1"""
@dataclass
class MThread(MainModel):
name: str
thread_id: Optional[int]
thread_event: Optional[threading.Event]
thread_obj: threading.Thread
executor: concurrent.futures.ThreadPoolExecutor
future: asyncio.Future
@dataclass @dataclass
class MCommand(MainModel): class MCommand(MainModel):
module_name: str = None module_name: str = None