Create DataClasses

This commit is contained in:
adator85
2024-08-09 02:08:33 +02:00
parent b0325d4db5
commit e4a0c530a3
6 changed files with 483 additions and 283 deletions

View File

@@ -1,31 +1,22 @@
import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, sys
from typing import Union
from base64 import b64decode
from datetime import datetime
from sqlalchemy import create_engine, Engine, Connection, CursorResult
from sqlalchemy.sql import text
from core.loadConf import Config
from core.loadConf import ConfigDataModel
class Base:
CORE_DB_PATH = 'core' + os.sep + 'db' + os.sep # Le dossier bases de données core
MODS_DB_PATH = 'mods' + os.sep + 'db' + os.sep # Le dossier bases de données des modules
PYTHON_MIN_VERSION = '3.10' # Version min de python
DB_SCHEMA:list[str] = {
'admins': 'sys_admins',
'commandes': 'sys_commandes',
'logs': 'sys_logs',
'modules': 'sys_modules'
}
DEFENDER_VERSION = '' # MAJOR.MINOR.BATCH
LATEST_DEFENDER_VERSION = '' # Latest Version of Defender in git
DEFENDER_DB_PATH = 'db' + os.sep # Séparateur en fonction de l'OS
DEFENDER_DB_NAME = 'defender' # Le nom de la base de données principale
def __init__(self, Config: Config) -> None:
def __init__(self, Config: ConfigDataModel) -> None:
self.Config = Config # Assigner l'objet de configuration
self.init_log_system() # Demarrer le systeme de log
self.check_for_new_version() # Verifier si une nouvelle version est disponible
self.check_for_new_version(True) # Verifier si une nouvelle version est disponible
self.running_timers:list[threading.Timer] = [] # Liste des timers en cours
self.running_threads:list[threading.Thread] = [] # Liste des threads en cours
@@ -48,12 +39,15 @@ class Base:
with open(version_filename, 'r') as version_data:
current_version:dict[str, str] = json.load(version_data)
self.DEFENDER_VERSION = current_version["version"]
# self.DEFENDER_VERSION = current_version["version"]
self.Config.current_version = current_version['version']
return None
def __get_latest_defender_version(self) -> None:
try:
self.logs.debug(f'Looking for a new version available on Github')
print(f'===> Looking for a new version available on Github')
token = ''
json_url = f'https://raw.githubusercontent.com/adator85/IRC_DEFENDER_MODULES/main/version.json'
headers = {
@@ -68,7 +62,8 @@ class Base:
response.raise_for_status() # Vérifie si la requête a réussi
json_response:dict = response.json()
self.LATEST_DEFENDER_VERSION = json_response["version"]
# self.LATEST_DEFENDER_VERSION = json_response["version"]
self.Config.latest_version = json_response['version']
return None
except requests.HTTPError as err:
@@ -76,16 +71,20 @@ class Base:
except:
self.logs.warning(f'Github not available to fetch latest version')
def check_for_new_version(self) -> bool:
def check_for_new_version(self, online:bool) -> bool:
try:
self.logs.debug(f'Checking for a new service version')
# Assigner la version actuelle de Defender
self.__set_current_defender_version()
self.__set_current_defender_version()
# Récuperer la dernier version disponible dans github
self.__get_latest_defender_version()
if online:
self.logs.debug(f'Retrieve the latest version from Github')
self.__get_latest_defender_version()
isNewVersion = False
latest_version = self.LATEST_DEFENDER_VERSION
current_version = self.DEFENDER_VERSION
latest_version = self.Config.latest_version
current_version = self.Config.current_version
curr_major , curr_minor, curr_patch = current_version.split('.')
last_major, last_minor, last_patch = latest_version.split('.')
@@ -130,7 +129,7 @@ class Base:
Returns:
None: Aucun retour
"""
sql_insert = f"INSERT INTO {self.DB_SCHEMA['logs']} (datetime, server_msg) VALUES (:datetime, :server_msg)"
sql_insert = f"INSERT INTO {self.Config.table_log} (datetime, server_msg) VALUES (:datetime, :server_msg)"
mes_donnees = {'datetime': str(self.get_datetime()),'server_msg': f'{log_message}'}
self.db_execute_query(sql_insert, mes_donnees)
@@ -166,7 +165,7 @@ class Base:
cmd_list[2] = '*******'
cmd = ' '.join(cmd_list)
insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['commandes']} (datetime, user, commande) VALUES (:datetime, :user, :commande)"
insert_cmd_query = f"INSERT INTO {self.Config.table_commande} (datetime, user, commande) VALUES (:datetime, :user, :commande)"
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'commande': cmd}
self.db_execute_query(insert_cmd_query, mes_donnees)
@@ -181,7 +180,7 @@ class Base:
Returns:
bool: True si le module existe déja dans la base de données sinon False
"""
query = f"SELECT id FROM {self.DB_SCHEMA['modules']} WHERE module = :module"
query = f"SELECT id FROM {self.Config.table_module} WHERE module = :module"
mes_donnes = {'module': module_name}
results = self.db_execute_query(query, mes_donnes)
@@ -199,7 +198,7 @@ class Base:
if not self.db_isModuleExist(module_name):
self.logs.debug(f"Le module {module_name} n'existe pas alors ont le créer")
insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['modules']} (datetime, user, module) VALUES (:datetime, :user, :module)"
insert_cmd_query = f"INSERT INTO {self.Config.table_module} (datetime, user, module) VALUES (:datetime, :user, :module)"
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'module': module_name}
self.db_execute_query(insert_cmd_query, mes_donnees)
# self.db_close_session(self.session)
@@ -214,7 +213,7 @@ class Base:
Args:
cmd (str): le module a enregistrer
"""
insert_cmd_query = f"DELETE FROM {self.DB_SCHEMA['modules']} WHERE module = :module"
insert_cmd_query = f"DELETE FROM {self.Config.table_module} WHERE module = :module"
mes_donnees = {'module': module_name}
self.db_execute_query(insert_cmd_query, mes_donnees)
@@ -222,14 +221,20 @@ class Base:
def db_create_first_admin(self) -> None:
user = self.db_execute_query(f"SELECT id FROM {self.DB_SCHEMA['admins']}")
user = self.db_execute_query(f"SELECT id FROM {self.Config.table_admin}")
if not user.fetchall():
admin = self.Config.OWNER
password = self.crypt_password(self.Config.PASSWORD)
mes_donnees = {'createdOn': self.get_datetime(), 'user': admin, 'password': password, 'hostname': '*', 'vhost': '*', 'level': 5}
mes_donnees = {'createdOn': self.get_datetime(),
'user': admin,
'password': password,
'hostname': '*',
'vhost': '*',
'level': 5
}
self.db_execute_query(f"""
INSERT INTO {self.DB_SCHEMA['admins']}
INSERT INTO {self.Config.table_admin}
(createdOn, user, password, hostname, vhost, level)
VALUES
(:createdOn, :user, :password, :hostname, :vhost, :level)"""
@@ -348,8 +353,8 @@ class Base:
def db_init(self) -> tuple[Engine, Connection]:
db_directory = self.DEFENDER_DB_PATH
full_path_db = self.DEFENDER_DB_PATH + self.DEFENDER_DB_NAME
db_directory = self.Config.db_path
full_path_db = self.Config.db_path + self.Config.db_name
if not os.path.exists(db_directory):
os.makedirs(db_directory)
@@ -361,14 +366,14 @@ class Base:
def __create_db(self) -> None:
table_logs = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['logs']} (
table_logs = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_log} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['commandes']} (
table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_commande} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
user TEXT,
@@ -376,7 +381,7 @@ class Base:
)
'''
table_modules = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['modules']} (
table_modules = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_module} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
user TEXT,
@@ -384,7 +389,7 @@ class Base:
)
'''
table_admins = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['admins']} (
table_admins = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_admin} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
createdOn TEXT,
user TEXT,
@@ -464,6 +469,17 @@ class Base:
except ValueError:
return False
def decode_ip(self, ip_b64encoded: str) -> Union[str, None]:
binary_ip = b64decode(ip_b64encoded)
try:
decoded_ip = ipaddress.ip_address(binary_ip)
return decoded_ip.exploded
except ValueError as ve:
self.logs.critical(f'This remote ip is not valid : {ve}')
return None
def get_random(self, lenght:int) -> str:
"""
Retourn une chaîne aléatoire en fonction de la longueur spécifiée.