diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c3c8df7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.pyenv/ +db/ +logs/ +__pycache__/ +configuration.json +test.py \ No newline at end of file diff --git a/core/Model.py b/core/Model.py new file mode 100644 index 0000000..277d7f7 --- /dev/null +++ b/core/Model.py @@ -0,0 +1,375 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Union +from core.base import Base + +class User: + + @dataclass + class UserModel: + uid: str + nickname: str + username: str + hostname: str + umodes: str + vhost: str + isWebirc: bool + remote_ip: str + score_connexion: int + connexion_datetime: datetime = field(default=datetime.now()) + + UID_DB: list[UserModel] = [] + + def __init__(self, Base: Base) -> None: + self.log = Base.logs + pass + + def insert(self, newUser: UserModel) -> bool: + """Insert a new User object + + Args: + newUser (UserModel): New userModel object + + Returns: + bool: True if inserted + """ + result = False + exist = False + + for record in self.UID_DB: + if record.uid == newUser.uid: + exist = True + self.log.debug(f'{record.uid} already exist') + + if not exist: + self.UID_DB.append(newUser) + result = True + self.log.debug(f'New User Created: ({newUser})') + + if not result: + self.log.critical(f'The User Object was not inserted {newUser}') + + return result + + def update(self, uid: str, newNickname: str) -> bool: + """Update the nickname starting from the UID + + Args: + uid (str): UID of the user + newNickname (str): New nickname + + Returns: + bool: True if updated + """ + result = False + + for record in self.UID_DB: + if record.uid == uid: + record.nickname = newNickname + result = True + self.log.debug(f'UID ({record.uid}) has been updated with new nickname {newNickname}') + + if not result: + self.log.critical(f'The new nickname {newNickname} was not updated, uid = {uid}') + + return result + + def delete(self, uid: str) -> bool: + """Delete the User starting from the UID + + Args: + uid (str): UID of the user + + Returns: + bool: True if deleted + """ + result = False + + for record in self.UID_DB: + if record.uid == uid: + self.UID_DB.remove(record) + result = True + self.log.debug(f'UID ({record.uid}) has been deleted') + + if not result: + self.log.critical(f'The UID {uid} was not deleted') + + return result + + def get_User(self, uidornickname: str) -> Union[UserModel, None]: + """Get The User Object model + + Args: + uidornickname (str): UID or Nickname + + Returns: + UserModel|None: The UserModel Object | None + """ + User = None + for record in self.UID_DB: + if record.uid == uidornickname: + User = record + elif record.nickname == uidornickname: + User = record + + self.log.debug(f'Search {uidornickname} -- result = {User}') + + return User + + def get_uid(self, uidornickname:str) -> Union[str, None]: + """Get the UID of the user starting from the UID or the Nickname + + Args: + uidornickname (str): UID or Nickname + + Returns: + str|None: Return the UID + """ + uid = None + for record in self.UID_DB: + if record.uid == uidornickname: + uid = record.uid + if record.nickname == uidornickname: + uid = record.uid + + self.log.debug(f'The UID that you are looking for {uidornickname} has been found {uid}') + return uid + + def get_nickname(self, uidornickname:str) -> Union[str, None]: + """Get the Nickname starting from UID or the nickname + + Args: + uidornickname (str): UID or Nickname of the user + + Returns: + str|None: the nickname + """ + nickname = None + for record in self.UID_DB: + if record.nickname == uidornickname: + nickname = record.nickname + if record.uid == uidornickname: + nickname = record.nickname + self.log.debug(f'The value to check {uidornickname} -> {nickname}') + return nickname + +class Admin: + + @dataclass + class AdminModel: + uid: str + nickname: str + username: str + hostname: str + umodes: str + vhost: str + level: int + connexion_datetime: datetime = field(default=datetime.now()) + + UID_ADMIN_DB: list[AdminModel] = [] + + def __init__(self, Base: Base) -> None: + self.log = Base.logs + pass + + def insert(self, newAdmin: AdminModel) -> bool: + + result = False + exist = False + + for record in self.UID_ADMIN_DB: + if record.uid == newAdmin.uid: + exist = True + self.log.debug(f'{record.uid} already exist') + + if not exist: + self.UID_ADMIN_DB.append(newAdmin) + result = True + self.log.debug(f'UID ({newAdmin.uid}) has been created') + + if not result: + self.log.critical(f'The User Object was not inserted {newAdmin}') + + return result + + def update(self, uid: str, newNickname: str) -> bool: + + result = False + + for record in self.UID_ADMIN_DB: + if record.uid == uid: + record.nickname = newNickname + result = True + self.log.debug(f'UID ({record.uid}) has been updated with new nickname {newNickname}') + + if not result: + self.log.critical(f'The new nickname {newNickname} was not updated, uid = {uid}') + + return result + + def delete(self, uid: str) -> bool: + + result = False + + for record in self.UID_ADMIN_DB: + if record.uid == uid: + self.UID_ADMIN_DB.remove(record) + result = True + self.log.debug(f'UID ({record.uid}) has been created') + + if not result: + self.log.critical(f'The UID {uid} was not deleted') + + return result + + def get_Admin(self, uidornickname: str) -> Union[AdminModel, None]: + + Admin = None + for record in self.UID_ADMIN_DB: + if record.uid == uidornickname: + Admin = record + elif record.nickname == uidornickname: + Admin = record + + self.log.debug(f'Search {uidornickname} -- result = {Admin}') + + return Admin + + def get_uid(self, uidornickname:str) -> Union[str, None]: + + uid = None + for record in self.UID_ADMIN_DB: + if record.uid == uidornickname: + uid = record.uid + if record.nickname == uidornickname: + uid = record.uid + + self.log.debug(f'The UID that you are looking for {uidornickname} has been found {uid}') + return uid + + def get_nickname(self, uidornickname:str) -> Union[str, None]: + + nickname = None + for record in self.UID_ADMIN_DB: + if record.nickname == uidornickname: + nickname = record.nickname + if record.uid == uidornickname: + nickname = record.nickname + self.log.debug(f'The value {uidornickname} -- {nickname}') + return nickname + +class Channel: + + @dataclass + class ChannelModel: + name: str + mode: str + uids: list + + UID_CHANNEL_DB: list[ChannelModel] = [] + + def __init__(self, Base: Base) -> None: + self.log = Base.logs + self.Base = Base + pass + + def insert(self, newChan: ChannelModel) -> bool: + + result = False + exist = False + + for record in self.UID_CHANNEL_DB: + if record.name == newChan.name: + exist = True + self.log.debug(f'{record.name} already exist') + + for user in newChan.uids: + record.uids.append(user) + + # Supprimer les doublons + del_duplicates = list(set(record.uids)) + record.uids = del_duplicates + self.log.debug(f'Updating a new UID to the channel {record}') + + + if not exist: + self.UID_CHANNEL_DB.append(newChan) + result = True + self.log.debug(f'New Channel Created: ({newChan})') + + if not result: + self.log.critical(f'The Channel Object was not inserted {newChan}') + + return result + + def update(self, name: str, newMode: str) -> bool: + + result = False + + for record in self.UID_CHANNEL_DB: + if record.name == name: + record.mode = newMode + result = True + self.log.debug(f'Mode ({record.name}) has been updated with new mode {newMode}') + + if not result: + self.log.critical(f'The channel mode {newMode} was not updated, name = {name}') + + return result + + def delete(self, name: str) -> bool: + + result = False + + for record in self.UID_CHANNEL_DB: + if record.name == name: + self.UID_CHANNEL_DB.remove(record) + result = True + self.log.debug(f'Channel ({record.name}) has been created') + + if not result: + self.log.critical(f'The Channel {name} was not deleted') + + return result + + def delete_user_from_channel(self, chan_name: str, uid:str) -> bool: + try: + result = False + + for record in self.UID_CHANNEL_DB: + if record.name == chan_name: + for user_id in record.uids: + if self.Base.clean_uid(user_id) == uid: + record.uids.remove(user_id) + self.log.debug(f'uid {uid} has been removed, here is the new object: {record}') + result = True + + for record in self.UID_CHANNEL_DB: + if not record.uids: + self.UID_CHANNEL_DB.remove(record) + self.log.debug(f'Channel {record.name} has been removed, here is the new object: {record}') + + return result + except ValueError as ve: + self.log.error(f'{ve}') + + def get_Channel(self, name: str) -> Union[ChannelModel, None]: + + Channel = None + for record in self.UID_CHANNEL_DB: + if record.name == name: + Channel = record + + self.log.debug(f'Search {name} -- result = {Channel}') + + return Channel + + def get_mode(self, name:str) -> Union[str, None]: + + mode = None + for record in self.UID_CHANNEL_DB: + if record.name == name: + mode = record.mode + + self.log.debug(f'The mode of the channel {name} has been found: {mode}') + return mode diff --git a/core/base.py b/core/base.py index a8e7639..53efbe2 100644 --- a/core/base.py +++ b/core/base.py @@ -1,31 +1,22 @@ -import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, sys +import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, re +from typing import Union +from base64 import b64decode from datetime import datetime from sqlalchemy import create_engine, Engine, Connection, CursorResult from sqlalchemy.sql import text -from core.loadConf import Config +from core.loadConf import ConfigDataModel class Base: CORE_DB_PATH = 'core' + os.sep + 'db' + os.sep # Le dossier bases de données core MODS_DB_PATH = 'mods' + os.sep + 'db' + os.sep # Le dossier bases de données des modules PYTHON_MIN_VERSION = '3.10' # Version min de python - DB_SCHEMA:list[str] = { - 'admins': 'sys_admins', - 'commandes': 'sys_commandes', - 'logs': 'sys_logs', - 'modules': 'sys_modules' - } - DEFENDER_VERSION = '' # MAJOR.MINOR.BATCH - LATEST_DEFENDER_VERSION = '' # Latest Version of Defender in git - DEFENDER_DB_PATH = 'db' + os.sep # Séparateur en fonction de l'OS - DEFENDER_DB_NAME = 'defender' # Le nom de la base de données principale - - def __init__(self, Config: Config) -> None: + def __init__(self, Config: ConfigDataModel) -> None: self.Config = Config # Assigner l'objet de configuration self.init_log_system() # Demarrer le systeme de log - self.check_for_new_version() # Verifier si une nouvelle version est disponible + self.check_for_new_version(True) # Verifier si une nouvelle version est disponible self.running_timers:list[threading.Timer] = [] # Liste des timers en cours self.running_threads:list[threading.Thread] = [] # Liste des threads en cours @@ -48,12 +39,15 @@ class Base: with open(version_filename, 'r') as version_data: current_version:dict[str, str] = json.load(version_data) - self.DEFENDER_VERSION = current_version["version"] + # self.DEFENDER_VERSION = current_version["version"] + self.Config.current_version = current_version['version'] return None def __get_latest_defender_version(self) -> None: try: + self.logs.debug(f'Looking for a new version available on Github') + print(f'===> Looking for a new version available on Github') token = '' json_url = f'https://raw.githubusercontent.com/adator85/IRC_DEFENDER_MODULES/main/version.json' headers = { @@ -68,7 +62,8 @@ class Base: response.raise_for_status() # Vérifie si la requête a réussi json_response:dict = response.json() - self.LATEST_DEFENDER_VERSION = json_response["version"] + # self.LATEST_DEFENDER_VERSION = json_response["version"] + self.Config.latest_version = json_response['version'] return None except requests.HTTPError as err: @@ -76,16 +71,20 @@ class Base: except: self.logs.warning(f'Github not available to fetch latest version') - def check_for_new_version(self) -> bool: + def check_for_new_version(self, online:bool) -> bool: try: + self.logs.debug(f'Checking for a new service version') + # Assigner la version actuelle de Defender - self.__set_current_defender_version() + self.__set_current_defender_version() # Récuperer la dernier version disponible dans github - self.__get_latest_defender_version() + if online: + self.logs.debug(f'Retrieve the latest version from Github') + self.__get_latest_defender_version() isNewVersion = False - latest_version = self.LATEST_DEFENDER_VERSION - current_version = self.DEFENDER_VERSION + latest_version = self.Config.latest_version + current_version = self.Config.current_version curr_major , curr_minor, curr_patch = current_version.split('.') last_major, last_minor, last_patch = latest_version.split('.') @@ -130,7 +129,7 @@ class Base: Returns: None: Aucun retour """ - sql_insert = f"INSERT INTO {self.DB_SCHEMA['logs']} (datetime, server_msg) VALUES (:datetime, :server_msg)" + sql_insert = f"INSERT INTO {self.Config.table_log} (datetime, server_msg) VALUES (:datetime, :server_msg)" mes_donnees = {'datetime': str(self.get_datetime()),'server_msg': f'{log_message}'} self.db_execute_query(sql_insert, mes_donnees) @@ -166,7 +165,7 @@ class Base: cmd_list[2] = '*******' cmd = ' '.join(cmd_list) - insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['commandes']} (datetime, user, commande) VALUES (:datetime, :user, :commande)" + insert_cmd_query = f"INSERT INTO {self.Config.table_commande} (datetime, user, commande) VALUES (:datetime, :user, :commande)" mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'commande': cmd} self.db_execute_query(insert_cmd_query, mes_donnees) @@ -181,7 +180,7 @@ class Base: Returns: bool: True si le module existe déja dans la base de données sinon False """ - query = f"SELECT id FROM {self.DB_SCHEMA['modules']} WHERE module = :module" + query = f"SELECT id FROM {self.Config.table_module} WHERE module = :module" mes_donnes = {'module': module_name} results = self.db_execute_query(query, mes_donnes) @@ -199,7 +198,7 @@ class Base: if not self.db_isModuleExist(module_name): self.logs.debug(f"Le module {module_name} n'existe pas alors ont le créer") - insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['modules']} (datetime, user, module) VALUES (:datetime, :user, :module)" + insert_cmd_query = f"INSERT INTO {self.Config.table_module} (datetime, user, module) VALUES (:datetime, :user, :module)" mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'module': module_name} self.db_execute_query(insert_cmd_query, mes_donnees) # self.db_close_session(self.session) @@ -214,7 +213,7 @@ class Base: Args: cmd (str): le module a enregistrer """ - insert_cmd_query = f"DELETE FROM {self.DB_SCHEMA['modules']} WHERE module = :module" + insert_cmd_query = f"DELETE FROM {self.Config.table_module} WHERE module = :module" mes_donnees = {'module': module_name} self.db_execute_query(insert_cmd_query, mes_donnees) @@ -222,14 +221,20 @@ class Base: def db_create_first_admin(self) -> None: - user = self.db_execute_query(f"SELECT id FROM {self.DB_SCHEMA['admins']}") + user = self.db_execute_query(f"SELECT id FROM {self.Config.table_admin}") if not user.fetchall(): admin = self.Config.OWNER password = self.crypt_password(self.Config.PASSWORD) - mes_donnees = {'createdOn': self.get_datetime(), 'user': admin, 'password': password, 'hostname': '*', 'vhost': '*', 'level': 5} + mes_donnees = {'createdOn': self.get_datetime(), + 'user': admin, + 'password': password, + 'hostname': '*', + 'vhost': '*', + 'level': 5 + } self.db_execute_query(f""" - INSERT INTO {self.DB_SCHEMA['admins']} + INSERT INTO {self.Config.table_admin} (createdOn, user, password, hostname, vhost, level) VALUES (:createdOn, :user, :password, :hostname, :vhost, :level)""" @@ -348,8 +353,8 @@ class Base: def db_init(self) -> tuple[Engine, Connection]: - db_directory = self.DEFENDER_DB_PATH - full_path_db = self.DEFENDER_DB_PATH + self.DEFENDER_DB_NAME + db_directory = self.Config.db_path + full_path_db = self.Config.db_path + self.Config.db_name if not os.path.exists(db_directory): os.makedirs(db_directory) @@ -361,14 +366,14 @@ class Base: def __create_db(self) -> None: - table_logs = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['logs']} ( + table_logs = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_log} ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, server_msg TEXT ) ''' - table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['commandes']} ( + table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_commande} ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, user TEXT, @@ -376,7 +381,7 @@ class Base: ) ''' - table_modules = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['modules']} ( + table_modules = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_module} ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, user TEXT, @@ -384,7 +389,7 @@ class Base: ) ''' - table_admins = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['admins']} ( + table_admins = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_admin} ( id INTEGER PRIMARY KEY AUTOINCREMENT, createdOn TEXT, user TEXT, @@ -464,6 +469,17 @@ class Base: except ValueError: return False + def decode_ip(self, ip_b64encoded: str) -> Union[str, None]: + + binary_ip = b64decode(ip_b64encoded) + try: + decoded_ip = ipaddress.ip_address(binary_ip) + + return decoded_ip.exploded + except ValueError as ve: + self.logs.critical(f'This remote ip is not valid : {ve}') + return None + def get_random(self, lenght:int) -> str: """ Retourn une chaîne aléatoire en fonction de la longueur spécifiée. @@ -491,3 +507,36 @@ class Base: # Vider le dictionnaire de fonction self.periodic_func.clear() + + def clean_uid(self, uid:str) -> str: + """Clean UID by removing @ / % / + / Owner / and * + + Args: + uid (str): The UID to clean + + Returns: + str: Clean UID without any sign + """ + + pattern = fr'[@|%|\+|~|\*]*' + parsed_UID = re.sub(pattern, '', uid) + + return parsed_UID + + def Is_Channel(self, channelToCheck: str) -> bool: + """Check if the string has the # caractere and return True if this is a channel + + Args: + channelToCheck (str): The string to test if it is a channel or not + + Returns: + bool: True if the string is a channel / False if this is not a channel + """ + + pattern = fr'^#' + isChannel = re.findall(pattern, channelToCheck) + + if not isChannel: + return False + else: + return True \ No newline at end of file diff --git a/core/dataClass.py b/core/dataClass.py new file mode 100644 index 0000000..4f99cb4 --- /dev/null +++ b/core/dataClass.py @@ -0,0 +1,274 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Union + +class User: + + @dataclass + class UserDataClass: + uid: str + nickname: str + username: str + hostname: str + umodes: str + vhost: str + isWebirc: bool + connexion_datetime: datetime = field(default=datetime.now()) + + UID_DB:list[UserDataClass] = [] + + def __init__(self) -> None: + pass + + def insert(self, user: UserDataClass) -> bool: + """Insert new user + + Args: + user (UserDataClass): The User dataclass + + Returns: + bool: True if the record has been created + """ + exists = False + inserted = False + + for record in self.UID_DB: + if record.uid == user.uid: + exists = True + print(f'{user.uid} already exist') + + if not exists: + self.UID_DB.append(user) + print(f'New record with uid: {user.uid}') + inserted = True + + return inserted + + def update(self, uid: str, newnickname: str) -> bool: + """Updating a single record with a new nickname + + Args: + uid (str): the uid of the user + newnickname (str): the new nickname + + Returns: + bool: True if the record has been updated + """ + status = False + for user in self.UID_DB: + if user.uid == uid: + user.nickname = newnickname + status = True + print(f'Updating record with uid: {uid}') + + return status + + def delete(self, uid: str) -> bool: + """Delete a user based on his uid + + Args: + uid (str): The UID of the user + + Returns: + bool: True if the record has been deleted + """ + status = False + for user in self.UID_DB: + if user.uid == uid: + self.UID_DB.remove(user) + status = True + print(f'Removing record with uid: {uid}') + + return status + + def isexist(self, uidornickname:str) -> bool: + """do the UID or Nickname exist ? + + Args: + uidornickname (str): The UID or the Nickname + + Returns: + bool: True if exist or False if don't exist + """ + result = False + for record in self.UID_DB: + if record.uid == uidornickname: + result = True + if record.nickname == uidornickname: + result = True + + return result + + def get_User(self, uidornickname) -> Union[UserDataClass, None]: + + UserObject = None + for record in self.UID_DB: + if record.uid == uidornickname: + UserObject = record + elif record.nickname == uidornickname: + UserObject = record + + return UserObject + + def get_uid(self, uidornickname:str) -> Union[str, None]: + + uid = None + for record in self.UID_DB: + if record.uid == uidornickname: + uid = record.uid + if record.nickname == uidornickname: + uid = record.uid + + return uid + + def get_nickname(self, uidornickname:str) -> Union[str, None]: + + nickname = None + for record in self.UID_DB: + if record.nickname == uidornickname: + nickname = record.nickname + if record.uid == uidornickname: + nickname = record.nickname + + return nickname + +class Admin: + @dataclass + class AdminDataClass: + uid: str + nickname: str + username: str + hostname: str + umodes: str + vhost: str + level: int + connexion_datetime: datetime = field(default=datetime.now()) + + UID_ADMIN_DB:list[AdminDataClass] = [] + + def __init__(self) -> None: + pass + + def insert(self, admin: AdminDataClass) -> bool: + """Insert new user + + Args: + user (UserDataClass): The User dataclass + + Returns: + bool: True if the record has been created + """ + exists = False + inserted = False + + for record in self.UID_ADMIN_DB: + if record.uid == admin.uid: + exists = True + print(f'{admin.uid} already exist') + + if not exists: + self.UID_ADMIN_DB.append(admin) + print(f'New record with uid: {admin.uid}') + inserted = True + + return inserted + + def update(self, uid: str, newnickname: str) -> bool: + """Updating a single record with a new nickname + + Args: + uid (str): the uid of the user + newnickname (str): the new nickname + + Returns: + bool: True if the record has been updated + """ + status = False + for admin in self.UID_ADMIN_DB: + if admin.uid == uid: + admin.nickname = newnickname + status = True + print(f'Updating record with uid: {uid}') + + return status + + def delete(self, uid: str) -> bool: + """Delete a user based on his uid + + Args: + uid (str): The UID of the user + + Returns: + bool: True if the record has been deleted + """ + status = False + for admin in self.UID_ADMIN_DB: + if admin.uid == uid: + self.UID_ADMIN_DB.remove(admin) + status = True + print(f'Removing record with uid: {uid}') + + return status + + def isexist(self, uidornickname:str) -> bool: + """do the UID or Nickname exist ? + + Args: + uidornickname (str): The UID or the Nickname + + Returns: + bool: True if exist or False if don't exist + """ + result = False + for record in self.UID_ADMIN_DB: + if record.uid == uidornickname: + result = True + if record.nickname == uidornickname: + result = True + + return result + + def get_Admin(self, uidornickname) -> Union[AdminDataClass, None]: + + AdminObject = None + for record in self.UID_ADMIN_DB: + if record.uid == uidornickname: + AdminObject = record + elif record.nickname == uidornickname: + AdminObject = record + + return AdminObject + + def get_uid(self, uidornickname:str) -> Union[str, None]: + + uid = None + for record in self.UID_ADMIN_DB: + if record.uid == uidornickname: + uid = record.uid + if record.nickname == uidornickname: + uid = record.uid + + return uid + + def get_nickname(self, uidornickname:str) -> Union[str, None]: + + nickname = None + for record in self.UID_ADMIN_DB: + if record.nickname == uidornickname: + nickname = record.nickname + if record.uid == uidornickname: + nickname = record.nickname + + return nickname + + def get_level(self, uidornickname:str) -> int: + + level = 0 + for record in self.UID_ADMIN_DB: + if record.uid == uidornickname: + level = record.level + if record.nickname == uidornickname: + level = record.level + + return level + diff --git a/core/irc.py b/core/irc.py index df9b802..26cdb05 100644 --- a/core/irc.py +++ b/core/irc.py @@ -3,6 +3,7 @@ from ssl import SSLSocket from datetime import datetime, timedelta from typing import Union from core.loadConf import Config +from core.Model import User, Admin, Channel from core.base import Base class Irc: @@ -10,8 +11,7 @@ class Irc: def __init__(self) -> 'Irc': self.defender_connexion_datetime = datetime.now() # Date et heure de la premiere connexion de Defender - self.db_uid = {} # Definir la variable qui contiendra la liste des utilisateurs connectés au réseau - self.db_admin = {} # Definir la variable qui contiendra la liste des administrateurs + self.first_score: int = 100 self.db_chan = [] # Definir la variable qui contiendra la liste des salons self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés self.beat = 30 # Lancer toutes les 30 secondes des actions de nettoyages @@ -24,13 +24,13 @@ class Irc: self.CHARSET = ['utf-8', 'iso-8859-1'] # Charset utiliser pour décoder/encoder les messages self.SSL_VERSION = None # Version SSL - self.Config = Config().ConfigModel + self.Config = Config().ConfigObject # Liste des commandes internes du bot self.commands_level = { 0: ['help', 'auth', 'copyright'], 1: ['load','reload','unload', 'deauth', 'uptime', 'checkversion'], - 2: ['show_modules', 'show_timers', 'show_threads', 'sentinel'], + 2: ['show_modules', 'show_timers', 'show_threads', 'show_channels'], 3: ['quit', 'restart','addaccess','editaccess', 'delaccess'] } @@ -41,12 +41,20 @@ class Irc: self.commands.append(command) self.Base = Base(self.Config) + self.User = User(self.Base) + self.Admin = Admin(self.Base) + self.Channel = Channel(self.Base) self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, )) ############################################## # CONNEXION IRC # ############################################## def init_irc(self, ircInstance:'Irc') -> None: + """Create a socket and connect to irc server + + Args: + ircInstance (Irc): Instance of Irc object. + """ try: self.__create_socket() self.__connect_to_irc(ircInstance) @@ -54,7 +62,8 @@ class Irc: self.Base.logs.critical(f'Assertion error: {ae}') def __create_socket(self) -> None: - + """Create a socket to connect SSL or Normal connection + """ try: soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK) connexion_information = (self.Config.SERVEUR_IP, self.Config.SERVEUR_PORT) @@ -65,9 +74,8 @@ class Irc: ssl_connexion = ssl_context.wrap_socket(soc, server_hostname=self.Config.SERVEUR_HOSTNAME) ssl_connexion.connect(connexion_information) self.IrcSocket:SSLSocket = ssl_connexion - - self.Base.logs.info(f"Connexion en mode SSL : Version = {self.IrcSocket.version()}") self.SSL_VERSION = self.IrcSocket.version() + self.Base.logs.info(f"Connexion en mode SSL : Version = {self.SSL_VERSION}") else: soc.connect(connexion_information) self.IrcSocket:socket.socket = soc @@ -113,7 +121,7 @@ class Irc: # Reload configuration self.Base.logs.debug('Reloading configuration') - self.Config = Config().ConfigModel + self.Config = Config().ConfigObject self.Base = Base(self.Config) self.__create_socket() @@ -162,7 +170,7 @@ class Irc: # except Exception as e: # self.debug(f"Exception: {e}") - def __link(self, writer:socket.socket) -> None: + def __link(self, writer:Union[socket.socket, SSLSocket]) -> None: """Créer le link et envoyer les informations nécessaires pour la connexion au serveur. @@ -186,7 +194,7 @@ class Irc: sid = self.Config.SERVEUR_ID service_id = self.Config.SERVICE_ID - version = self.Base.DEFENDER_VERSION + version = self.Config.current_version unixtime = self.Base.get_unixtime() # Envoyer un message d'identification @@ -252,6 +260,10 @@ class Irc: except AssertionError as ae: self.Base.logs.error(f"Assertion error : {ae}") + def unload(self) -> None: + # This is only to reference the method + return None + ############################################## # FIN CONNEXION IRC # ############################################## @@ -262,7 +274,7 @@ class Irc: Returns: None: Aucun retour requis, elle charge puis c'est tout """ - result = self.Base.db_execute_query(f"SELECT module FROM {self.Base.DB_SCHEMA['modules']}") + result = self.Base.db_execute_query(f"SELECT module FROM {self.Config.table_module}") for r in result.fetchall(): self.load_module('sys', r[0], True) @@ -383,7 +395,15 @@ class Irc: my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe create_instance_of_the_class = my_class(self.ircObject) # Créer une nouvelle instance de la classe - self.loaded_classes[class_name] = create_instance_of_the_class # Charger la nouvelle class dans la variable globale + + if not hasattr(create_instance_of_the_class, 'cmd'): + self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} ne contient pas de méthode cmd") + self.Base.logs.critical(f"The Module {module_name} has not been loaded because cmd method is not available") + self.Base.db_delete_module(module_name) + return False + + # Charger la nouvelle class dans la variable globale + self.loaded_classes[class_name] = create_instance_of_the_class # Enregistrer le module dans la base de données if not init: @@ -399,149 +419,54 @@ class Irc: except Exception as e: self.Base.logs.error(f"Something went wrong with a module you want to load : {e}") - def insert_db_uid(self, uid:str, nickname:str, username:str, hostname:str, umodes:str, vhost:str, isWebirc: bool) -> None: - - if uid in self.db_uid: - return None - - self.db_uid[uid] = { - 'nickname': nickname, - 'username': username, - 'hostname': hostname, - 'umodes': umodes, - 'vhost': vhost, - 'isWebirc': isWebirc, - 'datetime': datetime.now() - } - - self.db_uid[nickname] = { - 'uid': uid, - 'username': username, - 'hostname': hostname, - 'umodes': umodes, - 'vhost': vhost, - 'isWebirc': isWebirc, - 'datetime': datetime.now() - } - - return None - - def update_db_uid(self, uid:str, newnickname:str) -> None: - - # Récupérer l'ancien nickname - oldnickname = self.db_uid[uid]['nickname'] - - # Enregistrement du nouveau nickname - self.db_uid[newnickname] = { - 'uid': uid, - 'username': self.db_uid[uid]['username'], - 'hostname': self.db_uid[uid]['hostname'], - 'umodes': self.db_uid[uid]['umodes'], - 'vhost': self.db_uid[uid]['vhost'] - } - - # Modification du nickname dans la ligne UID - self.db_uid[uid]['nickname'] = newnickname - - # Supprimer l'ancien nickname - if oldnickname in self.db_uid: - del self.db_uid[oldnickname] - else: - self.Base.logs.debug(f"L'ancien nickname {oldnickname} n'existe pas dans UID_DB") - response = False - - self.Base.logs.debug(f"{oldnickname} changed to {newnickname}") - - return None - - def delete_db_uid(self, uid:str) -> None: - - uid_reel = self.get_uid(uid) - nickname = self.get_nickname(uid_reel) - - if uid_reel in self.db_uid: - del self.db_uid[uid] - - if nickname in self.db_uid: - del self.db_uid[nickname] - - return None - def insert_db_admin(self, uid:str, level:int) -> None: - if not uid in self.db_uid: + if self.User.get_User(uid) is None: return None + + getUser = self.User.get_User(uid) - nickname = self.db_uid[uid]['nickname'] - username = self.db_uid[uid]['username'] - hostname = self.db_uid[uid]['hostname'] - umodes = self.db_uid[uid]['umodes'] - vhost = self.db_uid[uid]['vhost'] + nickname = getUser.nickname + username = getUser.username + hostname = getUser.hostname + umodes = getUser.umodes + vhost = getUser.vhost level = int(level) - self.db_admin[uid] = { - 'nickname': nickname, - 'username': username, - 'hostname': hostname, - 'umodes': umodes, - 'vhost': vhost, - 'datetime': self.Base.get_datetime(), - 'level': level - } - - self.db_admin[nickname] = { - 'uid': uid, - 'username': username, - 'hostname': hostname, - 'umodes': umodes, - 'vhost': vhost, - 'datetime': self.Base.get_datetime(), - 'level': level - } + self.Admin.insert( + self.Admin.AdminModel( + uid=uid, + nickname=nickname, + username=username, + hostname=hostname, + umodes=umodes, + vhost=vhost, + level=level, + connexion_datetime=datetime.now() + ) + ) return None def delete_db_admin(self, uid:str) -> None: - if not uid in self.db_admin: + if self.Admin.get_Admin(uid) is None: return None - nickname_admin = self.db_admin[uid]['nickname'] - - if uid in self.db_admin: - del self.db_admin[uid] - - if nickname_admin in self.db_admin: - del self.db_admin[nickname_admin] + if not self.Admin.delete(uid): + self.Base.logs.critical(f'UID: {uid} was not deleted') return None - def insert_db_chan(self, channel:str) -> bool: - """Ajouter l'ensemble des salons dans la variable {CHAN_DB} - - Args: - channel (str): le salon à insérer dans {CHAN_DB} - - Returns: - bool: True si insertion OK / False si insertion KO - """ - if channel in self.db_chan: - return False - - response = True - # Ajouter un nouveau salon - self.db_chan.append(channel) - - # Supprimer les doublons de la liste - self.db_chan = list(set(self.db_chan)) - - self.Base.logs.debug(f"Le salon {channel} a été ajouté à la liste CHAN_DB") - - return response - def create_defender_user(self, nickname:str, level: int, password:str) -> str: - nickname = self.get_nickname(nickname) + get_user = self.User.get_User(nickname) + if get_user is None: + response = f'This nickname {nickname} does not exist, it is not possible to create this user' + self.Base.logs.warning(response) + return response + + nickname = get_user.nickname response = '' if level > 4: @@ -549,25 +474,19 @@ class Irc: self.Base.logs.warning(response) return response - # Verification si le user existe dans notre UID_DB - if not nickname in self.db_uid: - response = f"{nickname} n'est pas connecté, impossible de l'enregistrer pour le moment" - self.Base.logs.warning(response) - return response - - hostname = self.db_uid[nickname]['hostname'] - vhost = self.db_uid[nickname]['vhost'] + hostname = get_user.hostname + vhost = get_user.vhost spassword = self.Base.crypt_password(password) mes_donnees = {'admin': nickname} - query_search_user = f"SELECT id FROM {self.Base.DB_SCHEMA['admins']} WHERE user=:admin" + query_search_user = f"SELECT id FROM {self.Config.table_admin} WHERE user=:admin" r = self.Base.db_execute_query(query_search_user, mes_donnees) exist_user = r.fetchone() # On verifie si le user exist dans la base if not exist_user: mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level} - self.Base.db_execute_query(f'''INSERT INTO {self.Base.DB_SCHEMA['admins']} + self.Base.db_execute_query(f'''INSERT INTO {self.Config.table_admin} (createdOn, user, password, hostname, vhost, level) VALUES (:datetime, :user, :password, :hostname, :vhost, :level) ''', mes_donnees) @@ -581,41 +500,15 @@ class Irc: self.Base.logs.info(response) return response - def get_uid(self, uidornickname:str) -> Union[str, None]: - - uid_recherche = uidornickname - response = None - for uid, value in self.db_uid.items(): - if uid == uid_recherche: - if 'nickname' in value: - response = uid - if 'uid' in value: - response = value['uid'] - - return response - - def get_nickname(self, uidornickname:str) -> Union[str, None]: - - nickname_recherche = uidornickname - - response = None - for nickname, value in self.db_uid.items(): - if nickname == nickname_recherche: - if 'nickname' in value: - response = value['nickname'] - if 'uid' in value: - response = nickname - - return response - - def is_cmd_allowed(self,nickname:str, cmd:str) -> bool: + def is_cmd_allowed(self, nickname:str, cmd:str) -> bool: # Vérifier si le user est identifié et si il a les droits is_command_allowed = False - uid = self.get_uid(nickname) + uid = self.User.get_uid(nickname) + get_admin = self.Admin.get_Admin(uid) - if uid in self.db_admin: - admin_level = self.db_admin[uid]['level'] + if not get_admin is None: + admin_level = get_admin.level for ref_level, ref_commands in self.commands_level.items(): # print(f"LevelNo: {ref_level} - {ref_commands} - {admin_level}") @@ -651,6 +544,18 @@ class Irc: return None + def thread_check_for_new_version(self, fromuser: str) -> None: + + dnickname = self.Config.SERVICE_NICKNAME + + if self.Base.check_for_new_version(True): + self.send2socket(f':{dnickname} NOTICE {fromuser} : New Version available : {self.Config.current_version} >>> {self.Config.latest_version}') + self.send2socket(f':{dnickname} NOTICE {fromuser} : Please run (git pull origin main) in the current folder') + else: + self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender') + + return None + def cmd(self, data:list) -> None: try: @@ -713,10 +618,15 @@ class Irc: try: # if self.Config.ABUSEIPDB == 1: # self.Base.create_thread(self.abuseipdb_scan, (cmd[2], )) + self.first_connexion_ip = cmd[2] + self.first_score = int(cmd[3]) pass # Possibilité de déclancher les bans a ce niveau. except IndexError as ie: self.Base.logs.error(f'{ie}') + except ValueError as ve: + self.first_score = 0 + self.Base.logs.error(f'Impossible to convert first_score: {ve}') case '320': #:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users @@ -735,10 +645,12 @@ class Irc: hsid = str(cmd[0]).replace(':','') if hsid == self.HSID: if self.INIT == 1: - if self.Base.check_for_new_version(): - version = f'{self.Base.DEFENDER_VERSION} >>> {self.Base.LATEST_DEFENDER_VERSION}' + current_version = self.Config.current_version + latest_version = self.Config.latest_version + if self.Base.check_for_new_version(False): + version = f'{current_version} >>> {latest_version}' else: - version = f'{self.Base.DEFENDER_VERSION}' + version = f'{current_version}' self.send2socket(f"MODE {self.Config.SERVICE_NICKNAME} +B") self.send2socket(f"JOIN {self.Config.SERVICE_CHANLOG}") @@ -764,13 +676,11 @@ class Irc: self.Base.logs.info(f"# VERSION : {version} ") self.Base.logs.info(f"################################################") - if self.Base.check_for_new_version(): + if self.Base.check_for_new_version(False): self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} : New Version available {version}") # Initialisation terminé aprés le premier PING self.INIT = 0 - # self.send2socket(f':{self.Config.SERVICE_ID} PING :{hsid}') - # print(self.db_uid) case _: pass @@ -784,7 +694,7 @@ class Irc: # :001N1WD7L QUIT :Quit: free_znc_1 cmd.pop(0) uid_who_quit = str(cmd[0]).replace(':', '') - self.delete_db_uid(uid_who_quit) + self.User.delete(uid_who_quit) case 'PONG': # ['@msgid=aTNJhp17kcPboF5diQqkUL;time=2023-12-28T20:35:58.411Z', ':irc.deb.biz.st', 'PONG', 'irc.deb.biz.st', ':Dev-PyDefender'] @@ -798,17 +708,63 @@ class Irc: cmd.pop(0) uid = str(cmd[0]).replace(':','') newnickname = cmd[2] + self.User.update(uid, newnickname) - self.update_db_uid(uid, newnickname) + case 'MODE': + #['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6+Z4494xWUg;time=2024-08-09T12:45:36.651Z', + # ':001', 'MODE', '#a', '+nt', '1723207536'] + cmd.pop(0) + if '#' in cmd[2]: + channel = cmd[2] + mode = cmd[3] + self.Channel.update(channel, mode) case 'SJOIN': - # ['@msgid=ictnEBhHmTUHzkEeVZl6rR;time=2023-12-28T20:03:18.482Z', ':001', 'SJOIN', '1702139101', '#stats', '+nst', ':@001SB890A', '@00BAAAAAI'] + # ['@msgid=5sTwGdj349D82L96p749SY;time=2024-08-15T09:50:23.528Z', ':001', 'SJOIN', '1721564574', '#welcome', ':001JD94QH'] + # ['@msgid=bvceb6HthbLJapgGLXn1b0;time=2024-08-15T09:50:11.464Z', ':001', 'SJOIN', '1721564574', '#welcome', '+lnrt', '13', ':001CIVLQF', '+11ZAAAAAB', '001QGR10C', '*@0014UE10B', '001NL1O07', '001SWZR05', '001HB8G04', '@00BAAAAAJ', '0019M7101'] cmd.pop(0) - channel = cmd[3] - self.insert_db_chan(channel) + channel = str(cmd[3]).lower() + mode = cmd[4] + len_cmd = len(cmd) + list_users:list = [] + + start_boucle = 0 + + # Trouver le premier user + for i in range(len_cmd): + s: list = re.findall(fr':', cmd[i]) + if s: + start_boucle = i + + # Boucle qui va ajouter l'ensemble des users (UID) + for i in range(start_boucle, len(cmd)): + parsed_UID = str(cmd[i]) + pattern = fr'[:|@|%|\+|~|\*]*' + pattern = fr':' + parsed_UID = re.sub(pattern, '', parsed_UID) + list_users.append(parsed_UID) + + self.Channel.insert( + self.Channel.ChannelModel( + name=channel, + mode=mode, + uids=list_users + ) + ) + + case 'PART': + # ['@unrealircd.org/geoip=FR;unrealircd.org/userhost=50d6492c@80.214.73.44;unrealircd.org/userip=50d6492c@80.214.73.44;msgid=YSIPB9q4PcRu0EVfC9ci7y-/mZT0+Gj5FLiDSZshH5NCw;time=2024-08-15T15:35:53.772Z', + # ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris'] + uid = str(cmd[1]).replace(':','') + channel = str(cmd[3]) + self.Channel.delete_user_from_channel(channel, uid) + + pass case 'UID': - + # ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601', + # ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net', + # '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...'] if 'webirc' in cmd[0]: isWebirc = True else: @@ -820,8 +776,27 @@ class Irc: hostname = str(cmd[7]) umodes = str(cmd[10]) vhost = str(cmd[11]) + if not 'S' in umodes: + remote_ip = self.Base.decode_ip(str(cmd[13])) + else: + remote_ip = '127.0.0.1' - self.insert_db_uid(uid, nickname, username, hostname, umodes, vhost, isWebirc) + score_connexion = self.first_score + + self.User.insert( + self.User.UserModel( + uid=uid, + nickname=nickname, + username=username, + hostname=hostname, + umodes=umodes, + vhost=vhost, + isWebirc=isWebirc, + remote_ip=remote_ip, + score_connexion=score_connexion, + connexion_datetime=datetime.now() + ) + ) for classe_name, classe_object in self.loaded_classes.items(): classe_object.cmd(cmd_to_send) @@ -842,7 +817,8 @@ class Irc: else: self.Base.logs.info(f'{cmd}') # user_trigger = get_user.split('!')[0] - user_trigger = self.get_nickname(get_uid_or_nickname) + # user_trigger = self.get_nickname(get_uid_or_nickname) + user_trigger = self.User.get_nickname(get_uid_or_nickname) dnickname = self.Config.SERVICE_NICKNAME pattern = fr'(:\{self.Config.SERVICE_PREFIX})(.*)$' @@ -860,7 +836,7 @@ class Irc: cmd_to_send = convert_to_string.replace(':','') self.Base.log_cmd(user_trigger, cmd_to_send) - self._hcmds(user_trigger, arg) + self._hcmds(user_trigger, arg, cmd) if cmd[2] == self.Config.SERVICE_ID: pattern = fr'^:.*?:(.*)$' @@ -874,7 +850,7 @@ class Irc: # Réponse a un CTCP VERSION if arg[0] == '\x01VERSION\x01': - self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Base.DEFENDER_VERSION}\x01') + self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Config.current_version}\x01') return False # Réponse a un TIME @@ -896,9 +872,9 @@ class Irc: return False cmd_to_send = convert_to_string.replace(':','') - self.Base.log_cmd(self.get_nickname(user_trigger), cmd_to_send) + self.Base.log_cmd(self.User.get_nickname(user_trigger), cmd_to_send) - self._hcmds(user_trigger, arg) + self._hcmds(user_trigger, arg, cmd) except IndexError as io: self.Base.logs.error(f'{io}') @@ -914,10 +890,10 @@ class Irc: except IndexError as ie: self.Base.logs.error(f"{ie} / {cmd} / length {str(len(cmd))}") - def _hcmds(self, user: str, cmd:list) -> None: + def _hcmds(self, user: str, cmd:list, fullcmd: list = []) -> None: - fromuser = self.get_nickname(user) # Nickname qui a lancé la commande - uid = self.get_uid(fromuser) # Récuperer le uid de l'utilisateur + fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande + uid = self.User.get_uid(fromuser) # Récuperer le uid de l'utilisateur # Defender information dnickname = self.Config.SERVICE_NICKNAME # Defender nickname @@ -935,14 +911,14 @@ class Irc: # Envoyer la commande aux classes dynamiquement chargées if command != 'notallowed': for classe_name, classe_object in self.loaded_classes.items(): - classe_object._hcmds(user, cmd) + classe_object._hcmds(user, cmd, fullcmd) match command: case 'notallowed': try: current_command = cmd[0] - self.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["rouge"]}{current_command}{self.Config.CONFIG_COLOR["noire"]} ] - Accès Refusé à {self.get_nickname(fromuser)}') + self.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["rouge"]}{current_command}{self.Config.CONFIG_COLOR["noire"]} ] - Accès Refusé à {self.User.get_nickname(fromuser)}') self.send2socket(f':{dnickname} NOTICE {fromuser} : Accès Refusé') except IndexError as ie: self.Base.logs.error(f'{ie}') @@ -950,29 +926,29 @@ class Irc: case 'deauth': current_command = cmd[0] - uid_to_deauth = self.get_uid(fromuser) + uid_to_deauth = self.User.get_uid(fromuser) self.delete_db_admin(uid_to_deauth) - self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} est désormais déconnecter de {dnickname}") + self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais déconnecter de {dnickname}") case 'auth': # ['auth', 'adator', 'password'] current_command = cmd[0] - user_to_log = self.get_nickname(cmd[1]) + user_to_log = self.User.get_nickname(cmd[1]) password = cmd[2] if not user_to_log is None: mes_donnees = {'user': user_to_log, 'password': self.Base.crypt_password(password)} - query = f"SELECT id, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user AND password = :password" + query = f"SELECT id, level FROM {self.Config.table_admin} WHERE user = :user AND password = :password" result = self.Base.db_execute_query(query, mes_donnees) user_from_db = result.fetchone() if not user_from_db is None: - uid_user = self.get_uid(user_to_log) + uid_user = self.User.get_uid(user_to_log) self.insert_db_admin(uid_user, user_from_db[1]) - self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} est désormais connecté a {dnickname}") + self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}") self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Connexion a {dnickname} réussie!") else: - self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} a tapé un mauvais mot de pass") + self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} a tapé un mauvais mot de pass") self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Mot de passe incorrecte") else: @@ -1007,9 +983,14 @@ class Irc: self.send2socket(f':{dnickname} NOTICE {fromuser} : .editaccess [USER] [NEWPASSWORD] [NEWLEVEL]') return None - current_user = self.get_nickname(fromuser) - current_uid = self.get_uid(fromuser) - current_user_level = self.db_admin[current_uid]['level'] + get_admin = self.Admin.get_Admin(fromuser) + if get_admin is None: + self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no Admin access') + return None + + current_user = self.User.get_nickname(fromuser) + current_uid = self.User.get_uid(fromuser) + current_user_level = get_admin.level if user_new_level > 5: self.send2socket(f':{dnickname} NOTICE {fromuser} : Maximum authorized level is 5') @@ -1017,7 +998,7 @@ class Irc: # Rechercher le user dans la base de données. mes_donnees = {'user': user_to_edit} - query = f"SELECT user, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user" + query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user" result = self.Base.db_execute_query(query, mes_donnees) isUserExist = result.fetchone() @@ -1033,7 +1014,7 @@ class Irc: # Le user existe dans la base de données data_to_update = {'user': user_to_edit, 'password': user_password, 'level': user_new_level} - sql_update = f"UPDATE {self.Base.DB_SCHEMA['admins']} SET level = :level, password = :password WHERE user = :user" + sql_update = f"UPDATE {self.Config.table_admin} SET level = :level, password = :password WHERE user = :user" exec_query = self.Base.db_execute_query(sql_update, data_to_update) if exec_query.rowcount > 0: self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_edit} has been modified with level {str(user_new_level)}') @@ -1059,14 +1040,20 @@ class Irc: if len(cmd) < 3: self.send2socket(f':{dnickname} NOTICE {fromuser} : .delaccess [USER] [CONFIRMUSER]') return None + + get_admin = self.Admin.get_Admin(fromuser) + + if get_admin is None: + self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no admin access') + return None - current_user = self.get_nickname(fromuser) - current_uid = self.get_uid(fromuser) - current_user_level = self.db_admin[current_uid]['level'] + current_user = self.User.get_nickname(fromuser) + current_uid = self.User.get_uid(fromuser) + current_user_level = get_admin.level # Rechercher le user dans la base de données. mes_donnees = {'user': user_to_del} - query = f"SELECT user, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user" + query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user" result = self.Base.db_execute_query(query, mes_donnees) info_user = result.fetchone() @@ -1078,7 +1065,7 @@ class Irc: return None data_to_delete = {'user': user_to_del} - sql_delete = f"DELETE FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user" + sql_delete = f"DELETE FROM {self.Config.table_admin} WHERE user = :user" exec_query = self.Base.db_execute_query(sql_delete, data_to_delete) if exec_query.rowcount > 0: self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_del} has been deleted !') @@ -1090,8 +1077,9 @@ class Irc: help = '' count_level_definition = 0 - if uid in self.db_admin: - user_level = self.db_admin[uid]['level'] + get_admin = self.Admin.get_Admin(uid) + if not get_admin is None: + user_level = get_admin.level else: user_level = 0 @@ -1152,7 +1140,7 @@ class Irc: if 'mods.' + module_name in sys.modules: self.loaded_classes[class_name].unload() - self.Base.logs.info('Module Already Loaded ... reload the module ...') + self.Base.logs.info('Module Already Loaded ... reloading the module ...') the_module = sys.modules['mods.' + module_name] importlib.reload(the_module) @@ -1206,8 +1194,11 @@ class Irc: reason.append(cmd[i]) final_reason = ' '.join(reason) - self.db_uid.clear() #Vider UID_DB - self.db_chan = [] #Vider les salons + # self.db_uid.clear() #Vider UID_DB + # self.db_chan = [] #Vider les salons + + self.User.UID_DB.clear() # Clear User Object + self.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object for class_name in self.loaded_classes: self.loaded_classes[class_name].unload() @@ -1223,7 +1214,7 @@ class Irc: self.Base.logs.debug(self.loaded_classes) - results = self.Base.db_execute_query(f'SELECT module FROM {self.Base.DB_SCHEMA["modules"]}') + results = self.Base.db_execute_query(f'SELECT module FROM {self.Config.table_module}') results = results.fetchall() if len(results) == 0: @@ -1248,37 +1239,30 @@ class Irc: self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :{str(running_thread_name)}") + case 'show_channels': + + for chan in self.Channel.UID_CHANNEL_DB: + list_nicknames: list = [] + for uid in chan.uids: + pattern = fr'[:|@|%|\+|~|\*]*' + parsed_UID = re.sub(pattern, '', uid) + list_nicknames.append(self.User.get_nickname(parsed_UID)) + + self.send2socket(f":{dnickname} NOTICE {fromuser} : Channel: {chan.name} - Users: {list_nicknames}") + case 'uptime': uptime = self.get_defender_uptime() self.send2socket(f':{dnickname} NOTICE {fromuser} : {uptime}') case 'copyright': - self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Base.DEFENDER_VERSION} Developped by adator® and dktmb® #') - - case 'sentinel': - # .sentinel on - activation = str(cmd[1]).lower() - service_id = self.Config.SERVICE_ID - - channel_to_dont_quit = [self.Config.SALON_JAIL, dchanlog] - - if activation == 'on': - for chan in self.db_chan: - if not chan in channel_to_dont_quit: - self.send2socket(f":{service_id} JOIN {chan}") - if activation == 'off': - for chan in self.db_chan: - if not chan in channel_to_dont_quit: - self.send2socket(f":{service_id} PART {chan}") + self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Config.current_version} Developped by adator® and dktmb® #') case 'checkversion': - if self.Base.check_for_new_version(): - self.send2socket(f':{dnickname} NOTICE {fromuser} : New Version available : {self.Base.DEFENDER_VERSION} >>> {self.Base.LATEST_DEFENDER_VERSION}') - self.send2socket(f':{dnickname} NOTICE {fromuser} : Please run (git pull origin main) in the current folder') - else: - self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender') - pass + self.Base.create_thread( + self.thread_check_for_new_version, + (fromuser, ) + ) case _: pass diff --git a/core/loadConf.py b/core/loadConf.py index 0c15303..c078939 100644 --- a/core/loadConf.py +++ b/core/loadConf.py @@ -1,4 +1,5 @@ -import json, os +import json +from os import sep from typing import Union from dataclasses import dataclass, field @@ -45,20 +46,44 @@ class ConfigDataModel: DEBUG_LEVEL: int # Le niveau des logs DEBUG 10 | INFO 20 | WARNING 30 | ERROR 40 | CRITICAL 50 - CONFIG_COLOR: dict + CONFIG_COLOR: dict[str, str] + + table_admin: str + table_commande: str + table_log: str + table_module: str + + current_version: str + latest_version: str + db_name: str + db_path: str def __post_init__(self): # Initialiser SERVICE_ID après la création de l'objet self.SERVICE_ID:str = f"{self.SERVEUR_ID}AAAAAB" - class Config: def __init__(self): - import_config = self.__load_json_configuration() + self.ConfigObject: ConfigDataModel = self.__load_service_configuration() + return None - ConfigModel = ConfigDataModel( + def __load_json_service_configuration(self): + + conf_filename = f'core{sep}configuration.json' + with open(conf_filename, 'r') as configuration_data: + configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data) + + for key, value in configuration['CONFIG_COLOR'].items(): + configuration['CONFIG_COLOR'][key] = str(value).encode('utf-8').decode('unicode_escape') + + return configuration + + def __load_service_configuration(self) -> ConfigDataModel: + import_config = self.__load_json_service_configuration() + + ConfigObject: ConfigDataModel = ConfigDataModel( SERVEUR_IP=import_config["SERVEUR_IP"], SERVEUR_HOSTNAME=import_config["SERVEUR_HOSTNAME"], SERVEUR_LINK=import_config["SERVEUR_LINK"], @@ -87,19 +112,15 @@ class Config: WHITELISTED_IP=import_config["WHITELISTED_IP"], GLINE_DURATION=import_config["GLINE_DURATION"], DEBUG_LEVEL=import_config["DEBUG_LEVEL"], - CONFIG_COLOR=import_config["CONFIG_COLOR"] + CONFIG_COLOR=import_config["CONFIG_COLOR"], + table_admin='sys_admins', + table_commande='sys_commandes', + table_log='sys_logs', + table_module='sys_modules', + current_version='', + latest_version='', + db_name='defender', + db_path=f'db{sep}' ) - self.ConfigModel = ConfigModel - return None - - def __load_json_configuration(self): - - conf_filename = f'core{os.sep}configuration.json' - with open(conf_filename, 'r') as configuration_data: - configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data) - - for key, value in configuration['CONFIG_COLOR'].items(): - configuration['CONFIG_COLOR'][key] = value.encode('utf-8').decode('unicode_escape') - - return configuration + return ConfigObject diff --git a/mods/mod_defender.py b/mods/mod_defender.py index f96416e..1b067f0 100644 --- a/mods/mod_defender.py +++ b/mods/mod_defender.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass, fields, field from datetime import datetime from typing import Union import re, socket, psutil, requests, json, time @@ -17,11 +18,47 @@ from core.irc import Irc class Defender(): + @dataclass + class ModConfModel: + reputation: int + reputation_timer: int + reputation_seuil: int + reputation_ban_all_chan: int + local_scan: int + psutil_scan: int + abuseipdb_scan: int + freeipapi_scan: int + cloudfilt_scan: int + flood: int + flood_message: int + flood_time: int + flood_timer: int + + @dataclass + class ReputationModel: + uid: str + nickname: str + username: str + hostname: str + umodes: str + vhost: str + ip: str + score: int + isWebirc: bool + secret_code: str + connected_datetime: str + updated_datetime: str + + UID_REPUTATION_DB: list[ReputationModel] = [] + def __init__(self, ircInstance:Irc) -> None: self.Irc = ircInstance # Ajouter l'object mod_irc a la classe ( Obligatoire ) self.Config = ircInstance.Config # Ajouter la configuration a la classe ( Obligatoire ) + self.User = ircInstance.User # Importer les liste des User connectés ( Obligatoire ) + self.Channel = ircInstance.Channel # Ajouter la liste des salons ( Obligatoire ) self.Base = ircInstance.Base # Ajouter l'objet Base au module ( Obligatoire ) + self.Logs = ircInstance.Base.logs # Ajouter l'objet log ( Obligatoire ) self.timeout = self.Config.API_TIMEOUT # API Timeout self.freeipapi_remote_ip:list = [] # Liste qui va contenir les adresses ip a scanner avec freeipapi @@ -37,14 +74,14 @@ class Defender(): self.localscan_isRunning:bool = True self.reputationTimer_isRunning:bool = True - self.Irc.Base.logs.info(f'Module {self.__class__.__name__} loaded ...') + self.Logs.info(f'Module {self.__class__.__name__} loaded ...') # Créer les nouvelles commandes du module self.commands_level = { 0: ['code'], 1: ['join','part', 'info'], 2: ['q', 'dq', 'o', 'do', 'h', 'dh', 'v', 'dv', 'b', 'ub','k', 'kb'], - 3: ['reputation','proxy_scan', 'flood', 'status', 'timer','show_reputation', 'show_users'] + 3: ['reputation','proxy_scan', 'flood', 'status', 'timer','show_reputation', 'show_users', 'sentinel'] } self.__set_commands(self.commands_level) # Enrigstrer les nouvelles commandes dans le code @@ -52,7 +89,7 @@ class Defender(): self.init_defender() # Créer une methode init ( ce n'es pas obligatoire ) - def __set_commands(self, commands:dict) -> None: + def __set_commands(self, commands:dict[int, list[str]]) -> None: """### Rajoute les commandes du module au programme principal Args: @@ -125,7 +162,7 @@ class Defender(): def init_defender(self) -> bool: - self.db_reputation = {} # Definir la variable qui contiendra la liste des user concerné par la réputation + # self.db_reputation = {} # Definir la variable qui contiendra la liste des user concerné par la réputation self.flood_system = {} # Variable qui va contenir les users self.reputation_first_connexion = {'ip': '', 'score': -1} # Contient les premieres informations de connexion # 13c34603fee4d2941a2c443cc5c77fd750757ca2a2c1b304bd0f418aff80c24be12651d1a3cfe674 @@ -137,23 +174,16 @@ class Defender(): self.join_saved_channels() # Variable qui va contenir les options de configuration du module Defender - self.defConfig = { - 'reputation': 0, - 'reputation_timer': 0, - 'reputation_seuil': 600, - 'reputation_ban_all_chan': 0, - 'local_scan': 0, - 'psutil_scan': 0, - 'abuseipdb_scan': 0, - 'freeipapi_scan': 0, - 'cloudfilt_scan': 0, - 'flood': 0, - 'flood_message': 5, - 'flood_time': 1, - 'flood_timer': 20 - } + self.ModConfig = self.ModConfModel( + reputation=0, reputation_timer=0, reputation_seuil=10, reputation_ban_all_chan=0, + local_scan=0, psutil_scan=0, abuseipdb_scan=0, freeipapi_scan=0, cloudfilt_scan=0, + flood=0, flood_message=5, flood_time=1, flood_timer=20 + ) - # Syncrhoniser la variable defConfig avec la configuration de la base de données. + # Logger en debug la variable de configuration + self.Logs.debug(self.ModConfig) + + # Syncrhoniser l'objet ModConfig avec la configuration de la base de données. self.sync_db_configuration() # Démarrer les threads pour démarrer les api @@ -176,26 +206,27 @@ class Defender(): # Si le resultat ne contient aucune valeur if not result: # Base de données vide Inserer la premiere configuration - for param, value in self.defConfig.items(): - mes_donnees = {'datetime': self.Base.get_datetime(), 'parameter': param, 'value': value} + + for field in fields(self.ModConfig): + mes_donnees = {'datetime': self.Base.get_datetime(), 'parameter': field.name, 'value': getattr(self.ModConfig, field.name)} insert = self.Base.db_execute_query('INSERT INTO def_config (datetime, parameter, value) VALUES (:datetime, :parameter, :value)', mes_donnees) insert_rows = insert.rowcount if insert_rows > 0: - self.Irc.Base.logs.debug(f'Row affected into def_config : {insert_rows}') + self.Logs.debug(f'Row affected into def_config : {insert_rows}') # Inserer une nouvelle configuration - for param, value in self.defConfig.items(): - mes_donnees = {'parameter': param} + for field in fields(self.ModConfig): + mes_donnees = {'parameter': field.name} search_param_query = "SELECT parameter, value FROM def_config WHERE parameter = :parameter" result = self.Base.db_execute_query(search_param_query, mes_donnees) isParamExist = result.fetchone() if isParamExist is None: - mes_donnees = {'datetime': self.Base.get_datetime(), 'parameter': param, 'value': value} + mes_donnees = {'datetime': self.Base.get_datetime(), 'parameter': field.name, 'value': getattr(self.ModConfig, field.name)} insert = self.Base.db_execute_query('INSERT INTO def_config (datetime, parameter, value) VALUES (:datetime, :parameter, :value)', mes_donnees) insert_rows = insert.rowcount if insert_rows > 0: - self.Irc.Base.logs.debug(f'DB_Def_config - new param included : {insert_rows}') + self.Logs.debug(f'DB_Def_config - new param included : {insert_rows}') # Supprimer un parameter si il n'existe plus dans la variable global query = "SELECT parameter FROM def_config" @@ -203,12 +234,12 @@ class Defender(): dbresult = response.fetchall() for dbparam in dbresult: - if not dbparam[0] in self.defConfig: + if not hasattr(self.ModConfig, dbparam[0]): mes_donnees = {'parameter': dbparam[0]} delete = self.Base.db_execute_query('DELETE FROM def_config WHERE parameter = :parameter', mes_donnees) row_affected = delete.rowcount if row_affected > 0: - self.Irc.Base.logs.debug(f'DB_Def_config - param [{dbparam[0]}] has been deleted') + self.Logs.debug(f'DB_Def_config - param [{dbparam[0]}] has been deleted') # Synchroniser la base de données avec la variable global query = "SELECT parameter, value FROM def_config" @@ -216,16 +247,27 @@ class Defender(): result = response.fetchall() for param, value in result: - self.defConfig[param] = self.Base.int_if_possible(value) + setattr(self.ModConfig, param, self.Base.int_if_possible(value)) - self.Irc.Base.logs.debug(self.defConfig) + self.Logs.debug(self.ModConfig) return None - def update_db_configuration(self, param:str, value:str) -> None: + def update_db_configuration(self, param:str, value:str) -> bool: + """Check the parameter if it exist and return True if success - if not param in self.defConfig: - self.Irc.Base.logs.error(f"Le parametre {param} n'existe pas dans la variable global") - return None + Args: + param (str): The parameter name + value (str): The value + + Returns: + bool: True if success or False + """ + response = False + + # Check if the param exist + if not hasattr(self.ModConfig, param): + self.Logs.error(f"Le parametre {param} n'existe pas dans la variable global") + return response mes_donnees = {'parameter': param} search_param_query = "SELECT parameter, value FROM def_config WHERE parameter = :parameter" @@ -237,10 +279,13 @@ class Defender(): update = self.Base.db_execute_query('UPDATE def_config SET datetime = :datetime, value = :value WHERE parameter = :parameter', mes_donnees) updated_rows = update.rowcount if updated_rows > 0: - self.defConfig[param] = self.Base.int_if_possible(value) - self.Irc.Base.logs.debug(f'DB_Def_config - new param updated : {param} {value}') + setattr(self.ModConfig, param, self.Base.int_if_possible(value)) + self.Logs.debug(f'DB_Def_config - new param updated : {param} {value}') + response = True - self.Irc.Base.logs.debug(self.defConfig) + self.Logs.debug(self.ModConfig) + + return response def add_defender_channel(self, channel:str) -> bool: """Cette fonction ajoute les salons de join de Defender @@ -276,66 +321,82 @@ class Defender(): else: return False - def insert_db_reputation(self, uid:str, ip:str, nickname:str, username:str, hostname:str, umodes:str, vhost:str, score:int, isWebirc:bool) -> None: - - currentDateTime = self.Base.get_datetime() - secret_code = self.Base.get_random(8) - # Vérifier si le uid existe déja - if uid in self.db_reputation: - return None - - self.db_reputation[uid] = { - 'nickname': nickname, - 'username': username, - 'hostname': hostname, - 'umodes': umodes, - 'vhost': vhost, - 'ip': ip, - 'score': score, - 'isWebirc': isWebirc, - 'secret_code': secret_code, - 'connected_datetime': currentDateTime, - 'updated_datetime': currentDateTime - } - - return None - - def update_db_reputation(self, uidornickname:str, newnickname:str) -> None: + def reputation_insert(self, reputationModel: ReputationModel) -> bool: - uid = self.Irc.get_uid(uidornickname) - currentDateTime = self.Base.get_datetime() - secret_code = self.Base.get_random(8) + response = False - if not uid in self.Irc.db_uid: - self.Irc.Base.logs.error(f'Etrange UID {uid}') - return None + # Check if the user already exist + for reputation in self.UID_REPUTATION_DB: + if reputation.uid == reputationModel.uid: + return response + + self.UID_REPUTATION_DB.append(reputationModel) + self.Logs.debug(f'Reputation inserted: {reputationModel}') + response = True - if uid in self.db_reputation: - self.db_reputation[uid]['nickname'] = newnickname - self.db_reputation[uid]['updated_datetime'] = currentDateTime - self.db_reputation[uid]['secret_code'] = secret_code - else: - self.Irc.Base.logs.error(f"L'UID {uid} n'existe pas dans REPUTATION_DB") + return response - return None + def reputation_delete(self, uidornickname:str) -> bool: - def delete_db_reputation(self, uid:str) -> None: - """Cette fonction va supprimer le UID du dictionnaire self.db_reputation + response = False + + for record in self.UID_REPUTATION_DB: + if record.uid == uidornickname: + self.UID_REPUTATION_DB.remove(record) + response = True + self.Logs.debug(f'UID ({record.uid}) has been deleted') + elif record.nickname == uidornickname: + self.UID_REPUTATION_DB.remove(record) + response = True + self.Logs.debug(f'Nickname ({record.nickname}) has been deleted') + + if not response: + self.Logs.critical(f'The UID {uidornickname} was not deleted') + + return response + + def reputation_check(self, uidornickname: str) -> bool: + """Check if the uid exist in the dataclass Args: - uid (str): le uid ou le nickname du user + uidornickname (str): UID or nickname of the user + + Returns: + bool: True if the user exist in the reputation dataclass + """ + response = False + + for reputation in self.UID_REPUTATION_DB: + if reputation.uid == uidornickname: + response = True + elif reputation.nickname == uidornickname: + response = True + + return response + + def reputation_get_Reputation(self, uidornickname: str) -> Union[ReputationModel, None]: + """Get a user reputation object with all information + + Args: + uidornickname (str): The UID or Nickname of the suspected user + + Returns: + ReputationModel or None: Return the Reputation Model or None if the user doesn't exist """ - # Si le UID existe dans le dictionnaire alors le supprimer - if uid in self.db_reputation: - # Si le nickname existe dans le dictionnaire alors le supprimer - del self.db_reputation[uid] - self.Irc.Base.logs.debug(f"Le UID {uid} a été supprimé du REPUTATION_DB") + Reputation_user = None + for reputation in self.UID_REPUTATION_DB: + if reputation.uid == uidornickname: + Reputation_user = reputation + elif reputation.nickname == uidornickname: + Reputation_user = reputation + + return Reputation_user def insert_db_trusted(self, uid: str, nickname:str) -> None: - uid = self.Irc.get_uid(uid) - nickname = self.Irc.get_nickname(nickname) + uid = self.User.get_uid(uid) + nickname = self.User.get_nickname(nickname) query = "SELECT id FROM def_trusted WHERE user = ?" exec_query = self.Base.db_execute_query(query, {"user": nickname}) @@ -378,13 +439,12 @@ class Defender(): int: Temps de connexion de l'utilisateur en secondes """ - get_uid = self.Irc.get_uid(uidornickname) - - if not get_uid in self.Irc.db_uid: + get_user = self.User.get_User(uidornickname) + if get_user is None: return 0 # Convertir la date enregistrée dans UID_DB en un objet {datetime} - connected_time_string = self.Irc.db_uid[get_uid]['datetime'] + connected_time_string = get_user.connexion_datetime if type(connected_time_string) == datetime: connected_time = connected_time_string else: @@ -410,51 +470,55 @@ class Defender(): # - Defender devra libérer l'utilisateur et l'envoyer vers un salon défini dans la configuration {welcome_chan} # - Defender devra intégrer une liste d'IDs (pseudo/host) exemptés de 'Reputation security' malgré un score de rép. faible et un pseudo non enregistré. try: + + get_reputation = self.reputation_get_Reputation(uid) - if not uid in self.db_reputation: + if get_reputation is None: + self.Logs.error(f'UID {uid} has not been found') return False - code = self.db_reputation[uid]['secret_code'] salon_logs = self.Config.SERVICE_CHANLOG salon_jail = self.Config.SALON_JAIL - jailed_nickname = self.db_reputation[uid]['nickname'] - jailed_score = self.db_reputation[uid]['score'] + + code = get_reputation.secret_code + jailed_nickname = get_reputation.nickname + jailed_score = get_reputation.score color_red = self.Config.CONFIG_COLOR['rouge'] color_black = self.Config.CONFIG_COLOR['noire'] color_bold = self.Config.CONFIG_COLOR['gras'] service_id = self.Config.SERVICE_ID service_prefix = self.Config.SERVICE_PREFIX - reputation_ban_all_chan = self.Base.int_if_possible(self.defConfig['reputation_ban_all_chan']) + reputation_ban_all_chan = self.Base.int_if_possible(self.ModConfig.reputation_ban_all_chan) - if not self.db_reputation[uid]['isWebirc']: + if not get_reputation.isWebirc: # Si le user ne vient pas de webIrc self.Irc.send2socket(f":{service_id} SAJOIN {jailed_nickname} {salon_jail}") self.Irc.send2socket(f":{service_id} PRIVMSG {salon_logs} :[{color_red} REPUTATION {color_black}] : Connexion de {jailed_nickname} ({jailed_score}) ==> {salon_jail}") self.Irc.send2socket(f":{service_id} NOTICE {jailed_nickname} :[{color_red} {jailed_nickname} {color_black}] : Merci de tapez la commande suivante {color_bold}{service_prefix}code {code}{color_bold}") if reputation_ban_all_chan == 1: - for chan in self.Irc.db_chan: - if chan != salon_jail: - self.Irc.send2socket(f":{service_id} MODE {chan} +b {jailed_nickname}!*@*") - self.Irc.send2socket(f":{service_id} KICK {chan} {jailed_nickname}") + for chan in self.Channel.UID_CHANNEL_DB: + if chan.name != salon_jail: + self.Irc.send2socket(f":{service_id} MODE {chan.name} +b {jailed_nickname}!*@*") + self.Irc.send2socket(f":{service_id} KICK {chan.name} {jailed_nickname}") - self.Irc.Base.logs.info(f"system_reputation : {jailed_nickname} à été capturé par le système de réputation") - # self.Irc.create_ping_timer(int(self.defConfig['reputation_timer']) * 60, 'Defender', 'system_reputation_timer') - # self.Base.create_timer(int(self.defConfig['reputation_timer']) * 60, self.system_reputation_timer) + self.Logs.info(f"system_reputation : {jailed_nickname} à été capturé par le système de réputation") + # self.Irc.create_ping_timer(int(self.ModConfig.reputation_timer) * 60, 'Defender', 'system_reputation_timer') + # self.Base.create_timer(int(self.ModConfig.reputation_timer) * 60, self.system_reputation_timer) else: - self.Irc.Base.logs.info(f"system_reputation : {jailed_nickname} à été supprimé du système de réputation car connecté via WebIrc ou il est dans la 'Trusted list'") - self.delete_db_reputation(uid) + self.Logs.info(f"system_reputation : {jailed_nickname} à été supprimé du système de réputation car connecté via WebIrc ou il est dans la 'Trusted list'") + self.reputation_delete(uid) except IndexError as e: - self.Irc.Base.logs.error(f"system_reputation : {str(e)}") + self.Logs.error(f"system_reputation : {str(e)}") def system_reputation_timer(self) -> None: try: - reputation_flag = int(self.defConfig['reputation']) - reputation_timer = int(self.defConfig['reputation_timer']) - reputation_seuil = self.defConfig['reputation_seuil'] - ban_all_chan = self.Base.int_if_possible(self.defConfig['reputation_ban_all_chan']) + reputation_flag = self.ModConfig.reputation + reputation_timer = self.ModConfig.reputation_timer + reputation_seuil = self.ModConfig.reputation_seuil + ban_all_chan = self.Base.int_if_possible(self.ModConfig.reputation_ban_all_chan) service_id = self.Config.SERVICE_ID dchanlog = self.Config.SERVICE_CHANLOG color_red = self.Config.CONFIG_COLOR['rouge'] @@ -468,31 +532,32 @@ class Defender(): uid_to_clean = [] - for uid in self.db_reputation: - if not self.db_reputation[uid]['isWebirc']: # Si il ne vient pas de WebIRC + for user in self.UID_REPUTATION_DB: + if not user.isWebirc: # Si il ne vient pas de WebIRC # self.Irc.debug(f"Nickname: {self.db_reputation[uid]['nickname']} | uptime: {self.get_user_uptime_in_minutes(uid)} | reputation time: {reputation_timer}") - if self.get_user_uptime_in_minutes(uid) >= reputation_timer and int(self.db_reputation[uid]['score']) <= int(reputation_seuil): - self.Irc.send2socket(f":{service_id} PRIVMSG {dchanlog} :[{color_red} REPUTATION {color_black}] : Action sur {self.db_reputation[uid]['nickname']} aprés {str(reputation_timer)} minutes d'inactivité") + if self.get_user_uptime_in_minutes(user.uid) >= reputation_timer and int(user.score) <= int(reputation_seuil): + self.Irc.send2socket(f":{service_id} PRIVMSG {dchanlog} :[{color_red} REPUTATION {color_black}] : Action sur {user.nickname} aprés {str(reputation_timer)} minutes d'inactivité") # if not system_reputation_timer_action(cglobal['reputation_timer_action'], uid, self.db_reputation[uid]['nickname']): # return False - self.Irc.send2socket(f":{service_id} KILL {self.db_reputation[uid]['nickname']} After {str(reputation_timer)} minutes of inactivity you should reconnect and type the password code ") + self.Irc.send2socket(f":{service_id} KILL {user.nickname} After {str(reputation_timer)} minutes of inactivity you should reconnect and type the password code ") - self.Irc.Base.logs.info(f"Nickname: {self.db_reputation[uid]['nickname']} KILLED after {str(reputation_timer)} minutes of inactivity") + self.Logs.info(f"Nickname: {user.nickname} KILLED after {str(reputation_timer)} minutes of inactivity") - uid_to_clean.append(uid) + uid_to_clean.append(user.uid) for uid in uid_to_clean: # Suppression des éléments dans {UID_DB} et {REPUTATION_DB} - for chan in self.Irc.db_chan: - if chan != salon_jail and ban_all_chan == 1: - self.Irc.send2socket(f":{service_id} MODE {chan} -b {self.db_reputation[uid]['nickname']}!*@*") + for chan in self.Channel.UID_CHANNEL_DB: + if chan.name != salon_jail and ban_all_chan == 1: + get_user_reputation = self.reputation_get_Reputation(uid) + self.Irc.send2socket(f":{service_id} MODE {chan.name} -b {get_user_reputation.nickname}!*@*") # Lorsqu'un utilisateur quitte, il doit être supprimé de {UID_DB}. - self.Irc.delete_db_uid(uid) - self.delete_db_reputation(uid) + self.User.delete(uid) + self.reputation_delete(uid) except AssertionError as ae: - self.Irc.Base.logs.error(f'Assertion Error -> {ae}') + self.Logs.error(f'Assertion Error -> {ae}') def thread_reputation_timer(self) -> None: try: @@ -528,22 +593,22 @@ class Defender(): def flood(self, detected_user:str, channel:str) -> None: - if self.defConfig['flood'] == 0: + if self.ModConfig.flood == 0: return None if not '#' in channel: return None - flood_time = self.defConfig['flood_time'] - flood_message = self.defConfig['flood_message'] - flood_timer = self.defConfig['flood_timer'] + flood_time = self.ModConfig.flood_time + flood_message = self.ModConfig.flood_message + flood_timer = self.ModConfig.flood_timer service_id = self.Config.SERVICE_ID dnickname = self.Config.SERVICE_NICKNAME color_red = self.Config.CONFIG_COLOR['rouge'] color_bold = self.Config.CONFIG_COLOR['gras'] - get_detected_uid = self.Irc.get_uid(detected_user) - get_detected_nickname = self.Irc.get_nickname(detected_user) + get_detected_uid = self.User.get_uid(detected_user) + get_detected_nickname = self.User.get_nickname(detected_user) unixtime = self.Base.get_unixtime() get_diff_secondes = 0 @@ -612,15 +677,15 @@ class Defender(): newSocket.shutdown(socket.SHUT_RDWR) newSocket.close() except (socket.timeout, ConnectionRefusedError): - self.Base.logs.info(f"Le port {remote_ip}:{str(port)} est fermé") + self.Logs.info(f"Le port {remote_ip}:{str(port)} est fermé") except AttributeError as ae: - self.Base.logs.warning(f"AttributeError ({remote_ip}): {ae}") + self.Logs.warning(f"AttributeError ({remote_ip}): {ae}") except socket.gaierror as err: - self.Base.logs.warning(f"Address Info Error ({remote_ip}): {err}") + self.Logs.warning(f"Address Info Error ({remote_ip}): {err}") finally: # newSocket.shutdown(socket.SHUT_RDWR) newSocket.close() - self.Base.logs.info('=======> Fermeture de la socket') + self.Logs.info('=======> Fermeture de la socket') pass @@ -641,29 +706,34 @@ class Defender(): return None except ValueError as ve: - self.Base.logs.warning(f"thread_local_scan Error : {ve}") + self.Logs.warning(f"thread_local_scan Error : {ve}") def get_ports_connexion(self, remote_ip: str) -> list[int]: - """psutil_scan + """psutil_scan for Linux Args: - remote_ip (str): _description_ + remote_ip (str): The remote ip address Returns: - list[int]: _description_ + list[int]: list of ports """ - if remote_ip in self.Config.WHITELISTED_IP: - return None + try: + if remote_ip in self.Config.WHITELISTED_IP: + return None - connections = psutil.net_connections(kind='inet') + connections = psutil.net_connections(kind='inet') - matching_ports = [conn.raddr.port for conn in connections if conn.raddr and conn.raddr.ip == remote_ip] - self.Base.logs.info(f"Connexion of {remote_ip} using ports : {str(matching_ports)}") + matching_ports = [conn.raddr.port for conn in connections if conn.raddr and conn.raddr.ip == remote_ip] + self.Logs.info(f"Connexion of {remote_ip} using ports : {str(matching_ports)}") - return matching_ports + return matching_ports + + except psutil.AccessDenied as ad: + self.Logs.critical(f'psutil_scan: Permission error: {ad}') def thread_psutil_scan(self) -> None: try: + while self.psutil_isRunning: list_to_remove:list = [] @@ -679,7 +749,7 @@ class Defender(): return None except ValueError as ve: - self.Base.logs.warning(f"thread_psutil_scan Error : {ve}") + self.Logs.warning(f"thread_psutil_scan Error : {ve}") def abuseipdb_scan(self, remote_ip:str) -> Union[dict[str, any], None]: """Analyse l'ip avec AbuseIpDB @@ -693,7 +763,7 @@ class Defender(): """ if remote_ip in self.Config.WHITELISTED_IP: return None - if self.defConfig['abuseipdb_scan'] == 0: + if self.ModConfig.abuseipdb_scan == 0: return None if self.abuseipdb_key == '': @@ -741,14 +811,15 @@ class Defender(): return result except KeyError as ke: - self.Base.logs.error(f"AbuseIpDb KeyError : {ke}") + self.Logs.error(f"AbuseIpDb KeyError : {ke}") except requests.ReadTimeout as rt: - self.Base.logs.error(f"AbuseIpDb Timeout : {rt}") + self.Logs.error(f"AbuseIpDb Timeout : {rt}") except requests.ConnectionError as ce: - self.Base.logs.error(f"AbuseIpDb Connection Error : {ce}") + self.Logs.error(f"AbuseIpDb Connection Error : {ce}") def thread_abuseipdb_scan(self) -> None: try: + while self.abuseipdb_isRunning: list_to_remove:list = [] @@ -764,7 +835,7 @@ class Defender(): return None except ValueError as ve: - self.Base.logs.error(f"thread_abuseipdb_scan Error : {ve}") + self.Logs.error(f"thread_abuseipdb_scan Error : {ve}") def freeipapi_scan(self, remote_ip:str) -> Union[dict[str, any], None]: """Analyse l'ip avec Freeipapi @@ -778,7 +849,7 @@ class Defender(): """ if remote_ip in self.Config.WHITELISTED_IP: return None - if self.defConfig['freeipapi_scan'] == 0: + if self.ModConfig.freeipapi_scan == 0: return None service_id = self.Config.SERVICE_ID @@ -799,10 +870,10 @@ class Defender(): try: status_code = response.status_code if status_code == 429: - self.Base.logs.warning(f'Too Many Requests - The rate limit for the API has been exceeded.') + self.Logs.warning(f'Too Many Requests - The rate limit for the API has been exceeded.') return None elif status_code != 200: - self.Base.logs.warning(f'status code = {str(status_code)}') + self.Logs.warning(f'status code = {str(status_code)}') return None result = { @@ -818,10 +889,11 @@ class Defender(): return result except KeyError as ke: - self.Base.logs.error(f"FREEIPAPI_SCAN KeyError : {ke}") + self.Logs.error(f"FREEIPAPI_SCAN KeyError : {ke}") def thread_freeipapi_scan(self) -> None: try: + while self.freeipapi_isRunning: list_to_remove:list = [] @@ -837,7 +909,7 @@ class Defender(): return None except ValueError as ve: - self.Base.logs.error(f"thread_freeipapi_scan Error : {ve}") + self.Logs.error(f"thread_freeipapi_scan Error : {ve}") def cloudfilt_scan(self, remote_ip:str) -> Union[dict[str, any], None]: """Analyse l'ip avec cloudfilt @@ -851,7 +923,7 @@ class Defender(): """ if remote_ip in self.Config.WHITELISTED_IP: return None - if self.defConfig['cloudfilt_scan'] == 0: + if self.ModConfig.cloudfilt_scan == 0: return None if self.cloudfilt_key == '': return None @@ -875,7 +947,7 @@ class Defender(): try: status_code = response.status_code if status_code != 200: - self.Base.logs.warning(f'Error connecting to cloudfilt API | Code: {str(status_code)}') + self.Logs.warning(f'Error connecting to cloudfilt API | Code: {str(status_code)}') return None result = { @@ -894,11 +966,12 @@ class Defender(): return result except KeyError as ke: - self.Base.logs.error(f"CLOUDFILT_SCAN KeyError : {ke}") + self.Logs.error(f"CLOUDFILT_SCAN KeyError : {ke}") return None def thread_cloudfilt_scan(self) -> None: try: + while self.cloudfilt_isRunning: list_to_remove:list = [] @@ -914,7 +987,7 @@ class Defender(): return None except ValueError as ve: - self.Base.logs.error(f"Thread_cloudfilt_scan Error : {ve}") + self.Logs.error(f"Thread_cloudfilt_scan Error : {ve}") def cmd(self, data:list) -> None: @@ -936,24 +1009,24 @@ class Defender(): return None # self.Base.scan_ports(cmd[2]) - if self.defConfig['local_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: + if self.ModConfig.local_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.localscan_remote_ip.append(cmd[2]) - if self.defConfig['psutil_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: + if self.ModConfig.psutil_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.psutil_remote_ip.append(cmd[2]) - if self.defConfig['abuseipdb_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: + if self.ModConfig.abuseipdb_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.abuseipdb_remote_ip.append(cmd[2]) - if self.defConfig['freeipapi_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: + if self.ModConfig.freeipapi_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.freeipapi_remote_ip.append(cmd[2]) - if self.defConfig['cloudfilt_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: + if self.ModConfig.cloudfilt_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.cloudfilt_remote_ip.append(cmd[2]) # Possibilité de déclancher les bans a ce niveau. - except IndexError: - self.Irc.Base.logs.error(f'cmd reputation: index error') + except IndexError as ie: + self.Logs.error(f'cmd reputation: index error: {ie}') match cmd[2]: @@ -961,7 +1034,7 @@ class Defender(): cmd.pop(0) user_trigger = str(cmd[0]).replace(':','') channel = cmd[2] - find_nickname = self.Irc.get_nickname(user_trigger) + find_nickname = self.User.get_nickname(user_trigger) self.flood(find_nickname, channel) case 'UID': @@ -984,8 +1057,8 @@ class Defender(): umodes = str(cmd[9]) vhost = str(cmd[10]) - reputation_flag = self.Base.int_if_possible(self.defConfig['reputation']) - reputation_seuil = self.Base.int_if_possible(self.defConfig['reputation_seuil']) + reputation_flag = self.Base.int_if_possible(self.ModConfig.reputation) + reputation_seuil = self.Base.int_if_possible(self.ModConfig.reputation_seuil) if self.Irc.INIT == 0: # A chaque nouvelle connexion chargé les données dans reputation @@ -1001,35 +1074,74 @@ class Defender(): if not re.match(fr'^.*[S|o?].*$', umodes): if reputation_flag == 1 and int(client_score) <= int(reputation_seuil): # if not db_isTrusted_user(user_id): - self.insert_db_reputation(uid, client_ip, nickname, username, hostname, umodes, vhost, client_score, isWebirc) + + # get user information + get_user = self.User.get_User(uid) + if get_user is None: + self.Logs.error(f'This UID {uid} does not exisit') + + # self.insert_db_reputation(uid, client_ip, nickname, username, hostname, umodes, vhost, client_score, isWebirc) + + currentDateTime = self.Base.get_datetime() + self.reputation_insert( + self.ReputationModel( + uid=uid, + nickname=nickname, + username=username, + hostname=hostname, + umodes=umodes, + vhost=vhost, + ip=client_ip, + score=client_score, + secret_code=self.Base.get_random(8), + isWebirc=isWebirc, + connected_datetime=currentDateTime, + updated_datetime=currentDateTime + ) + ) # self.Irc.send2socket(f":{service_id} WHOIS {nickname}") - if uid in self.db_reputation: + if self.reputation_check(uid): if reputation_flag == 1 and int(client_score) <= int(reputation_seuil): self.system_reputation(uid) - self.Base.logs.info('Démarrer le systeme de reputation') + self.Logs.info('Démarrer le systeme de reputation') case 'SJOIN': # ['@msgid=F9B7JeHL5pj9nN57cJ5pEr;time=2023-12-28T20:47:24.305Z', ':001', 'SJOIN', '1702138958', '#welcome', ':0015L1AHL'] try: cmd.pop(0) parsed_chan = cmd[3] - self.Irc.insert_db_chan(parsed_chan) + + ''' + mode = '' + if len(cmd) > 4: + mode = cmd[4] + + self.Channel.insert( + self.Channel.ChannelModel( + name=parsed_chan, + mode=mode + ) + ) + ''' - if self.defConfig['reputation'] == 1: + if self.ModConfig.reputation == 1: parsed_UID = cmd[4] pattern = fr'^:[@|%|\+|~|\*]*' parsed_UID = re.sub(pattern, '', parsed_UID) - if parsed_UID in self.db_reputation: - # print(f"====> {str(self.db_reputation)}") - isWebirc = self.db_reputation[parsed_UID]['isWebirc'] - if self.defConfig['reputation_ban_all_chan'] == 1 and not isWebirc: - if parsed_chan != self.Config.SALON_JAIL: - self.Irc.send2socket(f":{service_id} MODE {parsed_chan} +b {self.db_reputation[parsed_UID]['nickname']}!*@*") - self.Irc.send2socket(f":{service_id} KICK {parsed_chan} {self.db_reputation[parsed_UID]['nickname']}") - self.Base.logs.debug(f'SJOIN parsed_uid : {parsed_UID}') + get_reputation = self.reputation_get_Reputation(parsed_UID) + + if not get_reputation is None: + isWebirc = get_reputation.isWebirc + + if self.ModConfig.reputation_ban_all_chan == 1 and not isWebirc: + if parsed_chan != self.Config.SALON_JAIL: + self.Irc.send2socket(f":{service_id} MODE {parsed_chan} +b {get_reputation.nickname}!*@*") + self.Irc.send2socket(f":{service_id} KICK {parsed_chan} {get_reputation.nickname}") + + self.Logs.debug(f'SJOIN parsed_uid : {parsed_UID}') except KeyError as ke: - self.Base.logs.error(f"key error SJOIN : {ke}") + self.Logs.error(f"key error SJOIN : {ke}") case 'SLOG': # self.Base.scan_ports(cmd[7]) @@ -1038,19 +1150,19 @@ class Defender(): if not self.Base.is_valid_ip(cmd[7]): return None - if self.defConfig['local_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + if self.ModConfig.local_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.localscan_remote_ip.append(cmd[7]) - if self.defConfig['psutil_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + if self.ModConfig.psutil_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.psutil_remote_ip.append(cmd[7]) - if self.defConfig['abuseipdb_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + if self.ModConfig.abuseipdb_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.abuseipdb_remote_ip.append(cmd[7]) - if self.defConfig['freeipapi_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + if self.ModConfig.freeipapi_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.freeipapi_remote_ip.append(cmd[7]) - if self.defConfig['cloudfilt_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + if self.ModConfig.cloudfilt_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.cloudfilt_remote_ip.append(cmd[7]) case 'NICK': @@ -1059,40 +1171,48 @@ class Defender(): try: cmd.pop(0) uid = str(cmd[0]).replace(':','') - oldnick = self.db_reputation[uid]['nickname'] + get_Reputation = self.reputation_get_Reputation(uid) + + if get_Reputation is None: + self.Logs.debug(f'This UID: {uid} is not listed in the reputation dataclass') + return None + + oldnick = get_Reputation.nickname newnickname = cmd[2] jail_salon = self.Config.SALON_JAIL service_id = self.Config.SERVICE_ID - self.update_db_reputation(uid, newnickname) + # self.update_db_reputation(uid, newnickname) + get_Reputation.nickname = newnickname - if uid in self.db_reputation: - for chan in self.Irc.db_chan: - if chan != jail_salon: - self.Irc.send2socket(f":{service_id} MODE {chan} -b {oldnick}!*@*") - self.Irc.send2socket(f":{service_id} MODE {chan} +b {newnickname}!*@*") + for chan in self.Channel.UID_CHANNEL_DB: + if chan.name != jail_salon: + self.Irc.send2socket(f":{service_id} MODE {chan.name} -b {oldnick}!*@*") + self.Irc.send2socket(f":{service_id} MODE {chan.name} +b {newnickname}!*@*") except KeyError as ke: - self.Base.logs.error(f'cmd - NICK - KeyError: {ke}') + self.Logs.error(f'cmd - NICK - KeyError: {ke}') case 'QUIT': # :001N1WD7L QUIT :Quit: free_znc_1 cmd.pop(0) - ban_all_chan = self.Base.int_if_possible(self.defConfig['reputation_ban_all_chan']) + ban_all_chan = self.Base.int_if_possible(self.ModConfig.reputation_ban_all_chan) user_id = str(cmd[0]).replace(':','') final_UID = user_id jail_salon = self.Config.SALON_JAIL service_id = self.Config.SERVICE_ID - if final_UID in self.db_reputation: - final_nickname = self.db_reputation[user_id]['nickname'] - for chan in self.Irc.db_chan: - if chan != jail_salon and ban_all_chan == 1: - self.Irc.send2socket(f":{service_id} MODE {chan} -b {final_nickname}!*@*") - self.delete_db_reputation(final_UID) + get_user_reputation = self.reputation_get_Reputation(final_UID) - def _hcmds(self, user:str, cmd: list) -> None: + if not get_user_reputation is None: + final_nickname = get_user_reputation.nickname + for chan in self.Channel.UID_CHANNEL_DB: + if chan.name != jail_salon and ban_all_chan == 1: + self.Irc.send2socket(f":{service_id} MODE {chan.name} -b {final_nickname}!*@*") + self.reputation_delete(final_UID) + + def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None: command = str(cmd[0]).lower() fromuser = user @@ -1115,64 +1235,64 @@ class Defender(): # self.Base.create_timer(timer_sent, self.Base.garbage_collector_sockets) except TypeError as te: - self.Base.logs.error(f"Type Error -> {te}") + self.Logs.error(f"Type Error -> {te}") except ValueError as ve: - self.Base.logs.error(f"Value Error -> {ve}") + self.Logs.error(f"Value Error -> {ve}") case 'show_reputation': - if not self.db_reputation: + if not self.UID_REPUTATION_DB: self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} : No one is suspected') - for uid, nickname in self.db_reputation.items(): - self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} : Uid: {uid} | Nickname: {self.db_reputation[uid]["nickname"]} | Connected on: {self.db_reputation[uid]["connected_datetime"]} | Updated on: {self.db_reputation[uid]["updated_datetime"]}') + for suspect in self.UID_REPUTATION_DB: + self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} : Uid: {suspect.uid} | Nickname: {suspect.nickname} | Connected on: {suspect.connected_datetime} | Updated on: {suspect.updated_datetime}') case 'code': try: release_code = cmd[1] - jailed_nickname = self.Irc.get_nickname(fromuser) - jailed_UID = self.Irc.get_uid(fromuser) - if not jailed_UID in self.db_reputation: + jailed_nickname = self.User.get_nickname(fromuser) + jailed_UID = self.User.get_uid(fromuser) + get_reputation = self.reputation_get_Reputation(jailed_UID) + + if get_reputation is None: self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : No code is requested ...") return False - jailed_IP = self.db_reputation[jailed_UID]['ip'] + jailed_IP = get_reputation.ip jailed_salon = self.Config.SALON_JAIL - reputation_seuil = self.defConfig['reputation_seuil'] + reputation_seuil = self.ModConfig.reputation_seuil welcome_salon = self.Config.SALON_LIBERER - self.Base.logs.debug(f"IP de {jailed_nickname} : {jailed_IP}") + self.Logs.debug(f"IP de {jailed_nickname} : {jailed_IP}") link = self.Config.SERVEUR_LINK color_green = self.Config.CONFIG_COLOR['verte'] color_black = self.Config.CONFIG_COLOR['noire'] - if jailed_UID in self.db_reputation: - if release_code == self.db_reputation[jailed_UID]['secret_code']: - self.Irc.send2socket(f':{dnickname} PRIVMSG {jailed_salon} : Bon mot de passe. Allez du vent !') + if release_code == get_reputation.secret_code: + self.Irc.send2socket(f':{dnickname} PRIVMSG {jailed_salon} : Bon mot de passe. Allez du vent !') - if self.defConfig['reputation_ban_all_chan'] == 1: - for chan in self.Irc.db_chan: - if chan != jailed_salon: - self.Irc.send2socket(f":{service_id} MODE {chan} -b {jailed_nickname}!*@*") + if self.ModConfig.reputation_ban_all_chan == 1: + for chan in self.Channel.UID_CHANNEL_DB: + if chan.name != jailed_salon: + self.Irc.send2socket(f":{service_id} MODE {chan.name} -b {jailed_nickname}!*@*") - del self.db_reputation[jailed_UID] - self.Base.logs.debug(f'{jailed_UID} - {jailed_nickname} removed from REPUTATION_DB') - self.Irc.send2socket(f":{service_id} SAPART {jailed_nickname} {jailed_salon}") - self.Irc.send2socket(f":{service_id} SAJOIN {jailed_nickname} {welcome_salon}") - self.Irc.send2socket(f":{link} REPUTATION {jailed_IP} {int(reputation_seuil) + 1}") - self.Irc.send2socket(f":{service_id} PRIVMSG {jailed_nickname} :[{color_green} MOT DE PASS CORRECT {color_black}] : You have now the right to enjoy the network !") + self.reputation_delete(jailed_UID) + self.Logs.debug(f'{jailed_UID} - {jailed_nickname} removed from REPUTATION_DB') + self.Irc.send2socket(f":{service_id} SAPART {jailed_nickname} {jailed_salon}") + self.Irc.send2socket(f":{service_id} SAJOIN {jailed_nickname} {welcome_salon}") + self.Irc.send2socket(f":{link} REPUTATION {jailed_IP} {int(reputation_seuil) + 1}") + self.User.get_User(jailed_UID).score_connexion = reputation_seuil + 1 + self.Irc.send2socket(f":{service_id} PRIVMSG {jailed_nickname} :[{color_green} MOT DE PASS CORRECT {color_black}] : You have now the right to enjoy the network !") - else: - self.Irc.send2socket(f':{dnickname} PRIVMSG {jailed_salon} : Mauvais password') - self.Irc.send2socket(f":{service_id} PRIVMSG {jailed_nickname} :[{color_green} MAUVAIS PASSWORD {color_black}]") else: - self.Irc.send2socket(f":{dnickname} PRIVMSG {jailed_salon} : Ce n'est pas à toi de taper le mot de passe !") + self.Irc.send2socket(f':{dnickname} PRIVMSG {jailed_salon} : Mauvais password') + self.Irc.send2socket(f":{service_id} PRIVMSG {jailed_nickname} :[{color_green} MAUVAIS PASSWORD {color_black}]") except IndexError: - self.Base.logs.error('_hcmd code: out of index') + self.Logs.error('_hcmd code: out of index') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} code [code]') except KeyError as ke: - self.Base.logs.error(f'_hcmd code: KeyError {ke}') + self.Logs.error(f'_hcmd code: KeyError {ke}') # self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} code [code]') pass @@ -1190,7 +1310,7 @@ class Defender(): key = 'reputation' if activation == 'on': - if self.defConfig[key] == 1: + if self.ModConfig.reputation == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}REPUTATION{self.Config.CONFIG_COLOR['noire']} ] : Already activated") return False @@ -1203,7 +1323,7 @@ class Defender(): if activation == 'off': - if self.defConfig[key] == 0: + if self.ModConfig.reputation == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}REPUTATION{self.Config.CONFIG_COLOR['noire']} ] : Already deactivated") return False @@ -1228,7 +1348,7 @@ class Defender(): get_value = str(cmd[3]).lower() if get_value == 'on': - if self.defConfig[key] == 1: + if self.ModConfig.reputation_ban_all_chan == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}BAN ON ALL CHANS{self.Config.CONFIG_COLOR['noire']} ] : Already activated") return False @@ -1236,7 +1356,7 @@ class Defender(): self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["verte"]}BAN ON ALL CHANS{self.Config.CONFIG_COLOR["noire"]} ] : Activated by {fromuser}') elif get_value == 'off': - if self.defConfig[key] == 0: + if self.ModConfig.reputation_ban_all_chan == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}BAN ON ALL CHANS{self.Config.CONFIG_COLOR['noire']} ] : Already deactivated") return False @@ -1261,7 +1381,7 @@ class Defender(): pass except IndexError as ie: - self.Base.logs.warning(f'{ie}') + self.Logs.warning(f'{ie}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} reputation [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} reputation set banallchan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} reputation set limit [1234]') @@ -1269,7 +1389,7 @@ class Defender(): self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} reputation set action [kill|None]') except ValueError as ve: - self.Base.logs.warning(f'{ie}') + self.Logs.warning(f'{ie}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : La valeur devrait etre un entier >= 0') case 'proxy_scan': @@ -1298,13 +1418,13 @@ class Defender(): match option: case 'local_scan': if action == 'on': - if self.defConfig[option] == 1: + if self.ModConfig.local_scan == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': - if self.defConfig[option] == 0: + if self.ModConfig.local_scan == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None self.update_db_configuration(option, 0) @@ -1312,13 +1432,13 @@ class Defender(): case 'psutil_scan': if action == 'on': - if self.defConfig[option] == 1: + if self.ModConfig.psutil_scan == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': - if self.defConfig[option] == 0: + if self.ModConfig.psutil_scan == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None self.update_db_configuration(option, 0) @@ -1326,13 +1446,13 @@ class Defender(): case 'abuseipdb_scan': if action == 'on': - if self.defConfig[option] == 1: + if self.ModConfig.abuseipdb_scan == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': - if self.defConfig[option] == 0: + if self.ModConfig.abuseipdb_scan == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None self.update_db_configuration(option, 0) @@ -1340,14 +1460,14 @@ class Defender(): case 'freeipapi_scan': if action == 'on': - if self.defConfig[option] == 1: + if self.ModConfig.freeipapi_scan == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': - if self.defConfig[option] == 0: + if self.ModConfig.freeipapi_scan == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None @@ -1357,13 +1477,13 @@ class Defender(): case 'cloudfilt_scan': if action == 'on': - if self.defConfig[option] == 1: + if self.ModConfig.cloudfilt_scan == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': - if self.defConfig[option] == 0: + if self.ModConfig.cloudfilt_scan == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None self.update_db_configuration(option, 0) @@ -1394,7 +1514,7 @@ class Defender(): activation = str(cmd[1]).lower() key = 'flood' if activation == 'on': - if self.defConfig[key] == 1: + if self.ModConfig.flood == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Already activated") return False @@ -1402,7 +1522,7 @@ class Defender(): self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Activated by {fromuser}") if activation == 'off': - if self.defConfig[key] == 0: + if self.ModConfig.flood == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Already Deactivated") return False @@ -1437,30 +1557,30 @@ class Defender(): pass except ValueError as ve: - self.Base.logs.error(f"{self.__class__.__name__} Value Error : {ve}") + self.Logs.error(f"{self.__class__.__name__} Value Error : {ve}") case 'status': color_green = self.Config.CONFIG_COLOR['verte'] color_red = self.Config.CONFIG_COLOR['rouge'] color_black = self.Config.CONFIG_COLOR['noire'] try: - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : [{color_green if self.defConfig["reputation"] == 1 else color_red}Reputation{color_black}] ==> {self.defConfig["reputation"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : reputation_seuil ==> {self.defConfig["reputation_seuil"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : reputation_ban_all_chan ==> {self.defConfig["reputation_ban_all_chan"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : reputation_timer ==> {self.defConfig["reputation_timer"]}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : [{color_green if self.ModConfig.reputation == 1 else color_red}Reputation{color_black}] ==> {self.ModConfig.reputation}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : reputation_seuil ==> {self.ModConfig.reputation_seuil}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : reputation_ban_all_chan ==> {self.ModConfig.reputation_ban_all_chan}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : reputation_timer ==> {self.ModConfig.reputation_timer}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : [Proxy_scan]') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["local_scan"] == 1 else color_red}local_scan{color_black} ==> {self.defConfig["local_scan"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["psutil_scan"] == 1 else color_red}psutil_scan{color_black} ==> {self.defConfig["psutil_scan"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["abuseipdb_scan"] == 1 else color_red}abuseipdb_scan{color_black} ==> {self.defConfig["abuseipdb_scan"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["freeipapi_scan"] == 1 else color_red}freeipapi_scan{color_black} ==> {self.defConfig["freeipapi_scan"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["cloudfilt_scan"] == 1 else color_red}cloudfilt_scan{color_black} ==> {self.defConfig["cloudfilt_scan"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : [{color_green if self.defConfig["flood"] == 1 else color_red}Flood{color_black}] ==> {self.defConfig["flood"]}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.ModConfig.local_scan == 1 else color_red}local_scan{color_black} ==> {self.ModConfig.local_scan}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.ModConfig.psutil_scan == 1 else color_red}psutil_scan{color_black} ==> {self.ModConfig.psutil_scan}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.ModConfig.abuseipdb_scan == 1 else color_red}abuseipdb_scan{color_black} ==> {self.ModConfig.abuseipdb_scan}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.ModConfig.freeipapi_scan == 1 else color_red}freeipapi_scan{color_black} ==> {self.ModConfig.freeipapi_scan}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.ModConfig.cloudfilt_scan == 1 else color_red}cloudfilt_scan{color_black} ==> {self.ModConfig.cloudfilt_scan}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : [{color_green if self.ModConfig.flood == 1 else color_red}Flood{color_black}] ==> {self.ModConfig.flood}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_action ==> Coming soon') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_message ==> {self.defConfig["flood_message"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_time ==> {self.defConfig["flood_time"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_timer ==> {self.defConfig["flood_timer"]}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_message ==> {self.ModConfig.flood_message}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_time ==> {self.ModConfig.flood_time}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_timer ==> {self.ModConfig.flood_timer}') except KeyError as ke: - self.Base.logs.error(f"Key Error : {ke}") + self.Logs.error(f"Key Error : {ke}") case 'join': @@ -1470,7 +1590,7 @@ class Defender(): self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {dnickname} JOINED {channel}') self.add_defender_channel(channel) except IndexError as ie: - self.Base.logs.error(f'{ie}') + self.Logs.error(f'{ie}') case 'part': @@ -1484,7 +1604,7 @@ class Defender(): self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {dnickname} LEFT {channel}') self.delete_defender_channel(channel) except IndexError as ie: - self.Base.logs.error(f'{ie}') + self.Logs.error(f'{ie}') case 'op' | 'o': # /mode #channel +o user @@ -1496,7 +1616,7 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +o {nickname}") except IndexError as e: - self.Base.logs.warning(f'_hcmd OP: {str(e)}') + self.Logs.warning(f'_hcmd OP: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} op [#SALON] [NICKNAME]') case 'deop' | 'do': @@ -1507,7 +1627,7 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -o {nickname}") except IndexError as e: - self.Base.logs.warning(f'_hcmd DEOP: {str(e)}') + self.Logs.warning(f'_hcmd DEOP: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} deop [#SALON] [NICKNAME]') case 'owner' | 'q': @@ -1518,7 +1638,7 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +q {nickname}") except IndexError as e: - self.Base.logs.warning(f'_hcmd OWNER: {str(e)}') + self.Logs.warning(f'_hcmd OWNER: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} owner [#SALON] [NICKNAME]') case 'deowner' | 'dq': @@ -1529,7 +1649,7 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -q {nickname}") except IndexError as e: - self.Base.logs.warning(f'_hcmd DEOWNER: {str(e)}') + self.Logs.warning(f'_hcmd DEOWNER: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} deowner [#SALON] [NICKNAME]') case 'halfop' | 'h': @@ -1540,7 +1660,7 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +h {nickname}") except IndexError as e: - self.Base.logs.warning(f'_hcmd halfop: {str(e)}') + self.Logs.warning(f'_hcmd halfop: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} halfop [#SALON] [NICKNAME]') case 'dehalfop' | 'dh': @@ -1551,7 +1671,7 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -h {nickname}") except IndexError as e: - self.Base.logs.warning(f'_hcmd DEHALFOP: {str(e)}') + self.Logs.warning(f'_hcmd DEHALFOP: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} dehalfop [#SALON] [NICKNAME]') case 'voice' | 'v': @@ -1562,7 +1682,7 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +v {nickname}") except IndexError as e: - self.Base.logs.warning(f'_hcmd VOICE: {str(e)}') + self.Logs.warning(f'_hcmd VOICE: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} voice [#SALON] [NICKNAME]') case 'devoice' | 'dv': @@ -1573,7 +1693,7 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -v {nickname}") except IndexError as e: - self.Base.logs.warning(f'_hcmd DEVOICE: {str(e)}') + self.Logs.warning(f'_hcmd DEVOICE: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} devoice [#SALON] [NICKNAME]') case 'ban' | 'b': @@ -1583,9 +1703,9 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +b {nickname}!*@*") - self.Base.logs.debug(f'{fromuser} has banned {nickname} from {channel}') + self.Logs.debug(f'{fromuser} has banned {nickname} from {channel}') except IndexError as e: - self.Base.logs.warning(f'_hcmd BAN: {str(e)}') + self.Logs.warning(f'_hcmd BAN: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} ban [#SALON] [NICKNAME]') case 'unban' | 'ub': @@ -1595,9 +1715,9 @@ class Defender(): nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -b {nickname}!*@*") - self.Base.logs.debug(f'{fromuser} has unbanned {nickname} from {channel}') + self.Logs.debug(f'{fromuser} has unbanned {nickname} from {channel}') except IndexError as e: - self.Base.logs.warning(f'_hcmd UNBAN: {str(e)}') + self.Logs.warning(f'_hcmd UNBAN: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} unban [#SALON] [NICKNAME]') case 'kick' | 'k': @@ -1613,9 +1733,9 @@ class Defender(): final_reason = ' '.join(reason) self.Irc.send2socket(f":{service_id} KICK {channel} {nickname} {final_reason}") - self.Base.logs.debug(f'{fromuser} has kicked {nickname} from {channel} : {final_reason}') + self.Logs.debug(f'{fromuser} has kicked {nickname} from {channel} : {final_reason}') except IndexError as e: - self.Base.logs.warning(f'_hcmd KICK: {str(e)}') + self.Logs.warning(f'_hcmd KICK: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} kick [#SALON] [NICKNAME] [REASON]') case 'kickban' | 'kb': @@ -1632,39 +1752,50 @@ class Defender(): self.Irc.send2socket(f":{service_id} KICK {channel} {nickname} {final_reason}") self.Irc.send2socket(f":{service_id} MODE {channel} +b {nickname}!*@*") - self.Base.logs.debug(f'{fromuser} has kicked and banned {nickname} from {channel} : {final_reason}') + self.Logs.debug(f'{fromuser} has kicked and banned {nickname} from {channel} : {final_reason}') except IndexError as e: - self.Base.logs.warning(f'_hcmd KICKBAN: {str(e)}') + self.Logs.warning(f'_hcmd KICKBAN: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} kickban [#SALON] [NICKNAME] [REASON]') case 'info': try: nickoruid = cmd[1] - uid_query = None - nickname_query = None + UserObject = self.User.get_User(nickoruid) - if not self.Irc.get_nickname(nickoruid) is None: - nickname_query = self.Irc.get_nickname(nickoruid) - - if not self.Irc.get_uid(nickoruid) is None: - uid_query = self.Irc.get_uid(nickoruid) - - if nickname_query is None and uid_query is None: - self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist") + if not UserObject is None: + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : UID : {UserObject.uid}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : NICKNAME : {UserObject.nickname}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : USERNAME : {UserObject.username}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : HOSTNAME : {UserObject.hostname}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : IP : {UserObject.remote_ip}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : REPUTATION : {UserObject.score_connexion}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : VHOST : {UserObject.vhost}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : MODES : {UserObject.umodes}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : CONNECTION TIME : {UserObject.connexion_datetime}') else: - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : UID : {uid_query}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : NICKNAME : {self.Irc.db_uid[uid_query]["nickname"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : USERNAME : {self.Irc.db_uid[uid_query]["username"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : HOSTNAME : {self.Irc.db_uid[uid_query]["hostname"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : VHOST : {self.Irc.db_uid[uid_query]["vhost"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : MODES : {self.Irc.db_uid[uid_query]["umodes"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : CONNECTION TIME : {self.Irc.db_uid[uid_query]["datetime"]}') + self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist") + except KeyError as ke: - self.Base.logs.warning(f"Key error info user : {ke}") + self.Logs.warning(f"Key error info user : {ke}") + + case 'sentinel': + # .sentinel on + activation = str(cmd[1]).lower() + service_id = self.Config.SERVICE_ID + + channel_to_dont_quit = [self.Config.SALON_JAIL, self.Config.SERVICE_CHANLOG] + + if activation == 'on': + for chan in self.Channel.UID_CHANNEL_DB: + if not chan.name in channel_to_dont_quit: + self.Irc.send2socket(f":{service_id} JOIN {chan.name}") + if activation == 'off': + for chan in self.Channel.UID_CHANNEL_DB: + if not chan.name in channel_to_dont_quit: + self.Irc.send2socket(f":{service_id} PART {chan.name}") + self.join_saved_channels() case 'show_users': - for uid, infousers in self.Irc.db_uid.items(): - # print(uid + " " + str(infousers)) - for info in infousers: - if info == 'nickname': - self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :UID : {uid} - isWebirc: {infousers['isWebirc']} - {info}: {infousers[info]}") + + for db_user in self.User.UID_DB: + self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} :UID : {db_user.uid} - isWebirc: {db_user.isWebirc} - Nickname: {db_user.nickname} - Connection: {db_user.connexion_datetime}") diff --git a/mods/mod_test.py b/mods/mod_test.py index b27d27c..5fa85b9 100644 --- a/mods/mod_test.py +++ b/mods/mod_test.py @@ -1,11 +1,10 @@ -import threading from core.irc import Irc # Le module crée devra réspecter quelques conditions # 1. Importer le module de configuration # 2. Le nom de class devra toujours s'appeler comme le module exemple => nom de class Dktmb | nom du module mod_dktmb # 3. la fonction __init__ devra toujours avoir les parametres suivant (self, irc:object) -# 1 . Créer la variable irc dans le module +# 1 . Créer la variable Irc dans le module # 2 . Récuperer la configuration dans une variable # 3 . Définir et enregistrer les nouvelles commandes # 4. une fonction _hcmds(self, user:str, cmd: list) devra toujours etre crée. @@ -13,37 +12,59 @@ from core.irc import Irc class Test(): def __init__(self, ircInstance:Irc) -> None: - print(f'Module {self.__class__.__name__} loaded ...') - self.irc = ircInstance # Ajouter l'object mod_irc a la classe + # Add Irc Object to the module + self.Irc = ircInstance - self.config = ircInstance.Config # Ajouter la configuration a la classe + # Add Global Configuration to the module + self.Config = ircInstance.Config + + # Add Base object to the module + self.Base = ircInstance.Base + + # Add logs object to the module + self.Logs = ircInstance.Base.logs + + # Add User object to the module + self.User = ircInstance.User + + # Add Channel object to the module + self.Channel = ircInstance.Channel # Créer les nouvelles commandes du module - self.commands = ['test'] + self.commands_level = { + 0: ['test'], + 1: ['test_level_1'] + } - self.__set_commands(self.commands) # Enrigstrer les nouvelles commandes dans le code + # Init the module + self.__init_module() - self.core = ircInstance.Base # Instance du module Base + # Log the module + self.Logs.debug(f'Module {self.__class__.__name__} loaded ...') - self.session = '' # Instancier une session pour la base de données - self.__create_db('mod_test') # Créer la base de données si necessaire + def __init_module(self) -> None: - def __set_commands(self, commands:list) -> None: - """Rajoute les commandes du module au programme principal + self.__set_commands(self.commands_level) + self.__create_tables() + + return None + + def __set_commands(self, commands:dict[int, list[str]]) -> None: + """### Rajoute les commandes du module au programme principal Args: commands (list): Liste des commandes du module - - Returns: - None: Aucun retour attendu """ - for command in commands: - self.irc.commands.append(command) + for level, com in commands.items(): + for c in commands[level]: + if not c in self.Irc.commands: + self.Irc.commands_level[level].append(c) + self.Irc.commands.append(c) - return True + return None - def __create_db(self, db_name:str) -> None: + def __create_tables(self) -> None: """Methode qui va créer la base de donnée si elle n'existe pas. Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module Args: @@ -52,36 +73,36 @@ class Test(): Returns: None: Aucun retour n'es attendu """ - db_directory = self.core.MODS_DB_PATH - self.session = self.core.db_init(db_directory, db_name) - - table_logs = '''CREATE TABLE IF NOT EXISTS logs ( + table_logs = '''CREATE TABLE IF NOT EXISTS test_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, server_msg TEXT ) ''' - self.core.db_execute_query(self.session, table_logs) + self.Base.db_execute_query(table_logs) return None def unload(self) -> None: return None - def _hcmds(self, user:str, cmd: list) -> None: + def cmd(self, data:list) -> None: + return None - command = cmd[0].lower() + def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None: + + command = str(cmd[0]).lower() + dnickname = self.Config.SERVICE_NICKNAME + fromuser = user match command: case 'test': try: - user_action = cmd[1] - self.irc.send2socket(f'PRIVMSG #webmail Je vais voicer {user}') - self.irc.send2socket(f'MODE #webmail +v {user_action}') - self.core.create_log(f"MODE +v sur {user_action}") + + self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : test command ready ...") + self.Logs.debug(f"Test logs ready") except KeyError as ke: - self.core.create_log(f"Key Error : {ke}") - + self.Logs.error(f"Key Error : {ke}") \ No newline at end of file diff --git a/mods/mod_votekick.py b/mods/mod_votekick.py new file mode 100644 index 0000000..4545ef4 --- /dev/null +++ b/mods/mod_votekick.py @@ -0,0 +1,440 @@ +from core.irc import Irc +import re +from dataclasses import dataclass, field + +# Activer le systeme sur un salon (activate #salon) +# Le service devra se connecter au salon +# Le service devra se mettre en op +# Soumettre un nom de user (submit nickname) +# voter pour un ban (vote_for) +# voter contre un ban (vote_against) + + + +class Votekick(): + + @dataclass + class VoteChannelModel: + channel_name: str + target_user: str + voter_users: list + vote_for: int + vote_against: int + + VOTE_CHANNEL_DB:list[VoteChannelModel] = [] + + def __init__(self, ircInstance:Irc) -> None: + # Add Irc Object to the module + self.Irc = ircInstance + + # Add Global Configuration to the module + self.Config = ircInstance.Config + + # Add Base object to the module + self.Base = ircInstance.Base + + # Add logs object to the module + self.Logs = ircInstance.Base.logs + + # Add User object to the module + self.User = ircInstance.User + + # Add Channel object to the module + self.Channel = ircInstance.Channel + + # Créer les nouvelles commandes du module + self.commands_level = { + 0: ['vote_for', 'vote_against'], + 1: ['activate', 'deactivate', 'submit', 'vote_stat', 'vote_verdict', 'vote_cancel'] + } + + # Init the module + self.__init_module() + + # Log the module + self.Logs.debug(f'Module {self.__class__.__name__} loaded ...') + + def __init_module(self) -> None: + + self.__set_commands(self.commands_level) + self.__create_tables() + self.join_saved_channels() + + return None + + def __set_commands(self, commands:dict[int, list[str]]) -> None: + """### Rajoute les commandes du module au programme principal + + Args: + commands (list): Liste des commandes du module + """ + for level, com in commands.items(): + for c in commands[level]: + if not c in self.Irc.commands: + self.Irc.commands_level[level].append(c) + self.Irc.commands.append(c) + + return None + + def __create_tables(self) -> None: + """Methode qui va créer la base de donnée si elle n'existe pas. + Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module + Args: + database_name (str): Nom de la base de données ( pas d'espace dans le nom ) + + Returns: + None: Aucun retour n'es attendu + """ + + table_logs = '''CREATE TABLE IF NOT EXISTS votekick_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + datetime TEXT, + server_msg TEXT + ) + ''' + + table_vote = '''CREATE TABLE IF NOT EXISTS votekick_channel ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + datetime TEXT, + channel TEXT + ) + ''' + + self.Base.db_execute_query(table_logs) + self.Base.db_execute_query(table_vote) + return None + + def unload(self) -> None: + try: + for chan in self.VOTE_CHANNEL_DB: + self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} PART {chan.channel_name}") + + self.VOTE_CHANNEL_DB = [] + self.Logs.debug(f'Delete memory DB VOTE_CHANNEL_DB: {self.VOTE_CHANNEL_DB}') + + return None + except UnboundLocalError as ne: + self.Logs.error(f'{ne}') + except NameError as ue: + self.Logs.error(f'{ue}') + except: + self.Logs.error('Error on the module') + + def init_vote_system(self, channel: str) -> bool: + + response = False + for chan in self.VOTE_CHANNEL_DB: + if chan.channel_name == channel: + chan.target_user = '' + chan.voter_users = [] + chan.vote_against = 0 + chan.vote_for = 0 + response = True + + return response + + def insert_vote_channel(self, ChannelObject: VoteChannelModel) -> bool: + result = False + found = False + for chan in self.VOTE_CHANNEL_DB: + if chan.channel_name == ChannelObject.channel_name: + found = True + + if not found: + self.VOTE_CHANNEL_DB.append(ChannelObject) + self.Logs.debug(f"The channel has been added {ChannelObject}") + self.db_add_vote_channel(ChannelObject.channel_name) + + return result + + def db_add_vote_channel(self, channel:str) -> bool: + """Cette fonction ajoute les salons ou seront autoriser les votes + + Args: + channel (str): le salon à enregistrer. + """ + current_datetime = self.Base.get_datetime() + mes_donnees = {'channel': channel} + + response = self.Base.db_execute_query("SELECT id FROM votekick_channel WHERE channel = :channel", mes_donnees) + + isChannelExist = response.fetchone() + + if isChannelExist is None: + mes_donnees = {'datetime': current_datetime, 'channel': channel} + insert = self.Base.db_execute_query(f"INSERT INTO votekick_channel (datetime, channel) VALUES (:datetime, :channel)", mes_donnees) + if insert.rowcount > 0: + return True + else: + return False + else: + return False + + def db_delete_vote_channel(self, channel: str) -> bool: + """Cette fonction supprime les salons de join de Defender + + Args: + channel (str): le salon à enregistrer. + """ + mes_donnes = {'channel': channel} + response = self.Base.db_execute_query("DELETE FROM votekick_channel WHERE channel = :channel", mes_donnes) + + affected_row = response.rowcount + + if affected_row > 0: + return True + else: + return False + + def join_saved_channels(self) -> None: + + result = self.Base.db_execute_query("SELECT id, channel FROM votekick_channel") + channels = result.fetchall() + unixtime = self.Base.get_unixtime() + + for channel in channels: + id, chan = channel + self.insert_vote_channel(self.VoteChannelModel(channel_name=chan, target_user='', voter_users=[], vote_for=0, vote_against=0)) + self.Irc.send2socket(f":{self.Config.SERVEUR_ID} SJOIN {unixtime} {chan} + :{self.Config.SERVICE_ID}") + self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {chan} +o {self.Config.SERVICE_NICKNAME}") + + return None + + def is_vote_ongoing(self, channel: str) -> bool: + + response = False + for vote in self.VOTE_CHANNEL_DB: + if vote.channel_name == channel: + if vote.target_user: + response = True + + return response + + def timer_vote_verdict(self, channel: str) -> None: + + dnickname = self.Config.SERVICE_NICKNAME + + for chan in self.VOTE_CHANNEL_DB: + if chan.channel_name == channel: + target_user = self.User.get_nickname(chan.target_user) + if chan.vote_for > chan.vote_against: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :The user {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} will be kicked from this channel') + self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}") + elif chan.vote_for <= chan.vote_against: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This user will stay on this channel') + + # Init the system + if self.init_vote_system(channel): + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :System vote re initiated') + + return None + + def cmd(self, data:list) -> None: + cmd = list(data).copy() + + match cmd[2]: + case 'SJOIN': + pass + case _: + pass + + return None + + def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None: + # cmd is the command starting from the user command + # full cmd is sending the entire server response + + command = str(cmd[0]).lower() + dnickname = self.Config.SERVICE_NICKNAME + fromuser = user + + if len(fullcmd) >= 3: + fromchannel = str(fullcmd[2]).lower() if self.Base.Is_Channel(str(fullcmd[2]).lower()) else None + else: + fromchannel = None + + if len(cmd) >= 2: + sentchannel = str(cmd[1]).lower() if self.Base.Is_Channel(str(cmd[1]).lower()) else None + else: + sentchannel = None + + if not fromchannel is None: + channel = fromchannel + elif not sentchannel is None: + channel = sentchannel + else: + channel = None + + match command: + + case 'vote_cancel': + try: + if channel is None: + self.Logs.error(f"The channel is not known, defender can't cancel the vote") + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :You need to specify the channel => /msg {dnickname} vote_cancel #channel') + + for vote in self.VOTE_CHANNEL_DB: + if vote.channel_name == channel: + self.init_vote_system(channel) + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote system re-initiated') + + except IndexError as ke: + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote_cancel #channel') + self.Logs.error(f'Index Error: {ke}') + + case 'vote_for': + try: + # vote_for + channel = str(fullcmd[2]).lower() + for chan in self.VOTE_CHANNEL_DB: + if chan.channel_name == channel: + if fromuser in chan.voter_users: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote') + else: + chan.vote_for += 1 + chan.voter_users.append(fromuser) + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you') + + except KeyError as ke: + self.Logs.error(f'Key Error: {ke}') + except IndexError as ie: + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote_cancel #channel') + self.Logs.error(f'Index Error: {ie}') + + case 'vote_against': + try: + # vote_against + channel = str(fullcmd[2]).lower() + for chan in self.VOTE_CHANNEL_DB: + if chan.channel_name == channel: + if fromuser in chan.voter_users: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote') + else: + chan.vote_against += 1 + chan.voter_users.append(fromuser) + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you') + + except KeyError as ke: + self.Logs.error(f'Key Error: {ke}') + + case 'vote_stat': + try: + # channel = str(fullcmd[2]).lower() + for chan in self.VOTE_CHANNEL_DB: + if chan.channel_name == channel: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Channel: {chan.channel_name} | Target: {self.User.get_nickname(chan.target_user)} | For: {chan.vote_for} | Against: {chan.vote_against} | Number of voters: {str(len(chan.voter_users))}') + + except KeyError as ke: + self.Logs.error(f'Key Error: {ke}') + + case 'vote_verdict': + try: + # channel = str(fullcmd[2]).lower() + for chan in self.VOTE_CHANNEL_DB: + if chan.channel_name == channel: + target_user = self.User.get_nickname(chan.target_user) + if chan.vote_for > chan.vote_against: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :The user {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} will be kicked from this channel') + self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}") + elif chan.vote_for <= chan.vote_against: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This user will stay on this channel') + + # Init the system + if self.init_vote_system(channel): + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :System vote re initiated') + + except KeyError as ke: + self.Logs.error(f'Key Error: {ke}') + + case 'submit': + # submit nickname + try: + nickname_submitted = cmd[1] + # channel = str(fullcmd[2]).lower() + uid_submitted = self.User.get_uid(nickname_submitted) + user_submitted = self.User.get_User(nickname_submitted) + + # check if there is an ongoing vote + if self.is_vote_ongoing(channel): + for vote in self.VOTE_CHANNEL_DB: + if vote.channel_name == channel: + ongoing_user = self.User.get_nickname(vote.target_user) + + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :There is an ongoing vote on {ongoing_user}') + return False + + # check if the user exist + if user_submitted is None: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> do not exist') + return False + + uid_cleaned = self.Base.clean_uid(uid_submitted) + ChannelInfo = self.Channel.get_Channel(channel) + + clean_uids_in_channel: list = [] + for uid in ChannelInfo.uids: + clean_uids_in_channel.append(self.Base.clean_uid(uid)) + + if not uid_cleaned in clean_uids_in_channel: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> is not available in this channel') + return False + + # check if Ircop or Service or Bot + pattern = fr'[o|B|S]' + operator_user = re.findall(pattern, user_submitted.umodes) + if operator_user: + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You cant vote for this user ! he/she is protected') + return False + + for chan in self.VOTE_CHANNEL_DB: + if chan.channel_name == channel: + chan.target_user = self.User.get_uid(nickname_submitted) + + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :{nickname_submitted} has been targeted for a vote') + + self.Base.create_timer(60, self.timer_vote_verdict, (channel, )) + self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This vote will end after 60 secondes') + + except KeyError as ke: + self.Logs.error(f'Key Error: {ke}') + except TypeError as te: + self.Logs.error(te) + + case 'activate': + try: + # activate #channel + # channel = str(cmd[1]).lower() + + self.insert_vote_channel( + self.VoteChannelModel( + channel_name=channel, + target_user='', + voter_users=[], + vote_for=0, + vote_against=0 + ) + ) + + self.Irc.send2socket(f":{dnickname} JOIN {channel}") + self.Irc.send2socket(f":{dnickname} SAMODE {channel} +o {dnickname}") + self.Irc.send2socket(f":{dnickname} PRIVMSG {channel} :You can now use !submit to decide if he will stay or not on this channel ") + + except KeyError as ke: + self.Logs.error(f"Key Error : {ke}") + + case 'deactivate': + try: + # deactivate #channel + # channel = str(cmd[1]).lower() + + self.Irc.send2socket(f":{dnickname} SAMODE {channel} -o {dnickname}") + self.Irc.send2socket(f":{dnickname} PART {channel}") + + for chan in self.VOTE_CHANNEL_DB: + if chan.channel_name == channel: + self.VOTE_CHANNEL_DB.remove(chan) + self.db_delete_vote_channel(chan.channel_name) + + self.Logs.debug(f"Test logs ready") + except KeyError as ke: + self.Logs.error(f"Key Error : {ke}") \ No newline at end of file diff --git a/version.json b/version.json index 2ee8501..13968cc 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "4.1.0" + "version": "5.0.1" } \ No newline at end of file