mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 19:24:23 +00:00
Updating cmd by handling all functions, Threads and timers and schemas in separate files. code should be clear
This commit is contained in:
48
core/base.py
48
core/base.py
@@ -1,6 +1,8 @@
|
||||
import importlib
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
import random
|
||||
import socket
|
||||
@@ -8,12 +10,12 @@ import hashlib
|
||||
import logging
|
||||
import threading
|
||||
import ipaddress
|
||||
|
||||
import ast
|
||||
from pathlib import Path
|
||||
from types import ModuleType
|
||||
import requests
|
||||
|
||||
from dataclasses import fields
|
||||
from typing import Union, Literal, TYPE_CHECKING
|
||||
from typing import Union, TYPE_CHECKING
|
||||
from base64 import b64decode, b64encode
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from sqlalchemy import create_engine, Engine, Connection, CursorResult
|
||||
@@ -155,15 +157,41 @@ class Base:
|
||||
currentdate = datetime.now().strftime('%d-%m-%Y %H:%M:%S')
|
||||
return currentdate
|
||||
|
||||
def get_all_modules(self) -> list:
|
||||
def get_all_modules(self) -> list[str]:
|
||||
"""Get list of all main modules
|
||||
using this pattern mod_*.py
|
||||
|
||||
all_files = os.listdir('mods/')
|
||||
all_modules: list = []
|
||||
for module in all_files:
|
||||
if module.endswith('.py') and not module == '__init__.py':
|
||||
all_modules.append(module.replace('.py', '').lower())
|
||||
Returns:
|
||||
list[str]: List of module names.
|
||||
"""
|
||||
base_path = Path('mods')
|
||||
return [file.name.replace('.py', '') for file in base_path.rglob('mod_*.py')]
|
||||
|
||||
return all_modules
|
||||
def reload_modules_with_dependencies(self, prefix: str = 'mods'):
|
||||
"""
|
||||
Reload all modules in sys.modules that start with the given prefix.
|
||||
Useful for reloading a full package during development.
|
||||
"""
|
||||
modules_to_reload = []
|
||||
|
||||
# Collect target modules
|
||||
for name, module in sys.modules.items():
|
||||
if (
|
||||
isinstance(module, ModuleType)
|
||||
and module is not None
|
||||
and name.startswith(prefix)
|
||||
):
|
||||
modules_to_reload.append((name, module))
|
||||
|
||||
# Sort to reload submodules before parent modules
|
||||
for name, module in sorted(modules_to_reload, key=lambda x: x[0], reverse=True):
|
||||
try:
|
||||
if 'mod_' not in name:
|
||||
importlib.reload(module)
|
||||
self.logs.debug(f'[LOAD_MODULE] Module {module} success')
|
||||
|
||||
except Exception:
|
||||
self.logs.error(f'[LOAD_MODULE] Module {module} failed [!]')
|
||||
|
||||
def create_log(self, log_message: str) -> None:
|
||||
"""Enregiste les logs
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from typing import Union
|
||||
import core.definition as df
|
||||
from core.base import Base
|
||||
|
||||
import core.definition as df
|
||||
|
||||
class Admin:
|
||||
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
from re import findall
|
||||
from typing import Union, Literal, TYPE_CHECKING
|
||||
from dataclasses import asdict
|
||||
|
||||
from typing import Any, Optional, Literal, TYPE_CHECKING
|
||||
from core.classes import user
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -170,25 +168,22 @@ class Channel:
|
||||
except Exception as err:
|
||||
self.Logs.error(f'{err}')
|
||||
|
||||
def get_Channel(self, channel_name: str) -> Union['MChannel', None]:
|
||||
|
||||
Channel = None
|
||||
def get_Channel(self, channel_name: str) -> Optional['MChannel']:
|
||||
|
||||
for record in self.UID_CHANNEL_DB:
|
||||
if record.name == channel_name:
|
||||
Channel = record
|
||||
return record
|
||||
|
||||
return Channel
|
||||
return None
|
||||
|
||||
def get_Channel_AsDict(self, chan_name: str) -> Union[dict[str, any], None]:
|
||||
def get_channel_asdict(self, chan_name: str) -> Optional[dict[str, Any]]:
|
||||
|
||||
chanObj = self.get_Channel(chan_name=chan_name)
|
||||
channel_obj: Optional['MChannel'] = self.get_Channel(chan_name=chan_name)
|
||||
|
||||
if not chanObj is None:
|
||||
chan_as_dict = asdict(chanObj)
|
||||
return chan_as_dict
|
||||
else:
|
||||
if channel_obj is None:
|
||||
return None
|
||||
|
||||
return channel_obj.to_dict()
|
||||
|
||||
def Is_Channel(self, channelToCheck: str) -> bool:
|
||||
"""Check if the string has the # caractere and return True if this is a channel
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from re import sub
|
||||
from typing import Union, TYPE_CHECKING
|
||||
from typing import Any, Optional, Union, TYPE_CHECKING
|
||||
from dataclasses import asdict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -169,7 +169,7 @@ class Client:
|
||||
|
||||
return userObj.nickname
|
||||
|
||||
def get_Client_AsDict(self, uidornickname: str) -> Union[dict[str, any], None]:
|
||||
def get_client_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]:
|
||||
"""Transform User Object to a dictionary
|
||||
|
||||
Args:
|
||||
@@ -178,12 +178,12 @@ class Client:
|
||||
Returns:
|
||||
Union[dict[str, any], None]: User Object as a dictionary or None
|
||||
"""
|
||||
userObj = self.get_Client(uidornickname=uidornickname)
|
||||
client_obj = self.get_Client(uidornickname=uidornickname)
|
||||
|
||||
if userObj is None:
|
||||
if client_obj is None:
|
||||
return None
|
||||
|
||||
return asdict(userObj)
|
||||
return client_obj.to_dict()
|
||||
|
||||
def is_exist(self, uidornikname: str) -> bool:
|
||||
"""Check if the UID or the nickname exist in the USER DB
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from dataclasses import asdict
|
||||
from core.definition import MClone
|
||||
from typing import Union
|
||||
from typing import Any, Optional
|
||||
from core.base import Base
|
||||
|
||||
class Clone:
|
||||
@@ -74,13 +73,11 @@ class Clone:
|
||||
Returns:
|
||||
bool: True if the nickname exist
|
||||
"""
|
||||
response = False
|
||||
|
||||
for cloneObject in self.UID_CLONE_DB:
|
||||
if cloneObject.nickname == nickname:
|
||||
response = True
|
||||
|
||||
return response
|
||||
clone = self.get_Clone(nickname)
|
||||
if isinstance(clone, MClone):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def uid_exists(self, uid: str) -> bool:
|
||||
"""Check if the nickname exist
|
||||
@@ -91,15 +88,13 @@ class Clone:
|
||||
Returns:
|
||||
bool: True if the nickname exist
|
||||
"""
|
||||
response = False
|
||||
clone = self.get_Clone(uid)
|
||||
if isinstance(clone, MClone):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
for cloneObject in self.UID_CLONE_DB:
|
||||
if cloneObject.uid == uid:
|
||||
response = True
|
||||
|
||||
return response
|
||||
|
||||
def get_Clone(self, uidornickname: str) -> Union[MClone, None]:
|
||||
def get_Clone(self, uidornickname: str) -> Optional[MClone]:
|
||||
"""Get MClone object or None
|
||||
|
||||
Args:
|
||||
@@ -108,17 +103,15 @@ class Clone:
|
||||
Returns:
|
||||
Union[MClone, None]: Return MClone object or None
|
||||
"""
|
||||
cloneObj = None
|
||||
|
||||
for clone in self.UID_CLONE_DB:
|
||||
if clone.uid == uidornickname:
|
||||
cloneObj = clone
|
||||
return clone
|
||||
if clone.nickname == uidornickname:
|
||||
cloneObj = clone
|
||||
return clone
|
||||
|
||||
return cloneObj
|
||||
return None
|
||||
|
||||
def get_uid(self, uidornickname: str) -> Union[str, None]:
|
||||
def get_uid(self, uidornickname: str) -> Optional[str]:
|
||||
"""Get the UID of the clone starting from the UID or the Nickname
|
||||
|
||||
Args:
|
||||
@@ -127,27 +120,22 @@ class Clone:
|
||||
Returns:
|
||||
str|None: Return the UID
|
||||
"""
|
||||
uid = None
|
||||
for record in self.UID_CLONE_DB:
|
||||
if record.uid == uidornickname:
|
||||
uid = record.uid
|
||||
return record.uid
|
||||
if record.nickname == uidornickname:
|
||||
uid = record.uid
|
||||
return record.uid
|
||||
|
||||
# if not uid is None:
|
||||
# self.Logs.debug(f'The UID that you are looking for {uidornickname} has been found {uid}')
|
||||
return None
|
||||
|
||||
return uid
|
||||
def get_Clone_AsDict(self, uidornickname: str) -> Optional[dict[str, Any]]:
|
||||
|
||||
def get_Clone_AsDict(self, uidornickname: str) -> Union[dict[str, any], None]:
|
||||
clone_obj = self.get_Clone(uidornickname=uidornickname)
|
||||
|
||||
cloneObj = self.get_Clone(uidornickname=uidornickname)
|
||||
|
||||
if not cloneObj is None:
|
||||
cloneObj_as_dict = asdict(cloneObj)
|
||||
return cloneObj_as_dict
|
||||
else:
|
||||
if clone_obj is None:
|
||||
return None
|
||||
|
||||
return clone_obj.to_dict()
|
||||
|
||||
def kill(self, nickname:str) -> bool:
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from re import sub
|
||||
from typing import Union, TYPE_CHECKING
|
||||
from typing import Any, Optional, Union, TYPE_CHECKING
|
||||
from dataclasses import asdict
|
||||
from datetime import datetime
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.base import Base
|
||||
@@ -169,7 +170,7 @@ class User:
|
||||
|
||||
return userObj.nickname
|
||||
|
||||
def get_User_AsDict(self, uidornickname: str) -> Union[dict[str, any], None]:
|
||||
def get_user_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]:
|
||||
"""Transform User Object to a dictionary
|
||||
|
||||
Args:
|
||||
@@ -183,7 +184,7 @@ class User:
|
||||
if userObj is None:
|
||||
return None
|
||||
|
||||
return asdict(userObj)
|
||||
return userObj.to_dict()
|
||||
|
||||
def is_exist(self, uidornikname: str) -> bool:
|
||||
"""Check if the UID or the nickname exist in the USER DB
|
||||
@@ -217,4 +218,35 @@ class User:
|
||||
if not parsed_UID:
|
||||
return None
|
||||
|
||||
return parsed_UID
|
||||
return parsed_UID
|
||||
|
||||
def get_user_uptime_in_minutes(self, uidornickname: str) -> float:
|
||||
"""Retourne depuis quand l'utilisateur est connecté (in minutes).
|
||||
|
||||
Args:
|
||||
uid (str): The uid or le nickname
|
||||
|
||||
Returns:
|
||||
int: How long in minutes has the user been connected?
|
||||
"""
|
||||
|
||||
get_user = self.get_User(uidornickname)
|
||||
if get_user is None:
|
||||
return 0
|
||||
|
||||
# Convertir la date enregistrée dans UID_DB en un objet {datetime}
|
||||
connected_time_string = get_user.connexion_datetime
|
||||
|
||||
if isinstance(connected_time_string, datetime):
|
||||
connected_time = connected_time_string
|
||||
else:
|
||||
connected_time = datetime.strptime(connected_time_string, "%Y-%m-%d %H:%M:%S.%f")
|
||||
|
||||
# What time is it ?
|
||||
current_datetime = datetime.now()
|
||||
|
||||
uptime = current_datetime - connected_time
|
||||
convert_to_minutes = uptime.seconds / 60
|
||||
uptime_minutes = round(number=convert_to_minutes, ndigits=2)
|
||||
|
||||
return uptime_minutes
|
||||
|
||||
@@ -616,10 +616,13 @@ class Irc:
|
||||
module_folder = module_name.split('_')[1].lower() # ==> defender
|
||||
class_name = module_name.split('_')[1].capitalize() # ==> Defender
|
||||
|
||||
if 'mods.' + module_name in sys.modules:
|
||||
if f'mods.{module_folder}.{module_name}' in sys.modules:
|
||||
self.Logs.info('Unload the module ...')
|
||||
self.loaded_classes[class_name].unload()
|
||||
self.Logs.info('Module Already Loaded ... reloading the module ...')
|
||||
|
||||
# Load dependencies
|
||||
self.Base.reload_modules_with_dependencies(f'mods.{module_folder}')
|
||||
the_module = sys.modules[f'mods.{module_folder}.{module_name}']
|
||||
importlib.reload(the_module)
|
||||
|
||||
@@ -683,7 +686,7 @@ class Irc:
|
||||
if self.User.get_User(uid) is None:
|
||||
return None
|
||||
|
||||
getUser = self.User.get_User_AsDict(uid)
|
||||
getUser = self.User.get_user_asdict(uid)
|
||||
|
||||
level = int(level)
|
||||
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import sys
|
||||
import importlib
|
||||
from types import ModuleType
|
||||
from typing import Literal, Union
|
||||
from datetime import datetime
|
||||
from time import time
|
||||
|
||||
Reference in New Issue
Block a user