mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 19:24:23 +00:00
250 lines
7.2 KiB
Python
250 lines
7.2 KiB
Python
"""
|
|
Main utils library.
|
|
"""
|
|
import gc
|
|
import ssl
|
|
from pathlib import Path
|
|
from re import match, sub
|
|
from typing import Literal, Optional, Any, TYPE_CHECKING
|
|
from datetime import datetime, timedelta
|
|
from time import time, sleep
|
|
from random import choice
|
|
from hashlib import md5, sha3_512
|
|
from core.classes.modules.settings import global_settings
|
|
from asyncio import iscoroutinefunction
|
|
|
|
if TYPE_CHECKING:
|
|
from threading import Event
|
|
from core.loader import Loader
|
|
|
|
def tr(message: str, *args) -> str:
    """Translate a message and substitute its ``%s`` placeholders.

    The target language is the current admin's language when
    ``global_settings.current_admin`` is set, otherwise
    ``global_settings.global_lang``. Translations are looked up in
    ``global_settings.global_translation[<language>]``; each entry holds the
    original text at index 0 and the translated text at index 1. Originals
    are compared with all whitespace removed.

    Example:
        tr('Hello my firstname is %s and my lastname is %s', firstname, lastname)

    Args:
        message (str): The message to translate
        *args (any): One value per ``%s`` placeholder found in ``message``

    Returns:
        str: The translated and formatted message. The untouched ``message``
        is returned when the number of args differs from the number of ``%s``
        placeholders or when an unexpected error occurs. The formatted
        original message is returned when no translation applies.
    """
    count_args = len(args)                   # Number of values supplied
    count_placeholder = message.count('%s')  # Number of %s placeholders in the message
    has_args = bool(args)
    g = global_settings

    try:
        # Language of the current admin, falling back to the global language
        client_language = g.current_admin.language if g.current_admin else g.global_lang

        if count_args != count_placeholder:
            g.global_logger.error(f"Translation: Original message: {message} | Args: {count_args} - Placeholder: {count_placeholder}")
            return message

        if g.global_lang is None:
            return message % args if has_args else message

        language_key = client_language.lower()
        if language_key == 'en':
            # English is the source language: nothing to look up
            return message % args if has_args else message

        # Normalise the message once instead of on every loop iteration
        normalized_message = sub(r"\s+", "", message)
        for trads in g.global_translation[language_key]:
            if normalized_message == sub(r"\s+", "", trads[0]):
                return trads[1] % args if has_args else trads[1]

        # No translation found: fall back to the original text
        return message % args if has_args else message

    except KeyError as ke:
        # Raised when no translation table exists for the requested language
        g.global_logger.error(f"KeyError: {ke}")
        return message % args if has_args else message

    except Exception as err:
        global_settings.global_logger.error(f"General Error: {err} / {message}")
        return message
|
|
|
|
def convert_to_int(value: Any) -> Optional[int]:
    """Convert a value to int when possible.

    Args:
        value (Any): Value to convert to int if possible

    Returns:
        Optional[int]: The int value, or None when the value cannot be converted
    """
    try:
        return int(value)
    except (ValueError, TypeError, OverflowError):
        # ValueError: malformed text or NaN; TypeError: unsupported type such
        # as None or a list; OverflowError: infinite float.
        return None
|
|
|
|
def get_unixtime() -> int:
    """Return the current Unix timestamp truncated to whole seconds.

    Returns:
        int: Current time in seconds since the Epoch (int)
    """
    seconds_since_epoch = time()
    return int(seconds_since_epoch)
|
|
|
|
def get_sdatetime() -> str:
    """Return the current local date and time as a string (e.g. 24-12-2023 20:50:59).

    Returns:
        str: Current datetime in this format %d-%m-%Y %H:%M:%S
    """
    now = datetime.now()
    return now.strftime('%d-%m-%Y %H:%M:%S')
|
|
|
|
def get_datetime() -> datetime:
    """Return the current local date and time as a naive datetime object.

    Returns:
        datetime: The value of ``datetime.now()``
    """
    current = datetime.now()
    return current
|
|
|
|
def get_ssl_context(verify: bool = False) -> ssl.SSLContext:
    """Generate the client-side SSL context.

    Args:
        verify (bool): When True, return a context that validates the server
            certificate and hostname against the system trust store. When
            False (the default, kept for backward compatibility) certificate
            and hostname verification are both disabled.

    Returns:
        SSLContext: The SSL Context
    """
    if verify:
        return ssl.create_default_context()

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # SECURITY: with verification disabled the connection is encrypted but the
    # peer is not authenticated, so it is open to man-in-the-middle attacks.
    # check_hostname must be turned off before verify_mode can be CERT_NONE.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx
|
|
|
|
def get_defender_uptime(loader: 'Loader') -> timedelta:
    """Return how long Defender has been connected.

    Args:
        loader (Loader): Object exposing ``Irc.defender_connexion_datetime``,
            the datetime at which Defender connected.

    Returns:
        timedelta: Time elapsed between the connection datetime and now,
        with microseconds dropped. ``str()`` of the result gives a readable
        value such as ``1 day, 2:03:04``.
    """
    elapsed = datetime.now() - loader.Irc.defender_connexion_datetime
    # Rebuild the delta from days and seconds only so microseconds are discarded
    return timedelta(days=elapsed.days, seconds=elapsed.seconds)
