Files
DEFENDER/core/base.py
2024-10-02 23:38:42 +02:00

779 lines
30 KiB
Python

import os
import re
import json
import time
import random
import socket
import hashlib
import logging
import threading
import ipaddress
import ast
import requests
from dataclasses import fields
from typing import Union, Literal
from base64 import b64decode, b64encode
from datetime import datetime, timedelta, timezone
from sqlalchemy import create_engine, Engine, Connection, CursorResult
from sqlalchemy.sql import text
from core.loadConf import ConfigDataModel
class Base:
def __init__(self, Config: ConfigDataModel) -> None:
self.Config = Config # Assigner l'objet de configuration
self.init_log_system() # Demarrer le systeme de log
self.check_for_new_version(True) # Verifier si une nouvelle version est disponible
self.running_timers:list[threading.Timer] = [] # Liste des timers en cours
self.running_threads:list[threading.Thread] = [] # Liste des threads en cours
self.running_sockets: list[socket.socket] = [] # Les sockets ouvert
self.periodic_func:dict[object] = {} # Liste des fonctions en attentes
self.lock = threading.RLock() # Création du lock
self.install: bool = False # Initialisation de la variable d'installation
self.engine, self.cursor = self.db_init() # Initialisation de la connexion a la base de données
self.__create_db() # Initialisation de la base de données
def __set_current_defender_version(self) -> None:
"""This will put the current version of Defender
located in version.json
"""
version_filename = f'.{os.sep}version.json'
with open(version_filename, 'r') as version_data:
current_version:dict[str, str] = json.load(version_data)
# self.DEFENDER_VERSION = current_version["version"]
self.Config.current_version = current_version['version']
return None
def __get_latest_defender_version(self) -> None:
try:
self.logs.debug(f'Looking for a new version available on Github')
# print(f'===> Looking for a new version available on Github')
token = ''
json_url = f'https://raw.githubusercontent.com/adator85/IRC_DEFENDER_MODULES/main/version.json'
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3.raw' # Indique à GitHub que nous voulons le contenu brut du fichier
}
if token == '':
response = requests.get(json_url, timeout=self.Config.API_TIMEOUT)
else:
response = requests.get(json_url, headers=headers, timeout=self.Config.API_TIMEOUT)
response.raise_for_status() # Vérifie si la requête a réussi
json_response:dict = response.json()
# self.LATEST_DEFENDER_VERSION = json_response["version"]
self.Config.latest_version = json_response['version']
return None
except requests.HTTPError as err:
self.logs.error(f'Github not available to fetch latest version: {err}')
except:
self.logs.warning(f'Github not available to fetch latest version')
def check_for_new_version(self, online:bool) -> bool:
try:
self.logs.debug(f'Checking for a new service version')
# Assigner la version actuelle de Defender
self.__set_current_defender_version()
# Récuperer la dernier version disponible dans github
if online:
self.logs.debug(f'Retrieve the latest version from Github')
self.__get_latest_defender_version()
isNewVersion = False
latest_version = self.Config.latest_version
current_version = self.Config.current_version
curr_major , curr_minor, curr_patch = current_version.split('.')
last_major, last_minor, last_patch = latest_version.split('.')
if int(last_major) > int(curr_major):
self.logs.info(f'New version available: {current_version} >>> {latest_version}')
isNewVersion = True
elif int(last_major) == int(curr_major) and int(last_minor) > int(curr_minor):
self.logs.info(f'New version available: {current_version} >>> {latest_version}')
isNewVersion = True
elif int(last_major) == int(curr_major) and int(last_minor) == int(curr_minor) and int(last_patch) > int(curr_patch):
self.logs.info(f'New version available: {current_version} >>> {latest_version}')
isNewVersion = True
else:
isNewVersion = False
return isNewVersion
except ValueError as ve:
self.logs.error(f'Impossible to convert in version number : {ve}')
def get_unixtime(self) -> int:
"""
Cette fonction retourne un UNIXTIME de type 12365456
Return: Current time in seconds since the Epoch (int)
"""
cet_offset = timezone(timedelta(hours=2))
now_cet = datetime.now(cet_offset)
unixtime_cet = int(now_cet.timestamp())
unixtime = int( time.time() )
return unixtime
def get_datetime(self) -> str:
"""
Retourne une date au format string (24-12-2023 20:50:59)
"""
currentdate = datetime.now().strftime('%d-%m-%Y %H:%M:%S')
return currentdate
def get_all_modules(self) -> list:
all_files = os.listdir('mods/')
all_modules: list = []
for module in all_files:
if module.endswith('.py') and not module == '__init__.py':
all_modules.append(module.replace('.py', '').lower())
return all_modules
def create_log(self, log_message: str) -> None:
"""Enregiste les logs
Args:
string (str): Le message a enregistrer
Returns:
None: Aucun retour
"""
sql_insert = f"INSERT INTO {self.Config.table_log} (datetime, server_msg) VALUES (:datetime, :server_msg)"
mes_donnees = {'datetime': str(self.get_datetime()),'server_msg': f'{log_message}'}
self.db_execute_query(sql_insert, mes_donnees)
return None
def init_log_system(self) -> None:
# Create folder if not available
logs_directory = f'logs{os.sep}'
if not os.path.exists(f'{logs_directory}'):
os.makedirs(logs_directory)
# Init logs object
self.logs = logging
self.logs.basicConfig(level=self.Config.DEBUG_LEVEL,
filename='logs/defender.log',
encoding='UTF-8',
format='%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(funcName)s - %(message)s')
self.logs.info('#################### STARTING DEFENDER ####################')
return None
def log_cmd(self, user_cmd:str, cmd:str) -> None:
"""Enregistre les commandes envoyées par les utilisateurs
Args:
cmd (str): la commande a enregistrer
"""
cmd_list = cmd.split()
if len(cmd_list) == 3:
if cmd_list[0].replace('.', '') == 'auth':
cmd_list[1] = '*******'
cmd_list[2] = '*******'
cmd = ' '.join(cmd_list)
insert_cmd_query = f"INSERT INTO {self.Config.table_commande} (datetime, user, commande) VALUES (:datetime, :user, :commande)"
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'commande': cmd}
self.db_execute_query(insert_cmd_query, mes_donnees)
return False
def db_isModuleExist(self, module_name:str) -> bool:
"""Teste si un module existe déja dans la base de données
Args:
module_name (str): le non du module a chercher dans la base de données
Returns:
bool: True si le module existe déja dans la base de données sinon False
"""
query = f"SELECT id FROM {self.Config.table_module} WHERE module_name = :module_name"
mes_donnes = {'module_name': module_name}
results = self.db_execute_query(query, mes_donnes)
if results.fetchall():
return True
else:
return False
def db_record_module(self, user_cmd:str, module_name:str, isdefault:int = 0) -> None:
"""Enregistre les modules dans la base de données
Args:
cmd (str): le module a enregistrer
"""
if not self.db_isModuleExist(module_name):
self.logs.debug(f"Le module {module_name} n'existe pas alors ont le créer")
insert_cmd_query = f"INSERT INTO {self.Config.table_module} (datetime, user, module_name, isdefault) VALUES (:datetime, :user, :module_name, :isdefault)"
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'module_name': module_name, 'isdefault': isdefault}
self.db_execute_query(insert_cmd_query, mes_donnees)
else:
self.logs.debug(f"Le module {module_name} existe déja dans la base de données")
return False
def db_delete_module(self, module_name:str) -> None:
"""Supprime les modules de la base de données
Args:
cmd (str): le module a supprimer
"""
insert_cmd_query = f"DELETE FROM {self.Config.table_module} WHERE module_name = :module_name"
mes_donnees = {'module_name': module_name}
self.db_execute_query(insert_cmd_query, mes_donnees)
return False
def db_sync_core_config(self, module_name: str, dataclassObj: object) -> bool:
"""Sync module local parameters with the database
if new module then local param will be stored in the database
if old module then db param will be moved to the local dataclassObj
if new local param it will be stored in the database
if local param was removed then it will also be removed from the database
Args:
module_name (str): The module name ex. mod_defender
dataclassObj (object): The Dataclass object
Returns:
bool: _description_
"""
try:
response = True
current_date = self.get_datetime()
core_table = self.Config.table_config
# Add local parameters to DB
for field in fields(dataclassObj):
param_key = field.name
param_value = str(getattr(dataclassObj, field.name))
param_to_search = {'module_name': module_name, 'param_key': param_key}
search_query = f'''SELECT id FROM {core_table} WHERE module_name = :module_name AND param_key = :param_key'''
excecute_search_query = self.db_execute_query(search_query, param_to_search)
result_search_query = excecute_search_query.fetchone()
if result_search_query is None:
# If param and module_name doesn't exist create the record
param_to_insert = {'datetime': current_date,'module_name': module_name,
'param_key': param_key,'param_value': param_value
}
insert_query = f'''INSERT INTO {core_table} (datetime, module_name, param_key, param_value)
VALUES (:datetime, :module_name, :param_key, :param_value)
'''
execution = self.db_execute_query(insert_query, param_to_insert)
if execution.rowcount > 0:
self.logs.debug(f'New parameter added to the database: {param_key} --> {param_value}')
# Delete from DB unused parameter
query_select = f"SELECT module_name, param_key, param_value FROM {core_table} WHERE module_name = :module_name"
parameter = {'module_name': module_name}
execute_query_select = self.db_execute_query(query_select, parameter)
result_query_select = execute_query_select.fetchall()
for result in result_query_select:
db_mod_name, db_param_key, db_param_value = result
if not hasattr(dataclassObj, db_param_key):
mes_donnees = {'param_key': db_param_key, 'module_name': db_mod_name}
execute_delete = self.db_execute_query(f'DELETE FROM {core_table} WHERE module_name = :module_name and param_key = :param_key', mes_donnees)
row_affected = execute_delete.rowcount
if row_affected > 0:
self.logs.debug(f'A parameter has been deleted from the database: {db_param_key} --> {db_param_value} | Mod: {db_mod_name}')
# Sync local variable with Database
query = f"SELECT param_key, param_value FROM {core_table} WHERE module_name = :module_name"
parameter = {'module_name': module_name}
response = self.db_execute_query(query, parameter)
result = response.fetchall()
for param, value in result:
if type(getattr(dataclassObj, param)) == list:
value = ast.literal_eval(value)
setattr(dataclassObj, param, self.int_if_possible(value))
return response
except AttributeError as attrerr:
self.logs.error(f'Attribute Error: {attrerr}')
except Exception as err:
self.logs.error(err)
return False
def db_update_core_config(self, module_name:str, dataclassObj: object, param_key:str, param_value: str) -> bool:
core_table = self.Config.table_config
# Check if the param exist
if not hasattr(dataclassObj, param_key):
self.logs.error(f"Le parametre {param_key} n'existe pas dans la variable global")
return False
mes_donnees = {'module_name': module_name, 'param_key': param_key, 'param_value': param_value}
search_param_query = f"SELECT id FROM {core_table} WHERE module_name = :module_name AND param_key = :param_key"
result = self.db_execute_query(search_param_query, mes_donnees)
isParamExist = result.fetchone()
if not isParamExist is None:
mes_donnees = {'datetime': self.get_datetime(),
'module_name': module_name,
'param_key': param_key,
'param_value': param_value
}
query = f'''UPDATE {core_table} SET datetime = :datetime, param_value = :param_value WHERE module_name = :module_name AND param_key = :param_key'''
update = self.db_execute_query(query, mes_donnees)
updated_rows = update.rowcount
if updated_rows > 0:
setattr(dataclassObj, param_key, self.int_if_possible(param_value))
self.logs.debug(f'Parameter updated : {param_key} - {param_value} | Module: {module_name}')
else:
self.logs.error(f'Parameter NOT updated : {param_key} - {param_value} | Module: {module_name}')
else:
self.logs.error(f'Parameter and Module do not exist: Param ({param_key}) - Value ({param_value}) | Module ({module_name})')
self.logs.debug(dataclassObj)
return True
def db_query_channel(self, action: Literal['add','del'], module_name: str, channel_name: str) -> bool:
"""You can add a channel or delete a channel.
Args:
action (Literal['add','del']): Action on the database
module_name (str): The module name (mod_test)
channel_name (str): The channel name (With #)
Returns:
bool: True if action done
"""
try:
channel_name = channel_name.lower() if self.Is_Channel(channel_name) else None
core_table = 'core_channel'
if not channel_name:
self.logs.warn(f'The channel [{channel_name}] is not correct')
return False
match action:
case 'add':
mes_donnees = {'module_name': module_name, 'channel_name': channel_name}
response = self.db_execute_query(f"SELECT id FROM {core_table} WHERE module_name = :module_name AND channel_name = :channel_name", mes_donnees)
isChannelExist = response.fetchone()
if isChannelExist is None:
mes_donnees = {'datetime': self.get_datetime(), 'channel_name': channel_name, 'module_name': module_name}
insert = self.db_execute_query(f"INSERT INTO {core_table} (datetime, channel_name, module_name) VALUES (:datetime, :channel_name, :module_name)", mes_donnees)
if insert.rowcount:
self.logs.debug(f'New channel added: channel={channel_name} / module_name={module_name}')
return True
else:
return False
pass
case 'del':
mes_donnes = {'channel_name': channel_name, 'module_name': module_name}
response = self.db_execute_query(f"DELETE FROM {core_table} WHERE channel_name = :channel_name AND module_name = :module_name", mes_donnes)
if response.rowcount > 0:
self.logs.debug(f'Channel deleted: channel={channel_name} / module: {module_name}')
return True
else:
return False
case _:
return False
except Exception as err:
self.logs.error(err)
def db_create_first_admin(self) -> None:
user = self.db_execute_query(f"SELECT id FROM {self.Config.table_admin}")
if not user.fetchall():
admin = self.Config.OWNER
password = self.crypt_password(self.Config.PASSWORD)
mes_donnees = {'createdOn': self.get_datetime(),
'user': admin,
'password': password,
'hostname': '*',
'vhost': '*',
'level': 5
}
self.db_execute_query(f"""
INSERT INTO {self.Config.table_admin}
(createdOn, user, password, hostname, vhost, level)
VALUES
(:createdOn, :user, :password, :hostname, :vhost, :level)"""
, mes_donnees)
return None
def create_timer(self, time_to_wait: float, func: object, func_args: tuple = ()) -> None:
try:
t = threading.Timer(interval=time_to_wait, function=func, args=func_args)
t.setName(func.__name__)
t.start()
self.running_timers.append(t)
self.logs.debug(f"Timer ID : {str(t.ident)} | Running Threads : {len(threading.enumerate())}")
except AssertionError as ae:
self.logs.error(f'Assertion Error -> {ae}')
def create_thread(self, func:object, func_args: tuple = (), run_once:bool = False, daemon: bool = True) -> None:
"""Create a new thread and store it into running_threads variable
Args:
func (object): The method/function you want to execute via this thread
func_args (tuple, optional): Arguments of the function/method. Defaults to ().
run_once (bool, optional): If you want to ensure that this method/function run once. Defaults to False.
"""
try:
func_name = func.__name__
if run_once:
for thread in self.running_threads:
if thread.getName() == func_name:
return None
th = threading.Thread(target=func, args=func_args, name=str(func_name), daemon=daemon)
th.start()
self.running_threads.append(th)
self.logs.debug(f"Thread ID : {str(th.ident)} | Thread name : {th.getName()} | Running Threads : {len(threading.enumerate())}")
except AssertionError as ae:
self.logs.error(f'{ae}')
def garbage_collector_timer(self) -> None:
"""Methode qui supprime les timers qui ont finis leurs job
"""
try:
for timer in self.running_timers:
if not timer.is_alive():
timer.cancel()
self.running_timers.remove(timer)
self.logs.info(f"Timer {str(timer)} removed")
else:
self.logs.debug(f"===> Timer {str(timer)} Still running ...")
except AssertionError as ae:
self.logs.error(f'Assertion Error -> {ae}')
def garbage_collector_thread(self) -> None:
"""Methode qui supprime les threads qui ont finis leurs job
"""
try:
for thread in self.running_threads:
if thread.getName() != 'heartbeat':
if not thread.is_alive():
self.running_threads.remove(thread)
self.logs.info(f"Thread {str(thread.getName())} {str(thread.native_id)} removed")
# print(threading.enumerate())
except AssertionError as ae:
self.logs.error(f'Assertion Error -> {ae}')
def garbage_collector_sockets(self) -> None:
for soc in self.running_sockets:
while soc.fileno() != -1:
self.logs.debug(soc.fileno())
soc.close()
soc.close()
self.running_sockets.remove(soc)
self.logs.debug(f"Socket ==> closed {str(soc.fileno())}")
def shutdown(self) -> None:
"""Methode qui va préparer l'arrêt complêt du service
"""
# Nettoyage des timers
self.logs.debug(f"=======> Checking for Timers to stop")
for timer in self.running_timers:
while timer.is_alive():
self.logs.debug(f"> waiting for {timer.getName()} to close")
timer.cancel()
time.sleep(0.2)
self.running_timers.remove(timer)
self.logs.debug(f"> Cancelling {timer.getName()} {timer.native_id}")
self.logs.debug(f"=======> Checking for Threads to stop")
for thread in self.running_threads:
if thread.getName() == 'heartbeat' and thread.is_alive():
self.execute_periodic_action()
self.logs.debug(f"> Running the last periodic action")
self.running_threads.remove(thread)
self.logs.debug(f"> Cancelling {thread.getName()} {thread.native_id}")
self.logs.debug(f"=======> Checking for Sockets to stop")
for soc in self.running_sockets:
soc.close()
while soc.fileno() != -1:
soc.close()
self.running_sockets.remove(soc)
self.logs.debug(f"> Socket ==> closed {str(soc.fileno())}")
return None
def db_init(self) -> tuple[Engine, Connection]:
db_directory = self.Config.db_path
full_path_db = self.Config.db_path + self.Config.db_name
if not os.path.exists(db_directory):
self.install = True
os.makedirs(db_directory)
engine = create_engine(f'sqlite:///{full_path_db}.db', echo=False)
cursor = engine.connect()
self.logs.info("database connexion has been initiated")
return engine, cursor
def __create_db(self) -> None:
table_core_log = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_log} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
table_core_config = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_config} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
module_name TEXT,
param_key TEXT,
param_value TEXT
)
'''
table_core_log_command = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_commande} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
user TEXT,
commande TEXT
)
'''
table_core_module = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_module} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
user TEXT,
module_name TEXT,
isdefault INTEGER
)
'''
table_core_channel = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_channel} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
module_name TEXT,
channel_name TEXT
)
'''
table_core_admin = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_admin} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
createdOn TEXT,
user TEXT,
hostname TEXT,
vhost TEXT,
password TEXT,
level INTEGER
)
'''
self.db_execute_query(table_core_log)
self.db_execute_query(table_core_log_command)
self.db_execute_query(table_core_module)
self.db_execute_query(table_core_admin)
self.db_execute_query(table_core_channel)
self.db_execute_query(table_core_config)
if self.install:
self.db_record_module('sys', 'mod_command', 1)
self.db_record_module('sys', 'mod_defender', 1)
self.install = False
return None
def db_execute_query(self, query:str, params:dict = {}) -> CursorResult:
with self.lock:
insert_query = text(query)
if not params:
response = self.cursor.execute(insert_query)
else:
response = self.cursor.execute(insert_query, params)
self.cursor.commit()
return response
def db_close(self) -> None:
try:
self.cursor.close()
except AttributeError as ae:
self.logs.error(f"Attribute Error : {ae}")
def crypt_password(self, password:str) -> str:
"""Retourne un mot de passe chiffré en MD5
Args:
password (str): Le password en clair
Returns:
str: Le password en MD5
"""
md5_password = hashlib.md5(password.encode()).hexdigest()
return md5_password
def int_if_possible(self, value):
"""Convertit la valeur reçue en entier, si possible.
Sinon elle retourne la valeur initiale.
Args:
value (any): la valeur à convertir
Returns:
any: Retour un entier, si possible. Sinon la valeur initiale.
"""
try:
response = int(value)
return response
except ValueError:
return value
except TypeError:
return value
def is_valid_ip(self, ip_to_control:str) -> bool:
try:
if ip_to_control in self.Config.WHITELISTED_IP:
return False
ipaddress.ip_address(ip_to_control)
return True
except ValueError:
return False
def decode_ip(self, ip_b64encoded: str) -> Union[str, None]:
binary_ip = b64decode(ip_b64encoded)
try:
decoded_ip = ipaddress.ip_address(binary_ip)
return decoded_ip.exploded
except ValueError as ve:
self.logs.critical(f'This remote ip is not valid : {ve}')
return None
# def encode_ip(self, remote_ip_address: str) -> Union[str, None]:
# binary_ip = b64encode()
# try:
# decoded_ip = ipaddress.ip_address(binary_ip)
# return decoded_ip.exploded
# except ValueError as ve:
# self.logs.critical(f'This remote ip is not valid : {ve}')
# return None
def get_random(self, lenght:int) -> str:
"""
Retourn une chaîne aléatoire en fonction de la longueur spécifiée.
"""
caracteres = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
randomize = ''.join(random.choice(caracteres) for _ in range(lenght))
return randomize
def execute_periodic_action(self) -> None:
if not self.periodic_func:
# Run Garbage Collector Timer
self.garbage_collector_timer()
self.garbage_collector_thread()
# self.garbage_collector_sockets()
return None
for key, value in self.periodic_func.items():
obj = value['object']
method_name = value['method_name']
param = value['param']
f = getattr(obj, method_name, None)
f(*param)
# Vider le dictionnaire de fonction
self.periodic_func.clear()
def clean_uid(self, uid:str) -> str:
"""Clean UID by removing @ / % / + / ~ / * / :
Args:
uid (str): The UID to clean
Returns:
str: Clean UID without any sign
"""
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = re.sub(pattern, '', uid)
return parsed_UID
def Is_Channel(self, channelToCheck: str) -> bool:
"""Check if the string has the # caractere and return True if this is a channel
Args:
channelToCheck (str): The string to test if it is a channel or not
Returns:
bool: True if the string is a channel / False if this is not a channel
"""
try:
if channelToCheck is None:
return False
pattern = fr'^#'
isChannel = re.findall(pattern, channelToCheck)
if not isChannel:
return False
else:
return True
except TypeError as te:
self.logs.error(f'TypeError: [{channelToCheck}] - {te}')
except Exception as err:
self.logs.error(f'Error Not defined: {err}')