mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 19:24:23 +00:00
Refactoring the code, create new folder modules.
This commit is contained in:
227
core/classes/modules/admin.py
Normal file
227
core/classes/modules/admin.py
Normal file
@@ -0,0 +1,227 @@
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from core.definition import MAdmin
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
|
||||
class Admin:
|
||||
|
||||
UID_ADMIN_DB: list[MAdmin] = []
|
||||
|
||||
def __init__(self, loader: 'Loader') -> None:
|
||||
"""
|
||||
|
||||
Args:
|
||||
loader (Loader): The Loader Instance.
|
||||
"""
|
||||
self.Logs = loader.Logs
|
||||
self.Base = loader.Base
|
||||
self.Setting = loader.Settings
|
||||
self.Config = loader.Config
|
||||
self.User = loader.User
|
||||
self.Definition = loader.Definition
|
||||
|
||||
def insert(self, new_admin: MAdmin) -> bool:
|
||||
"""Insert a new admin object model
|
||||
|
||||
Args:
|
||||
new_admin (MAdmin): The new admin object model to insert
|
||||
|
||||
Returns:
|
||||
bool: True if it was inserted
|
||||
"""
|
||||
|
||||
for record in self.UID_ADMIN_DB:
|
||||
if record.uid == new_admin.uid:
|
||||
self.Logs.debug(f'{record.uid} already exist')
|
||||
return False
|
||||
|
||||
self.UID_ADMIN_DB.append(new_admin)
|
||||
self.Logs.debug(f'A new admin ({new_admin.nickname}) has been created')
|
||||
return True
|
||||
|
||||
def update_nickname(self, uid: str, new_admin_nickname: str) -> bool:
|
||||
"""Update nickname of an admin
|
||||
|
||||
Args:
|
||||
uid (str): The Admin UID
|
||||
new_admin_nickname (str): The new nickname of the admin
|
||||
|
||||
Returns:
|
||||
bool: True if the nickname has been updated.
|
||||
"""
|
||||
|
||||
for record in self.UID_ADMIN_DB:
|
||||
if record.uid == uid:
|
||||
# If the admin exist, update and do not go further
|
||||
record.nickname = new_admin_nickname
|
||||
self.Logs.debug(f'UID ({record.uid}) has been updated with new nickname {new_admin_nickname}')
|
||||
return True
|
||||
|
||||
|
||||
self.Logs.debug(f'The new nickname {new_admin_nickname} was not updated, uid = {uid} - The Client is not an admin')
|
||||
return False
|
||||
|
||||
def update_level(self, nickname: str, new_admin_level: int) -> bool:
|
||||
"""Update the admin level
|
||||
|
||||
Args:
|
||||
nickname (str): The admin nickname
|
||||
new_admin_level (int): The new level of the admin
|
||||
|
||||
Returns:
|
||||
bool: True if the admin level has been updated
|
||||
"""
|
||||
|
||||
for record in self.UID_ADMIN_DB:
|
||||
if record.nickname == nickname:
|
||||
# If the admin exist, update and do not go further
|
||||
record.level = new_admin_level
|
||||
self.Logs.debug(f'Admin ({record.nickname}) has been updated with new level {new_admin_level}')
|
||||
return True
|
||||
|
||||
self.Logs.debug(f'The new level {new_admin_level} was not updated, nickname = {nickname} - The Client is not an admin')
|
||||
|
||||
return False
|
||||
|
||||
def delete(self, uidornickname: str) -> bool:
|
||||
"""Delete admin
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or nickname of the admin
|
||||
|
||||
Returns:
|
||||
bool: True if the admin has been deleted
|
||||
"""
|
||||
|
||||
for record in self.UID_ADMIN_DB:
|
||||
if record.uid == uidornickname:
|
||||
# If the admin exist, delete and do not go further
|
||||
self.UID_ADMIN_DB.remove(record)
|
||||
self.Logs.debug(f'UID ({record.uid}) has been deleted')
|
||||
return True
|
||||
if record.nickname.lower() == uidornickname.lower():
|
||||
# If the admin exist, delete and do not go further
|
||||
self.UID_ADMIN_DB.remove(record)
|
||||
self.Logs.debug(f'nickname ({record.nickname}) has been deleted')
|
||||
return True
|
||||
|
||||
self.Logs.debug(f'The UID {uidornickname} was not deleted')
|
||||
|
||||
return False
|
||||
|
||||
def get_admin(self, uidornickname: str) -> Optional[MAdmin]:
|
||||
"""Get the admin object model
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname of the admin
|
||||
|
||||
Returns:
|
||||
Optional[MAdmin]: The MAdmin object model if exist
|
||||
"""
|
||||
|
||||
for record in self.UID_ADMIN_DB:
|
||||
if record.uid == uidornickname:
|
||||
return record
|
||||
elif record.nickname.lower() == uidornickname.lower():
|
||||
return record
|
||||
|
||||
return None
|
||||
|
||||
def get_uid(self, uidornickname:str) -> Optional[str]:
|
||||
"""Get the UID of the admin
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or nickname of the admin
|
||||
|
||||
Returns:
|
||||
Optional[str]: The UID of the admin
|
||||
"""
|
||||
|
||||
for record in self.UID_ADMIN_DB:
|
||||
if record.uid == uidornickname:
|
||||
return record.uid
|
||||
if record.nickname.lower() == uidornickname.lower():
|
||||
return record.uid
|
||||
|
||||
return None
|
||||
|
||||
def get_nickname(self, uidornickname:str) -> Optional[str]:
|
||||
"""Get the nickname of the admin
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or the nickname of the admin
|
||||
|
||||
Returns:
|
||||
Optional[str]: The nickname of the admin
|
||||
"""
|
||||
|
||||
for record in self.UID_ADMIN_DB:
|
||||
if record.nickname.lower() == uidornickname.lower():
|
||||
return record.nickname
|
||||
if record.uid == uidornickname:
|
||||
return record.nickname
|
||||
|
||||
return None
|
||||
|
||||
def get_language(self, uidornickname: str) -> Optional[str]:
|
||||
"""Get the language of the admin
|
||||
|
||||
Args:
|
||||
uidornickname (str): The user ID or the Nickname of the admin
|
||||
|
||||
Returns:
|
||||
Optional[str]: The language selected by the admin.
|
||||
"""
|
||||
admin = self.get_admin(uidornickname)
|
||||
|
||||
if admin is None:
|
||||
return None
|
||||
|
||||
return admin.language
|
||||
|
||||
def db_auth_admin_via_fingerprint(self, fp: str, uidornickname: str) -> bool:
|
||||
"""Check the fingerprint
|
||||
|
||||
Args:
|
||||
fp (str): The unique fingerprint of the user
|
||||
uidornickname (str): The UID or the Nickname of the user
|
||||
|
||||
Returns:
|
||||
bool: True if found
|
||||
"""
|
||||
query = f"SELECT user, level, language FROM {self.Config.TABLE_ADMIN} WHERE fingerprint = :fp"
|
||||
data = {'fp': fp}
|
||||
exe = self.Base.db_execute_query(query, data)
|
||||
result = exe.fetchone()
|
||||
if result:
|
||||
account = result[0]
|
||||
level = result[1]
|
||||
language = result[2]
|
||||
user_obj = self.User.get_user(uidornickname)
|
||||
if user_obj:
|
||||
admin_obj = self.Definition.MAdmin(**user_obj.to_dict(),account=account, level=level, language=language)
|
||||
if self.insert(admin_obj):
|
||||
self.Setting.current_admin = admin_obj
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def db_is_admin_exist(self, admin_nickname: str) -> bool:
|
||||
"""Verify if the admin exist in the database!
|
||||
|
||||
Args:
|
||||
admin_nickname (str): The nickname admin to check.
|
||||
|
||||
Returns:
|
||||
bool: True if the admin exist otherwise False.
|
||||
"""
|
||||
|
||||
mes_donnees = {'admin': admin_nickname}
|
||||
query_search_user = f"SELECT id FROM {self.Config.TABLE_ADMIN} WHERE user = :admin"
|
||||
r = self.Base.db_execute_query(query_search_user, mes_donnees)
|
||||
exist_user = r.fetchone()
|
||||
if exist_user:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
294
core/classes/modules/channel.py
Normal file
294
core/classes/modules/channel.py
Normal file
@@ -0,0 +1,294 @@
|
||||
from re import findall
|
||||
from typing import Any, Optional, Literal, TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.definition import MChannel
|
||||
from core.loader import Loader
|
||||
|
||||
class Channel:
|
||||
|
||||
UID_CHANNEL_DB: list['MChannel'] = []
|
||||
"""List that contains all the Channels objects (ChannelModel)
|
||||
"""
|
||||
|
||||
def __init__(self, loader: 'Loader'):
|
||||
"""
|
||||
|
||||
Args:
|
||||
loader (Loader): The Loader Instance
|
||||
"""
|
||||
self.Logs = loader.Logs
|
||||
self.Base = loader.Base
|
||||
self.Utils = loader.Utils
|
||||
|
||||
def insert(self, new_channel: 'MChannel') -> bool:
|
||||
"""This method will insert a new channel and if the channel exist it will update the user list (uids)
|
||||
|
||||
Args:
|
||||
new_channel (MChannel): The channel model object
|
||||
|
||||
Returns:
|
||||
bool: True if new channel, False if channel exist (However UID could be updated)
|
||||
"""
|
||||
result = False
|
||||
exist = False
|
||||
|
||||
if not self.is_valid_channel(new_channel.name):
|
||||
self.Logs.error(f"The channel {new_channel.name} is not valid, channel must start with #")
|
||||
return False
|
||||
|
||||
for record in self.UID_CHANNEL_DB:
|
||||
if record.name.lower() == new_channel.name.lower():
|
||||
# If the channel exist, update the user list and do not go further
|
||||
exist = True
|
||||
# self.Logs.debug(f'{record.name} already exist')
|
||||
|
||||
for user in new_channel.uids:
|
||||
record.uids.append(user)
|
||||
|
||||
# Supprimer les doublons
|
||||
del_duplicates = list(set(record.uids))
|
||||
record.uids = del_duplicates
|
||||
# self.Logs.debug(f'Updating a new UID to the channel {record}')
|
||||
return result
|
||||
|
||||
if not exist:
|
||||
# If the channel don't exist, then create it
|
||||
new_channel.name = new_channel.name.lower()
|
||||
self.UID_CHANNEL_DB.append(new_channel)
|
||||
result = True
|
||||
# self.Logs.debug(f'New Channel Created: ({new_channel})')
|
||||
|
||||
if not result:
|
||||
self.Logs.critical(f'The Channel Object was not inserted {new_channel}')
|
||||
|
||||
self.clean_channel()
|
||||
|
||||
return result
|
||||
|
||||
def delete(self, channel_name: str) -> bool:
|
||||
"""Delete channel from the UID_CHANNEL_DB
|
||||
|
||||
Args:
|
||||
channel_name (str): The Channel name
|
||||
|
||||
Returns:
|
||||
bool: True if it was deleted
|
||||
"""
|
||||
|
||||
chan_obj = self.get_channel(channel_name)
|
||||
|
||||
if chan_obj is None:
|
||||
return False
|
||||
|
||||
self.UID_CHANNEL_DB.remove(chan_obj)
|
||||
|
||||
return True
|
||||
|
||||
def delete_user_from_channel(self, channel_name: str, uid: str) -> bool:
|
||||
"""Delete a user from a channel
|
||||
|
||||
Args:
|
||||
channel_name (str): The channel name
|
||||
uid (str): The Client UID
|
||||
|
||||
Returns:
|
||||
bool: True if the client has been deleted from the channel
|
||||
"""
|
||||
try:
|
||||
result = False
|
||||
chan_obj = self.get_channel(channel_name)
|
||||
|
||||
if chan_obj is None:
|
||||
return result
|
||||
|
||||
for userid in chan_obj.uids:
|
||||
if self.Utils.clean_uid(userid) == self.Utils.clean_uid(uid):
|
||||
chan_obj.uids.remove(userid)
|
||||
result = True
|
||||
|
||||
self.clean_channel()
|
||||
|
||||
return result
|
||||
except ValueError as ve:
|
||||
self.Logs.error(f'{ve}')
|
||||
return False
|
||||
|
||||
def delete_user_from_all_channel(self, uid:str) -> bool:
|
||||
"""Delete a client from all channels
|
||||
|
||||
Args:
|
||||
uid (str): The client UID
|
||||
|
||||
Returns:
|
||||
bool: True if the client has been deleted from all channels
|
||||
"""
|
||||
try:
|
||||
result = False
|
||||
|
||||
for record in self.UID_CHANNEL_DB:
|
||||
for user_id in record.uids:
|
||||
if self.Utils.clean_uid(user_id) == self.Utils.clean_uid(uid):
|
||||
record.uids.remove(user_id)
|
||||
result = True
|
||||
|
||||
self.clean_channel()
|
||||
|
||||
return result
|
||||
except ValueError as ve:
|
||||
self.Logs.error(f'{ve}')
|
||||
return False
|
||||
|
||||
def add_user_to_a_channel(self, channel_name: str, uid: str) -> bool:
|
||||
"""Add a client to a channel
|
||||
|
||||
Args:
|
||||
channel_name (str): The channel name
|
||||
uid (str): The client UID
|
||||
|
||||
Returns:
|
||||
bool: True is the clien has been added
|
||||
"""
|
||||
try:
|
||||
chan_obj = self.get_channel(channel_name)
|
||||
|
||||
if chan_obj is None:
|
||||
# Create a new channel if the channel don't exist
|
||||
self.Logs.debug(f"New channel will be created ({channel_name} - {uid})")
|
||||
return self.insert(MChannel(channel_name, uids=[uid]))
|
||||
|
||||
chan_obj.uids.append(uid)
|
||||
del_duplicates = list(set(chan_obj.uids))
|
||||
chan_obj.uids = del_duplicates
|
||||
|
||||
return True
|
||||
except Exception as err:
|
||||
self.Logs.error(f'{err}')
|
||||
return False
|
||||
|
||||
def is_user_present_in_channel(self, channel_name: str, uid: str) -> bool:
|
||||
"""Check if a user is present in the channel
|
||||
|
||||
Args:
|
||||
channel_name (str): The channel name to check
|
||||
uid (str): The client UID
|
||||
|
||||
Returns:
|
||||
bool: True if the user is present in the channel
|
||||
"""
|
||||
chan = self.get_channel(channel_name=channel_name)
|
||||
if chan is None:
|
||||
return False
|
||||
|
||||
clean_uid = self.Utils.clean_uid(uid=uid)
|
||||
for chan_uid in chan.uids:
|
||||
if self.Utils.clean_uid(chan_uid) == clean_uid:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def clean_channel(self) -> None:
|
||||
"""If channel doesn't contain any client this method will remove the channel
|
||||
"""
|
||||
try:
|
||||
for record in self.UID_CHANNEL_DB:
|
||||
if not record.uids:
|
||||
self.UID_CHANNEL_DB.remove(record)
|
||||
|
||||
return None
|
||||
except Exception as err:
|
||||
self.Logs.error(f'{err}')
|
||||
|
||||
def get_channel(self, channel_name: str) -> Optional['MChannel']:
|
||||
"""Get the channel object
|
||||
|
||||
Args:
|
||||
channel_name (str): The Channel name
|
||||
|
||||
Returns:
|
||||
MChannel: The channel object model if exist else None
|
||||
"""
|
||||
|
||||
for record in self.UID_CHANNEL_DB:
|
||||
if record.name.lower() == channel_name.lower():
|
||||
return record
|
||||
|
||||
return None
|
||||
|
||||
def is_valid_channel(self, channel_to_check: str) -> bool:
|
||||
"""Check if the string has the # caractere and return True if this is a valid channel
|
||||
|
||||
Args:
|
||||
channel_to_check (str): The string to test if it is a channel or not
|
||||
|
||||
Returns:
|
||||
bool: True if the string is a channel / False if this is not a channel
|
||||
"""
|
||||
try:
|
||||
|
||||
if channel_to_check is None:
|
||||
return False
|
||||
|
||||
pattern = fr'^#'
|
||||
is_channel = findall(pattern, channel_to_check)
|
||||
|
||||
if not is_channel:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
except TypeError as te:
|
||||
self.Logs.error(f'TypeError: [{channel_to_check}] - {te}')
|
||||
return False
|
||||
except Exception as err:
|
||||
self.Logs.error(f'Error Not defined: {err}')
|
||||
return False
|
||||
|
||||
def db_query_channel(self, action: Literal['add','del'], module_name: str, channel_name: str) -> bool:
|
||||
"""You can add a channel or delete a channel.
|
||||
|
||||
Args:
|
||||
action (Literal['add','del']): Action on the database
|
||||
module_name (str): The module name (mod_test)
|
||||
channel_name (str): The channel name (With #)
|
||||
|
||||
Returns:
|
||||
bool: True if action done
|
||||
"""
|
||||
try:
|
||||
channel_name = channel_name.lower() if self.is_valid_channel(channel_name) else None
|
||||
core_table = self.Base.Config.TABLE_CHANNEL
|
||||
|
||||
if not channel_name:
|
||||
self.Logs.warning(f'The channel [{channel_name}] is not correct')
|
||||
return False
|
||||
|
||||
match action:
|
||||
|
||||
case 'add':
|
||||
mes_donnees = {'module_name': module_name, 'channel_name': channel_name}
|
||||
response = self.Base.db_execute_query(f"SELECT id FROM {core_table} WHERE module_name = :module_name AND channel_name = :channel_name", mes_donnees)
|
||||
is_channel_exist = response.fetchone()
|
||||
|
||||
if is_channel_exist is None:
|
||||
mes_donnees = {'datetime': self.Utils.get_sdatetime(), 'channel_name': channel_name, 'module_name': module_name}
|
||||
insert = self.Base.db_execute_query(f"INSERT INTO {core_table} (datetime, channel_name, module_name) VALUES (:datetime, :channel_name, :module_name)", mes_donnees)
|
||||
if insert.rowcount:
|
||||
self.Logs.debug(f'Channel added to DB: channel={channel_name} / module_name={module_name}')
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
case 'del':
|
||||
mes_donnes = {'channel_name': channel_name, 'module_name': module_name}
|
||||
response = self.Base.db_execute_query(f"DELETE FROM {core_table} WHERE channel_name = :channel_name AND module_name = :module_name", mes_donnes)
|
||||
|
||||
if response.rowcount > 0:
|
||||
self.Logs.debug(f'Channel deleted from DB: channel={channel_name} / module: {module_name}')
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
except Exception as err:
|
||||
self.Logs.error(err)
|
||||
return False
|
||||
243
core/classes/modules/client.py
Normal file
243
core/classes/modules/client.py
Normal file
@@ -0,0 +1,243 @@
|
||||
from re import sub
|
||||
from typing import Any, Optional, Union, TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
from core.definition import MClient
|
||||
|
||||
class Client:
|
||||
|
||||
CLIENT_DB: list['MClient'] = []
|
||||
|
||||
def __init__(self, loader: 'Loader'):
|
||||
"""
|
||||
|
||||
Args:
|
||||
loader (Loader): The Loader instance.
|
||||
"""
|
||||
self.Logs = loader.Logs
|
||||
self.Base = loader.Base
|
||||
|
||||
def insert(self, new_client: 'MClient') -> bool:
|
||||
"""Insert a new User object
|
||||
|
||||
Args:
|
||||
new_client (MClient): New Client object
|
||||
|
||||
Returns:
|
||||
bool: True if inserted
|
||||
"""
|
||||
|
||||
client_obj = self.get_Client(new_client.uid)
|
||||
|
||||
if not client_obj is None:
|
||||
# User already created return False
|
||||
return False
|
||||
|
||||
self.CLIENT_DB.append(new_client)
|
||||
|
||||
return True
|
||||
|
||||
def update_nickname(self, uid: str, new_nickname: str) -> bool:
|
||||
"""Update the nickname starting from the UID
|
||||
|
||||
Args:
|
||||
uid (str): UID of the user
|
||||
new_nickname (str): New nickname
|
||||
|
||||
Returns:
|
||||
bool: True if updated
|
||||
"""
|
||||
user_obj = self.get_Client(uidornickname=uid)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
user_obj.nickname = new_nickname
|
||||
|
||||
return True
|
||||
|
||||
def update_mode(self, uidornickname: str, modes: str) -> bool:
|
||||
"""Updating user mode
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or Nickname of the user
|
||||
modes (str): new modes to update
|
||||
|
||||
Returns:
|
||||
bool: True if user mode has been updaed
|
||||
"""
|
||||
response = True
|
||||
user_obj = self.get_Client(uidornickname=uidornickname)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
action = modes[0]
|
||||
new_modes = modes[1:]
|
||||
|
||||
existing_umodes = user_obj.umodes
|
||||
umodes = user_obj.umodes
|
||||
|
||||
if action == '+':
|
||||
|
||||
for nm in new_modes:
|
||||
if nm not in existing_umodes:
|
||||
umodes += nm
|
||||
|
||||
elif action == '-':
|
||||
for nm in new_modes:
|
||||
if nm in existing_umodes:
|
||||
umodes = umodes.replace(nm, '')
|
||||
else:
|
||||
return False
|
||||
|
||||
liste_umodes = list(umodes)
|
||||
final_umodes_liste = [x for x in self.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes]
|
||||
final_umodes = ''.join(final_umodes_liste)
|
||||
|
||||
user_obj.umodes = f"+{final_umodes}"
|
||||
|
||||
return response
|
||||
|
||||
def delete(self, uid: str) -> bool:
|
||||
"""Delete the User starting from the UID
|
||||
|
||||
Args:
|
||||
uid (str): UID of the user
|
||||
|
||||
Returns:
|
||||
bool: True if deleted
|
||||
"""
|
||||
|
||||
user_obj = self.get_Client(uidornickname=uid)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
self.CLIENT_DB.remove(user_obj)
|
||||
|
||||
return True
|
||||
|
||||
def get_Client(self, uidornickname: str) -> Optional['MClient']:
|
||||
"""Get The Client Object model
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname
|
||||
|
||||
Returns:
|
||||
UserModel|None: The UserModel Object | None
|
||||
"""
|
||||
for record in self.CLIENT_DB:
|
||||
if record.uid == uidornickname:
|
||||
return record
|
||||
elif record.nickname == uidornickname:
|
||||
return record
|
||||
|
||||
return None
|
||||
|
||||
def get_uid(self, uidornickname:str) -> Optional[str]:
|
||||
"""Get the UID of the user starting from the UID or the Nickname
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname
|
||||
|
||||
Returns:
|
||||
str|None: Return the UID
|
||||
"""
|
||||
|
||||
client_obj = self.get_Client(uidornickname=uidornickname)
|
||||
|
||||
if client_obj is None:
|
||||
return None
|
||||
|
||||
return client_obj.uid
|
||||
|
||||
def get_nickname(self, uidornickname:str) -> Union[str, None]:
|
||||
"""Get the Nickname starting from UID or the nickname
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname of the user
|
||||
|
||||
Returns:
|
||||
str|None: the nickname
|
||||
"""
|
||||
client_obj = self.get_Client(uidornickname=uidornickname)
|
||||
|
||||
if client_obj is None:
|
||||
return None
|
||||
|
||||
return client_obj.nickname
|
||||
|
||||
def get_client_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]:
|
||||
"""Transform User Object to a dictionary
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or The nickname
|
||||
|
||||
Returns:
|
||||
Union[dict[str, any], None]: User Object as a dictionary or None
|
||||
"""
|
||||
client_obj = self.get_Client(uidornickname=uidornickname)
|
||||
|
||||
if client_obj is None:
|
||||
return None
|
||||
|
||||
return client_obj.to_dict()
|
||||
|
||||
def is_exist(self, uidornickname: str) -> bool:
|
||||
"""Check if the UID or the nickname exist in the USER DB
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or the NICKNAME
|
||||
|
||||
Returns:
|
||||
bool: True if exist
|
||||
"""
|
||||
user_obj = self.get_Client(uidornickname=uidornickname)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def db_is_account_exist(self, account: str) -> bool:
|
||||
"""Check if the account exist in the database
|
||||
|
||||
Args:
|
||||
account (str): The account to check
|
||||
|
||||
Returns:
|
||||
bool: True if exist
|
||||
"""
|
||||
|
||||
table_client = self.Base.Config.TABLE_CLIENT
|
||||
account_to_check = {'account': account.lower()}
|
||||
account_to_check_query = self.Base.db_execute_query(f"""
|
||||
SELECT id FROM {table_client} WHERE LOWER(account) = :account
|
||||
""", account_to_check)
|
||||
|
||||
account_to_check_result = account_to_check_query.fetchone()
|
||||
if account_to_check_result:
|
||||
self.Logs.error(f"Account ({account}) already exist")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def clean_uid(self, uid: str) -> Union[str, None]:
|
||||
"""Clean UID by removing @ / % / + / ~ / * / :
|
||||
|
||||
Args:
|
||||
uid (str): The UID to clean
|
||||
|
||||
Returns:
|
||||
str: Clean UID without any sign
|
||||
"""
|
||||
|
||||
pattern = fr'[:|@|%|\+|~|\*]*'
|
||||
parsed_uid = sub(pattern, '', uid)
|
||||
|
||||
if not parsed_uid:
|
||||
return None
|
||||
|
||||
return parsed_uid
|
||||
109
core/classes/modules/commands.py
Normal file
109
core/classes/modules/commands.py
Normal file
@@ -0,0 +1,109 @@
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from core.definition import MCommand
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
|
||||
class Command:
|
||||
|
||||
DB_COMMANDS: list['MCommand'] = []
|
||||
|
||||
def __init__(self, loader: 'Loader'):
|
||||
"""
|
||||
Args:
|
||||
loader (Loader): The Loader instance.
|
||||
"""
|
||||
self.Loader = loader
|
||||
self.Base = loader.Base
|
||||
self.Logs = loader.Logs
|
||||
|
||||
def build(self, new_command_obj: MCommand) -> bool:
|
||||
|
||||
command = self.get_command(new_command_obj.command_name, new_command_obj.module_name)
|
||||
if command is None:
|
||||
self.DB_COMMANDS.append(new_command_obj)
|
||||
return True
|
||||
|
||||
# Update command if it exist
|
||||
# Removing the object
|
||||
if self.drop_command(command.command_name, command.module_name):
|
||||
# Add the new object
|
||||
self.DB_COMMANDS.append(new_command_obj)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_command(self, command_name: str, module_name: str) -> Optional[MCommand]:
|
||||
|
||||
for command in self.DB_COMMANDS:
|
||||
if command.command_name.lower() == command_name.lower() and command.module_name.lower() == module_name.lower():
|
||||
return command
|
||||
|
||||
return None
|
||||
|
||||
def drop_command(self, command_name: str, module_name: str) -> bool:
|
||||
|
||||
cmd = self.get_command(command_name, module_name)
|
||||
if cmd is not None:
|
||||
self.DB_COMMANDS.remove(cmd)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def drop_command_by_module(self, module_name: str) -> bool:
|
||||
"""Drop all command by module
|
||||
|
||||
Args:
|
||||
module_name (str): The module name
|
||||
|
||||
Returns:
|
||||
bool: True
|
||||
"""
|
||||
tmp_model: list[MCommand] = []
|
||||
|
||||
for command in self.DB_COMMANDS:
|
||||
if command.module_name.lower() == module_name.lower():
|
||||
tmp_model.append(command)
|
||||
|
||||
for c in tmp_model:
|
||||
self.DB_COMMANDS.remove(c)
|
||||
|
||||
self.Logs.debug(f"[COMMAND] Drop command for module {module_name}")
|
||||
return True
|
||||
|
||||
def get_ordered_commands(self) -> list[MCommand]:
|
||||
return sorted(self.DB_COMMANDS, key=lambda c: (c.command_level, c.module_name))
|
||||
|
||||
def get_commands_by_level(self, level: int = 0) -> Optional[list[MCommand]]:
|
||||
|
||||
cmd_list = self.get_ordered_commands()
|
||||
new_list: list[MCommand] = []
|
||||
|
||||
for cmd in cmd_list:
|
||||
if cmd.command_level <= level:
|
||||
new_list.append(cmd)
|
||||
|
||||
return new_list
|
||||
|
||||
def is_client_allowed_to_run_command(self, nickname: str, command_name: str) -> bool:
|
||||
admin = self.Loader.Admin.get_admin(nickname)
|
||||
admin_level = admin.level if admin else 0
|
||||
commands = self.get_commands_by_level(admin_level)
|
||||
|
||||
if command_name.lower() in [command.command_name.lower() for command in commands]:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def is_command_exist(self, command_name: str) -> bool:
|
||||
"""Check if the command name exist
|
||||
|
||||
Args:
|
||||
command_name (str): The command name you want to check
|
||||
|
||||
Returns:
|
||||
bool: True if the command exist
|
||||
"""
|
||||
if command_name.lower() in [command.command_name.lower() for command in self.get_ordered_commands()]:
|
||||
return True
|
||||
return False
|
||||
60
core/classes/modules/config.py
Normal file
60
core/classes/modules/config.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import sys
|
||||
import yaml
|
||||
from json import load
|
||||
from sys import exit
|
||||
from os import sep
|
||||
from typing import Any, Optional, Union, TYPE_CHECKING
|
||||
from core.definition import MConfig
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
|
||||
class Configuration:
|
||||
|
||||
def __init__(self, loader: 'Loader') -> None:
|
||||
|
||||
self.Loader = loader
|
||||
self.Logs = loader.Logs
|
||||
self.configuration_model = self.__load_service_configuration()
|
||||
loader.ServiceLogging.set_file_handler_level(self._config_model.DEBUG_LEVEL)
|
||||
loader.ServiceLogging.set_stdout_handler_level(self._config_model.DEBUG_LEVEL)
|
||||
loader.ServiceLogging.update_handler_format(self._config_model.DEBUG_HARD)
|
||||
return None
|
||||
|
||||
@property
|
||||
def configuration_model(self) -> MConfig:
|
||||
return self._config_model
|
||||
|
||||
@configuration_model.setter
|
||||
def configuration_model(self, conf_model: MConfig):
|
||||
self._config_model = conf_model
|
||||
|
||||
def __load_config_file(self) -> Optional[dict[str, Any]]:
|
||||
try:
|
||||
conf_filename = f'config{sep}configuration.yaml'
|
||||
with open(conf_filename, 'r') as conf:
|
||||
configuration: dict[str, dict[str, Any]] = yaml.safe_load(conf)
|
||||
|
||||
return configuration.get('configuration', None)
|
||||
except FileNotFoundError as fe:
|
||||
self.Logs.error(f'FileNotFound: {fe}')
|
||||
self.Logs.error('Configuration file not found please create config/configuration.yaml')
|
||||
exit("Configuration file not found please create config/configuration.yaml")
|
||||
|
||||
def __load_service_configuration(self) -> MConfig:
|
||||
try:
|
||||
import_config = self.__load_config_file()
|
||||
if import_config is None:
|
||||
self.Logs.error("Error While importing configuration file!", exc_info=True)
|
||||
raise Exception("Error While importing yaml configuration")
|
||||
|
||||
list_key_to_remove: list[str] = [key_to_del for key_to_del in import_config if key_to_del not in MConfig().get_attributes()]
|
||||
for key_to_remove in list_key_to_remove:
|
||||
import_config.pop(key_to_remove, None)
|
||||
self.Logs.warning(f"[!] The key {key_to_remove} is not expected, it has been removed from the system ! please remove it from configuration.json file [!]")
|
||||
|
||||
self.Logs.debug(f"[LOADING CONFIGURATION]: Loading configuration with {len(import_config)} parameters!")
|
||||
return MConfig(**import_config)
|
||||
|
||||
except TypeError as te:
|
||||
self.Logs.error(te)
|
||||
125
core/classes/modules/rehash.py
Normal file
125
core/classes/modules/rehash.py
Normal file
@@ -0,0 +1,125 @@
|
||||
import importlib
|
||||
import sys
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
import socket
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.irc import Irc
|
||||
|
||||
# Modules impacted by rehashing!
|
||||
REHASH_MODULES = [
|
||||
'core.definition',
|
||||
'core.utils',
|
||||
'core.classes.modules.config',
|
||||
'core.base',
|
||||
'core.classes.modules.commands',
|
||||
'core.classes.interfaces.iprotocol',
|
||||
'core.classes.protocols.command_handler',
|
||||
'core.classes.protocols.factory',
|
||||
'core.classes.protocols.unreal6',
|
||||
'core.classes.protocols.inspircd'
|
||||
]
|
||||
|
||||
|
||||
def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -> None:
|
||||
"""
|
||||
|
||||
Args:
|
||||
uplink (Irc): The Irc instance
|
||||
reason (str): The reason of the restart.
|
||||
"""
|
||||
# reload modules.
|
||||
for module in uplink.ModuleUtils.model_get_loaded_modules().copy():
|
||||
uplink.ModuleUtils.unload_one_module(uplink, module.module_name)
|
||||
|
||||
uplink.Base.garbage_collector_thread()
|
||||
|
||||
uplink.Logs.debug(f'[{uplink.Config.SERVICE_NICKNAME} RESTART]: Reloading configuration!')
|
||||
uplink.Protocol.send_squit(server_id=uplink.Config.SERVEUR_ID, server_link=uplink.Config.SERVEUR_LINK, reason=reason)
|
||||
uplink.Logs.debug('Restarting Defender ...')
|
||||
uplink.IrcSocket.shutdown(socket.SHUT_RDWR)
|
||||
uplink.IrcSocket.close()
|
||||
|
||||
while uplink.IrcSocket.fileno() != -1:
|
||||
time.sleep(0.5)
|
||||
uplink.Logs.warning('-- Waiting for socket to close ...')
|
||||
|
||||
# Reload configuration
|
||||
uplink.Loader.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).configuration_model
|
||||
uplink.Loader.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
|
||||
|
||||
for mod in REHASH_MODULES:
|
||||
importlib.reload(sys.modules[mod])
|
||||
|
||||
uplink.Protocol = uplink.Loader.PFactory.get()
|
||||
uplink.Protocol.register_command()
|
||||
|
||||
uplink.ModuleUtils.model_clear() # Clear loaded modules.
|
||||
uplink.User.UID_DB.clear() # Clear User Object
|
||||
uplink.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
|
||||
uplink.Client.CLIENT_DB.clear() # Clear Client object
|
||||
|
||||
uplink.init_service_user()
|
||||
uplink.Utils.create_socket(uplink)
|
||||
uplink.Protocol.send_link()
|
||||
uplink.Config.DEFENDER_RESTART = 0
|
||||
|
||||
def rehash_service(uplink: 'Irc', nickname: str) -> None:
|
||||
need_a_restart = ["SERVEUR_ID"]
|
||||
uplink.Settings.set_cache('db_commands', uplink.Commands.DB_COMMANDS)
|
||||
restart_flag = False
|
||||
config_model_bakcup = uplink.Config
|
||||
mods = REHASH_MODULES
|
||||
for mod in mods:
|
||||
importlib.reload(sys.modules[mod])
|
||||
uplink.Protocol.send_priv_msg(
|
||||
nick_from=uplink.Config.SERVICE_NICKNAME,
|
||||
msg=f'[REHASH] Module [{mod}] reloaded',
|
||||
channel=uplink.Config.SERVICE_CHANLOG
|
||||
)
|
||||
uplink.Utils = sys.modules['core.utils']
|
||||
uplink.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).configuration_model
|
||||
uplink.Config.HSID = config_model_bakcup.HSID
|
||||
uplink.Config.DEFENDER_INIT = config_model_bakcup.DEFENDER_INIT
|
||||
uplink.Config.DEFENDER_RESTART = config_model_bakcup.DEFENDER_RESTART
|
||||
uplink.Config.SSL_VERSION = config_model_bakcup.SSL_VERSION
|
||||
uplink.Config.CURRENT_VERSION = config_model_bakcup.CURRENT_VERSION
|
||||
uplink.Config.LATEST_VERSION = config_model_bakcup.LATEST_VERSION
|
||||
|
||||
conf_bkp_dict: dict = config_model_bakcup.to_dict()
|
||||
config_dict: dict = uplink.Config.to_dict()
|
||||
|
||||
for key, value in conf_bkp_dict.items():
|
||||
if config_dict[key] != value and key != 'COLORS':
|
||||
uplink.Protocol.send_priv_msg(
|
||||
nick_from=uplink.Config.SERVICE_NICKNAME,
|
||||
msg=f'[{key}]: {value} ==> {config_dict[key]}',
|
||||
channel=uplink.Config.SERVICE_CHANLOG
|
||||
)
|
||||
if key in need_a_restart:
|
||||
restart_flag = True
|
||||
|
||||
if config_model_bakcup.SERVICE_NICKNAME != uplink.Config.SERVICE_NICKNAME:
|
||||
uplink.Protocol.send_set_nick(uplink.Config.SERVICE_NICKNAME)
|
||||
|
||||
if restart_flag:
|
||||
uplink.Config.SERVEUR_ID = config_model_bakcup.SERVEUR_ID
|
||||
uplink.Protocol.send_priv_msg(
|
||||
nick_from=uplink.Config.SERVICE_NICKNAME,
|
||||
channel=uplink.Config.SERVICE_CHANLOG,
|
||||
msg='You need to restart defender !')
|
||||
|
||||
# Reload Main Commands Module
|
||||
uplink.Commands = uplink.Loader.CommandModule.Command(uplink.Loader)
|
||||
uplink.Commands.DB_COMMANDS = uplink.Settings.get_cache('db_commands')
|
||||
|
||||
uplink.Loader.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
|
||||
uplink.Protocol = uplink.Loader.PFactory.get()
|
||||
uplink.Protocol.register_command()
|
||||
|
||||
# Reload Service modules
|
||||
for module in uplink.ModuleUtils.model_get_loaded_modules().copy():
|
||||
uplink.ModuleUtils.reload_one_module(uplink, module.module_name, nickname)
|
||||
|
||||
return None
|
||||
162
core/classes/modules/reputation.py
Normal file
162
core/classes/modules/reputation.py
Normal file
@@ -0,0 +1,162 @@
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from core.definition import MReputation
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
|
||||
class Reputation:
|
||||
|
||||
UID_REPUTATION_DB: list[MReputation] = []
|
||||
|
||||
def __init__(self, loader: 'Loader'):
|
||||
"""
|
||||
|
||||
Args:
|
||||
loader (Loader): The Loader instance.
|
||||
"""
|
||||
|
||||
self.Logs = loader.Logs
|
||||
self.MReputation: Optional[MReputation] = None
|
||||
|
||||
def insert(self, new_reputation_user: MReputation) -> bool:
|
||||
"""Insert a new Reputation User object
|
||||
|
||||
Args:
|
||||
new_reputation_user (MReputation): New Reputation Model object
|
||||
|
||||
Returns:
|
||||
bool: True if inserted
|
||||
"""
|
||||
result = False
|
||||
exist = False
|
||||
|
||||
for record in self.UID_REPUTATION_DB:
|
||||
if record.uid == new_reputation_user.uid:
|
||||
# If the user exist then return False and do not go further
|
||||
exist = True
|
||||
self.Logs.debug(f'{record.uid} already exist')
|
||||
return result
|
||||
|
||||
if not exist:
|
||||
self.UID_REPUTATION_DB.append(new_reputation_user)
|
||||
result = True
|
||||
self.Logs.debug(f'New Reputation User Captured: ({new_reputation_user})')
|
||||
|
||||
if not result:
|
||||
self.Logs.critical(f'The Reputation User Object was not inserted {new_reputation_user}')
|
||||
|
||||
return result
|
||||
|
||||
def update(self, uid: str, new_nickname: str) -> bool:
|
||||
"""Update the nickname starting from the UID
|
||||
|
||||
Args:
|
||||
uid (str): UID of the user
|
||||
new_nickname (str): New nickname
|
||||
|
||||
Returns:
|
||||
bool: True if updated
|
||||
"""
|
||||
|
||||
reputation_obj = self.get_reputation(uid)
|
||||
|
||||
if reputation_obj is None:
|
||||
return False
|
||||
|
||||
reputation_obj.nickname = new_nickname
|
||||
|
||||
return True
|
||||
|
||||
def delete(self, uid: str) -> bool:
|
||||
"""Delete the User starting from the UID
|
||||
|
||||
Args:
|
||||
uid (str): UID of the user
|
||||
|
||||
Returns:
|
||||
bool: True if deleted
|
||||
"""
|
||||
result = False
|
||||
|
||||
if not self.is_exist(uid):
|
||||
return result
|
||||
|
||||
for record in self.UID_REPUTATION_DB:
|
||||
if record.uid == uid:
|
||||
# If the user exist then remove and return True and do not go further
|
||||
self.UID_REPUTATION_DB.remove(record)
|
||||
result = True
|
||||
self.Logs.debug(f'UID ({record.uid}) has been deleted')
|
||||
return result
|
||||
|
||||
if not result:
|
||||
self.Logs.critical(f'The UID {uid} was not deleted')
|
||||
|
||||
return result
|
||||
|
||||
def get_reputation(self, uidornickname: str) -> Optional[MReputation]:
|
||||
"""Get The User Object model
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname
|
||||
|
||||
Returns:
|
||||
UserModel|None: The UserModel Object | None
|
||||
"""
|
||||
for record in self.UID_REPUTATION_DB:
|
||||
if record.uid == uidornickname:
|
||||
return record
|
||||
elif record.nickname == uidornickname:
|
||||
return record
|
||||
|
||||
return None
|
||||
|
||||
def get_uid(self, uidornickname: str) -> Optional[str]:
|
||||
"""Get the UID of the user starting from the UID or the Nickname
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname
|
||||
|
||||
Returns:
|
||||
str|None: Return the UID
|
||||
"""
|
||||
|
||||
reputation_obj = self.get_reputation(uidornickname)
|
||||
|
||||
if reputation_obj is None:
|
||||
return None
|
||||
|
||||
return reputation_obj.uid
|
||||
|
||||
def get_nickname(self, uidornickname: str) -> Optional[str]:
|
||||
"""Get the Nickname starting from UID or the nickname
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname of the user
|
||||
|
||||
Returns:
|
||||
str|None: the nickname
|
||||
"""
|
||||
reputation_obj = self.get_reputation(uidornickname)
|
||||
|
||||
if reputation_obj is None:
|
||||
return None
|
||||
|
||||
return reputation_obj.nickname
|
||||
|
||||
def is_exist(self, uidornickname: str) -> bool:
|
||||
"""Check if the UID or the nickname exist in the reputation DB
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or the NICKNAME
|
||||
|
||||
Returns:
|
||||
bool: True if exist
|
||||
"""
|
||||
|
||||
reputation_obj = self.get_reputation(uidornickname)
|
||||
|
||||
if isinstance(reputation_obj, MReputation):
|
||||
return True
|
||||
|
||||
return False
|
||||
75
core/classes/modules/sasl.py
Normal file
75
core/classes/modules/sasl.py
Normal file
@@ -0,0 +1,75 @@
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.definition import MSasl
|
||||
from core.loader import Loader
|
||||
|
||||
class Sasl:
|
||||
|
||||
DB_SASL: list['MSasl'] = []
|
||||
|
||||
def __init__(self, loader: 'Loader'):
|
||||
"""
|
||||
|
||||
Args:
|
||||
loader (Loader): The Loader instance.
|
||||
"""
|
||||
self.Logs = loader.Logs # logger
|
||||
|
||||
def insert_sasl_client(self, psasl: 'MSasl') -> bool:
|
||||
"""Insert a new Sasl authentication
|
||||
|
||||
Args:
|
||||
psasl (MSasl): New userModel object
|
||||
|
||||
Returns:
|
||||
bool: True if inserted
|
||||
"""
|
||||
|
||||
if psasl is None:
|
||||
return False
|
||||
|
||||
sasl_obj = self.get_sasl_obj(psasl.client_uid)
|
||||
|
||||
if sasl_obj is not None:
|
||||
# User already created return False
|
||||
return False
|
||||
|
||||
self.DB_SASL.append(psasl)
|
||||
|
||||
return True
|
||||
|
||||
def delete_sasl_client(self, client_uid: str) -> bool:
|
||||
"""Delete the User starting from the UID
|
||||
|
||||
Args:
|
||||
client_uid (str): UID of the user
|
||||
|
||||
Returns:
|
||||
bool: True if deleted
|
||||
"""
|
||||
|
||||
sasl_obj = self.get_sasl_obj(client_uid)
|
||||
|
||||
if sasl_obj is None:
|
||||
return False
|
||||
|
||||
self.DB_SASL.remove(sasl_obj)
|
||||
|
||||
return True
|
||||
|
||||
def get_sasl_obj(self, client_uid: str) -> Optional['MSasl']:
|
||||
"""Get sasl client Object model
|
||||
|
||||
Args:
|
||||
client_uid (str): UID of the client
|
||||
|
||||
Returns:
|
||||
UserModel|None: The SASL Object | None
|
||||
"""
|
||||
|
||||
for record in self.DB_SASL:
|
||||
if record.client_uid == client_uid:
|
||||
return record
|
||||
|
||||
return None
|
||||
127
core/classes/modules/settings.py
Normal file
127
core/classes/modules/settings.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""This class should never be reloaded.
|
||||
"""
|
||||
from logging import Logger
|
||||
from threading import Timer, Thread, RLock
|
||||
from socket import socket
|
||||
from typing import Any, Optional, TYPE_CHECKING
|
||||
from core.definition import MSModule, MAdmin
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.classes.user import User
|
||||
|
||||
class Settings:
|
||||
"""This Class will never be reloaded.
|
||||
Means that the variables are available during
|
||||
the whole life of the app
|
||||
"""
|
||||
|
||||
RUNNING_TIMERS: list[Timer] = []
|
||||
RUNNING_THREADS: list[Thread] = []
|
||||
RUNNING_SOCKETS: list[socket] = []
|
||||
PERIODIC_FUNC: dict[str, Any] = {}
|
||||
LOCK: RLock = RLock()
|
||||
|
||||
CONSOLE: bool = False
|
||||
|
||||
MAIN_SERVER_HOSTNAME: str = None
|
||||
MAIN_SERVER_ID: str = None
|
||||
PROTOCTL_PREFIX_MODES_SIGNES : dict[str, str] = {}
|
||||
PROTOCTL_PREFIX_SIGNES_MODES : dict[str, str] = {}
|
||||
PROTOCTL_USER_MODES: list[str] = []
|
||||
PROTOCTL_CHANNEL_MODES: list[str] = []
|
||||
PROTOCTL_PREFIX: list[str] = []
|
||||
|
||||
SMOD_MODULES: list[MSModule] = []
|
||||
"""List contains all Server modules"""
|
||||
|
||||
__CACHE: dict[str, Any] = {}
|
||||
"""Use set_cache or get_cache instead"""
|
||||
|
||||
__TRANSLATION: dict[str, list[list[str]]] = dict()
|
||||
"""Translation Varibale"""
|
||||
|
||||
__LANG: str = "EN"
|
||||
|
||||
__INSTANCE_OF_USER_UTILS: Optional['User'] = None
|
||||
"""Instance of the User Utils class"""
|
||||
|
||||
__CURRENT_ADMIN: Optional['MAdmin'] = None
|
||||
"""The Current Admin Object Model"""
|
||||
|
||||
__LOGGER: Optional[Logger] = None
|
||||
"""Instance of the logger"""
|
||||
|
||||
def set_cache(self, key: str, value_to_cache: Any):
|
||||
"""When you want to store a variable
|
||||
|
||||
Ex.
|
||||
```python
|
||||
set_cache('MY_KEY', {'key1': 'value1', 'key2', 'value2'})
|
||||
```
|
||||
Args:
|
||||
key (str): The key you want to add.
|
||||
value_to_cache (Any): The Value you want to store.
|
||||
"""
|
||||
self.__CACHE[key] = value_to_cache
|
||||
|
||||
def get_cache(self, key) -> Optional[Any]:
|
||||
"""It returns the value associated to the key and finally it removes the entry"""
|
||||
if self.__CACHE.get(key, None) is not None:
|
||||
return self.__CACHE.pop(key)
|
||||
|
||||
return None
|
||||
|
||||
def get_cache_size(self) -> int:
|
||||
return len(self.__CACHE)
|
||||
|
||||
def clear_cache(self) -> None:
|
||||
self.__CACHE.clear()
|
||||
|
||||
def show_cache(self) -> dict[str, Any]:
|
||||
return self.__CACHE.copy()
|
||||
|
||||
@property
|
||||
def global_translation(self) -> dict[str, list[list[str]]]:
|
||||
"""Get/set global translation variable"""
|
||||
return self.__TRANSLATION
|
||||
|
||||
@global_translation.setter
|
||||
def global_translation(self, translation_var: dict) -> None:
|
||||
self.__TRANSLATION = translation_var
|
||||
|
||||
@property
|
||||
def global_lang(self) -> str:
|
||||
"""Global default language."""
|
||||
return self.__LANG
|
||||
|
||||
@global_lang.setter
|
||||
def global_lang(self, lang: str) -> None:
|
||||
self.__LANG = lang
|
||||
|
||||
@property
|
||||
def global_user(self) -> 'User':
|
||||
return self.__INSTANCE_OF_USER_UTILS
|
||||
|
||||
@global_user.setter
|
||||
def global_user(self, user_utils_instance: 'User') -> None:
|
||||
self.__INSTANCE_OF_USER_UTILS = user_utils_instance
|
||||
|
||||
@property
|
||||
def current_admin(self) -> MAdmin:
|
||||
"""Current admin data model."""
|
||||
return self.__CURRENT_ADMIN
|
||||
|
||||
@current_admin.setter
|
||||
def current_admin(self, current_admin: MAdmin) -> None:
|
||||
self.__CURRENT_ADMIN = current_admin
|
||||
|
||||
@property
|
||||
def global_logger(self) -> Logger:
|
||||
"""Global logger Instance"""
|
||||
return self.__LOGGER
|
||||
|
||||
@global_logger.setter
|
||||
def global_logger(self, logger: Logger) -> None:
|
||||
self.__LOGGER = logger
|
||||
|
||||
global_settings = Settings()
|
||||
96
core/classes/modules/translation.py
Normal file
96
core/classes/modules/translation.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import yaml
|
||||
import yaml.scanner
|
||||
from os import sep
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
|
||||
|
||||
class Translation:
|
||||
|
||||
def __init__(self, loader: 'Loader') -> None:
|
||||
"""
|
||||
|
||||
Args:
|
||||
loader (Loader): The Loader instance.
|
||||
"""
|
||||
self.Logs = loader.Logs
|
||||
self.Settings = loader.Settings
|
||||
|
||||
def get_translation(self) -> dict[str, list[list[str]]]:
|
||||
try:
|
||||
translation: dict[str, list[list[str]]] = dict()
|
||||
sfs: dict[str, list[list[str]]] = {}
|
||||
|
||||
module_translation_directory = Path("mods")
|
||||
core_translation_directory = Path("core")
|
||||
sfs_core = self.get_subfolders_name(core_translation_directory.__str__())
|
||||
sfs_module = self.get_subfolders_name(module_translation_directory.__str__())
|
||||
|
||||
# Combine the 2 dict
|
||||
for d in (sfs_core, sfs_module):
|
||||
for k, v in d.items():
|
||||
sfs.setdefault(k, []).extend(v)
|
||||
|
||||
loaded_files: list[str] = []
|
||||
|
||||
for module, filenames in sfs.items():
|
||||
translation[module] = []
|
||||
for filename in filenames:
|
||||
with open(f"{filename}", "r", encoding="utf-8") as fyaml:
|
||||
data: dict[str, list[dict[str, str]]] = yaml.safe_load(fyaml)
|
||||
|
||||
if not isinstance(data, dict):
|
||||
continue
|
||||
|
||||
for key, list_trad in data.items():
|
||||
for vlist in list_trad:
|
||||
translation[module].append([vlist["orig"], vlist["trad"]])
|
||||
|
||||
loaded_files.append(f"{filename}")
|
||||
|
||||
return translation
|
||||
|
||||
except yaml.scanner.ScannerError as se:
|
||||
self.Logs.error(f"[!] {se} [!]")
|
||||
return {}
|
||||
except yaml.YAMLError as ye:
|
||||
if hasattr(ye, 'problem_mark'):
|
||||
mark = ye.problem_mark
|
||||
self.Logs.error(f"Error YAML: {ye.with_traceback(None)}")
|
||||
self.Logs.error("Error position: (%s:%s)" % (mark.line+1, mark.column+1))
|
||||
return {}
|
||||
except yaml.error.MarkedYAMLError as me:
|
||||
self.Logs.error(f"[!] {me} [!]")
|
||||
return {}
|
||||
except Exception as err:
|
||||
self.Logs.error(f'General Error: {err}', exc_info=True)
|
||||
return {}
|
||||
|
||||
finally:
|
||||
self.Logs.debug("Translation files loaded")
|
||||
for f in loaded_files:
|
||||
self.Logs.debug(f" - {f}")
|
||||
|
||||
def get_subfolders_name(self, directory: str) -> dict[str, list[str]]:
|
||||
try:
|
||||
translation_information: dict[str, list[str]] = dict()
|
||||
main_directory = Path(directory)
|
||||
|
||||
# Init the dictionnary
|
||||
for subfolder in main_directory.rglob(f'*language{sep}*{sep}*.yaml'):
|
||||
if subfolder.name != '__pycache__':
|
||||
translation_information[subfolder.parent.name.lower()] = []
|
||||
|
||||
|
||||
for subfolder in main_directory.rglob(f'*language{sep}*{sep}*.yaml'):
|
||||
if subfolder.name != '__pycache__':
|
||||
translation_information[subfolder.parent.name.lower()].append(subfolder)
|
||||
|
||||
return translation_information
|
||||
|
||||
except Exception as err:
|
||||
self.Logs.error(f'General Error: {err}')
|
||||
return {}
|
||||
256
core/classes/modules/user.py
Normal file
256
core/classes/modules/user.py
Normal file
@@ -0,0 +1,256 @@
|
||||
from re import sub
|
||||
from typing import Any, Optional, TYPE_CHECKING
|
||||
from datetime import datetime
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
from core.definition import MUser
|
||||
|
||||
class User:
|
||||
|
||||
UID_DB: list['MUser'] = []
|
||||
|
||||
@property
|
||||
def get_current_user(self) -> 'MUser':
|
||||
return self.current_user
|
||||
|
||||
def __init__(self, loader: 'Loader'):
|
||||
|
||||
self.Logs = loader.Logs
|
||||
self.Base = loader.Base
|
||||
self.current_user: Optional['MUser'] = None
|
||||
|
||||
def insert(self, new_user: 'MUser') -> bool:
|
||||
"""Insert a new User object
|
||||
|
||||
Args:
|
||||
newUser (UserModel): New userModel object
|
||||
|
||||
Returns:
|
||||
bool: True if inserted
|
||||
"""
|
||||
|
||||
user_obj = self.get_user(new_user.uid)
|
||||
if not user_obj is None:
|
||||
# User already created return False
|
||||
return False
|
||||
|
||||
self.UID_DB.append(new_user)
|
||||
|
||||
return True
|
||||
|
||||
def update_nickname(self, uid: str, new_nickname: str) -> bool:
|
||||
"""Update the nickname starting from the UID
|
||||
|
||||
Args:
|
||||
uid (str): UID of the user
|
||||
new_nickname (str): New nickname
|
||||
|
||||
Returns:
|
||||
bool: True if updated
|
||||
"""
|
||||
user_obj = self.get_user(uidornickname=uid)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
user_obj.nickname = new_nickname
|
||||
self.Logs.debug(f"UID ({uid}) has benn update with new nickname ({new_nickname}).")
|
||||
return True
|
||||
|
||||
def update_mode(self, uidornickname: str, modes: str) -> bool:
|
||||
"""Updating user mode
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or Nickname of the user
|
||||
modes (str): new modes to update
|
||||
|
||||
Returns:
|
||||
bool: True if user mode has been updaed
|
||||
"""
|
||||
response = True
|
||||
user_obj = self.get_user(uidornickname=uidornickname)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
action = modes[0]
|
||||
new_modes = modes[1:]
|
||||
|
||||
existing_umodes = user_obj.umodes
|
||||
umodes = user_obj.umodes
|
||||
|
||||
if action == '+':
|
||||
|
||||
for nm in new_modes:
|
||||
if nm not in existing_umodes:
|
||||
umodes += nm
|
||||
|
||||
elif action == '-':
|
||||
for nm in new_modes:
|
||||
if nm in existing_umodes:
|
||||
umodes = umodes.replace(nm, '')
|
||||
else:
|
||||
return False
|
||||
|
||||
liste_umodes = list(umodes)
|
||||
final_umodes_liste = [x for x in self.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes]
|
||||
final_umodes = ''.join(final_umodes_liste)
|
||||
|
||||
user_obj.umodes = f"+{final_umodes}"
|
||||
|
||||
return response
|
||||
|
||||
def delete(self, uid: str) -> bool:
|
||||
"""Delete the User starting from the UID
|
||||
|
||||
Args:
|
||||
uid (str): UID of the user
|
||||
|
||||
Returns:
|
||||
bool: True if deleted
|
||||
"""
|
||||
|
||||
user_obj = self.get_user(uidornickname=uid)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
self.UID_DB.remove(user_obj)
|
||||
|
||||
return True
|
||||
|
||||
def get_user(self, uidornickname: str) -> Optional['MUser']:
|
||||
"""Get The User Object model
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname
|
||||
|
||||
Returns:
|
||||
UserModel|None: The UserModel Object | None
|
||||
"""
|
||||
for record in self.UID_DB:
|
||||
if record.uid == uidornickname:
|
||||
self.current_user = record
|
||||
return record
|
||||
elif record.nickname == uidornickname:
|
||||
self.current_user = record
|
||||
return record
|
||||
|
||||
return None
|
||||
|
||||
def get_uid(self, uidornickname:str) -> Optional[str]:
|
||||
"""Get the UID of the user starting from the UID or the Nickname
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname
|
||||
|
||||
Returns:
|
||||
str|None: Return the UID
|
||||
"""
|
||||
|
||||
user_obj = self.get_user(uidornickname=uidornickname)
|
||||
|
||||
if user_obj is None:
|
||||
return None
|
||||
|
||||
self.current_user = user_obj
|
||||
return user_obj.uid
|
||||
|
||||
def get_nickname(self, uidornickname:str) -> Optional[str]:
|
||||
"""Get the Nickname starting from UID or the nickname
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname of the user
|
||||
|
||||
Returns:
|
||||
str|None: the nickname
|
||||
"""
|
||||
user_obj = self.get_user(uidornickname=uidornickname)
|
||||
|
||||
if user_obj is None:
|
||||
return None
|
||||
|
||||
self.current_user = user_obj
|
||||
return user_obj.nickname
|
||||
|
||||
def get_user_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]:
|
||||
"""Transform User Object to a dictionary
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or The nickname
|
||||
|
||||
Returns:
|
||||
Union[dict[str, any], None]: User Object as a dictionary or None
|
||||
"""
|
||||
user_obj = self.get_user(uidornickname=uidornickname)
|
||||
|
||||
if user_obj is None:
|
||||
return None
|
||||
|
||||
return user_obj.to_dict()
|
||||
|
||||
def is_exist(self, uidornikname: str) -> bool:
|
||||
"""Check if the UID or the nickname exist in the USER DB
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or the NICKNAME
|
||||
|
||||
Returns:
|
||||
bool: True if exist
|
||||
"""
|
||||
user_obj = self.get_user(uidornickname=uidornikname)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def clean_uid(self, uid: str) -> Optional[str]:
|
||||
"""Clean UID by removing @ / % / + / ~ / * / :
|
||||
|
||||
Args:
|
||||
uid (str): The UID to clean
|
||||
|
||||
Returns:
|
||||
str: Clean UID without any sign
|
||||
"""
|
||||
|
||||
pattern = fr'[:|@|%|\+|~|\*]*'
|
||||
parsed_UID = sub(pattern, '', uid)
|
||||
|
||||
if not parsed_UID:
|
||||
return None
|
||||
|
||||
return parsed_UID
|
||||
|
||||
def get_user_uptime_in_minutes(self, uidornickname: str) -> float:
|
||||
"""Retourne depuis quand l'utilisateur est connecté (in minutes).
|
||||
|
||||
Args:
|
||||
uid (str): The uid or le nickname
|
||||
|
||||
Returns:
|
||||
int: How long in minutes has the user been connected?
|
||||
"""
|
||||
|
||||
get_user = self.get_user(uidornickname)
|
||||
if get_user is None:
|
||||
return 0
|
||||
|
||||
# Convertir la date enregistrée dans UID_DB en un objet {datetime}
|
||||
connected_time_string = get_user.connexion_datetime
|
||||
|
||||
if isinstance(connected_time_string, datetime):
|
||||
connected_time = connected_time_string
|
||||
else:
|
||||
connected_time = datetime.strptime(connected_time_string, "%Y-%m-%d %H:%M:%S.%f")
|
||||
|
||||
# What time is it ?
|
||||
current_datetime = datetime.now()
|
||||
|
||||
uptime = current_datetime - connected_time
|
||||
convert_to_minutes = uptime.seconds / 60
|
||||
uptime_minutes = round(number=convert_to_minutes, ndigits=2)
|
||||
|
||||
return uptime_minutes
|
||||
Reference in New Issue
Block a user