|
|
|
|
def run_python_garbage_collector() -> int:
    """Trigger a full garbage collection pass.

    Returns:
        int: The number of unreachable objects found, as reported by ``gc.collect()``.
    """
    unreachable = gc.collect()
    return unreachable
|
|
|
|
def get_number_gc_objects(your_object_to_count: Optional[Any] = None) -> int:
    """Count the objects currently tracked by the garbage collector.

    Args:
        your_object_to_count (Optional[Any]): A type (or tuple of types) accepted
            by ``isinstance``. When given, only tracked objects that are
            instances of it are counted. When None, every tracked object is counted.

    Returns:
        int: Number of tracked objects by the collector
    """
    tracked = gc.get_objects()
    if your_object_to_count is None:
        return len(tracked)

    matching = 0
    for candidate in tracked:
        if isinstance(candidate, your_object_to_count):
            matching += 1
    return matching
|
|
|
|
def heartbeat(event: 'Event', loader: 'Loader', beat: float) -> None:
    """Run the periodic maintenance action every ``beat`` seconds.

    The loop keeps calling ``loader.Base.execute_periodic_action()`` for as
    long as ``event`` is set, sleeping ``beat`` seconds after each call. Once
    the event is found cleared, a debug line is logged and the function returns.

    Args:
        event (Event): Flag controlling the loop; the loop runs while it is set
        loader (Loader): Object providing ``Base.execute_periodic_action`` and ``Logs.debug``
        beat (float): Number of seconds between two executions
    """
    while True:
        if not event.is_set():
            break
        loader.Base.execute_periodic_action()
        sleep(beat)

    loader.Logs.debug("Heartbeat is off!")
    return None
|
|
|
|
def generate_random_string(lenght: int) -> str:
    """Return a random string of the requested length.

    Characters are drawn from uppercase ASCII letters and digits using
    ``random.choice``.

    Args:
        lenght (int): Number of characters to generate

    Returns:
        str: The random string
    """
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
    picked = []
    for _ in range(lenght):
        picked.append(choice(alphabet))
    return ''.join(picked)
|
|
|
|
def hash_password(password: str, algorithm: Literal["md5", "sha3_512"] = 'md5') -> str:
    """Return the hex digest of the password for the selected algorithm.

    Args:
        password (str): The plain text password
        algorithm (str): The algorithm to use; any value other than
            'sha3_512' (including unknown names) results in md5

    Returns:
        str: The hashed password as a hexadecimal string, default md5
    """
    # NOTE(review): unsalted MD5 is a weak choice for password storage; it is
    # kept as the default here so existing digests remain comparable.
    encoded = password.encode()
    if algorithm == 'sha3_512':
        return sha3_512(encoded).hexdigest()

    # 'md5' and every unrecognised algorithm name share the md5 path
    return md5(encoded).hexdigest()
|
|
|
|
def get_all_modules() -> list[str]:
    """Get the list of all main modules.

    Searches the ``mods`` directory (relative to the current working
    directory) recursively for files matching ``mod_*.py``.

    Returns:
        list[str]: List of module names, i.e. the file names without the
        trailing ``.py`` extension.
    """
    base_path = Path('mods')
    # ``stem`` drops only the final suffix; replacing '.py' would also strip
    # any other '.py' occurrence inside the file name.
    return [file.stem for file in base_path.rglob('mod_*.py')]
|
|
|
|
def clean_uid(uid: str) -> Optional[str]:
    """Clean a UID by removing every ``:`` ``@`` ``%`` ``+`` ``~`` ``*`` character.

    Args:
        uid (str): The UID to clean

    Returns:
        Optional[str]: The UID without any of those signs, or None when ``uid`` is None
    """
    if uid is None:
        return None

    # Inside a character class '|' is a literal pipe, not an alternation, so
    # it must not be listed: otherwise pipes in the value would be removed too.
    return sub(r'[:@%+~*]+', '', uid)
|
|
|
|
def hide_sensitive_data(srvmsg: list[str]) -> list[str]:
    """Mask the arguments of an ``auth`` command found in a PRIVMSG line.

    The token located two positions after ``PRIVMSG`` is inspected. When it
    is ``:auth`` (optionally with one non-word character between the colon
    and ``auth``), every following token is replaced by asterisks of the
    same length.

    Args:
        srvmsg (list[str]): The server message split into tokens

    Returns:
        list[str]: A masked copy of the message when an auth command is
        found, an unmodified copy when the PRIVMSG carries no auth command,
        or ``srvmsg`` itself when it contains no ``PRIVMSG`` token.
    """
    try:
        privmsg_index = srvmsg.index('PRIVMSG')
    except ValueError:
        return srvmsg

    masked = srvmsg.copy()
    auth_index = privmsg_index + 2

    # A truncated PRIVMSG has no command token: nothing to hide
    if auth_index >= len(masked):
        return masked

    if match(r'^:{1}\W?(auth)$', masked[auth_index]) is None:
        return masked

    for position in range(auth_index + 1, len(masked)):
        masked[position] = '*' * len(masked[position])

    return masked
|
|
|
|
def is_coroutinefunction(func: Any) -> bool:
    """Check whether the given object is a coroutine function.

    Args:
        func (Any): a callable object

    Returns:
        bool: True if the function is a coroutine function (``async def``)
    """
    # asyncio.iscoroutinefunction is deprecated in favour of the inspect
    # variant; imported locally so this function does not depend on the
    # module-level import list.
    from inspect import iscoroutinefunction as _iscoroutinefunction

    return _iscoroutinefunction(func)