From e4a0c530a30e7a45f28b17ee2c19b17aecd9e60e Mon Sep 17 00:00:00 2001 From: adator85 <> Date: Fri, 9 Aug 2024 02:08:33 +0200 Subject: [PATCH] Create DataClasses --- .gitignore | 5 + core/Model.py | 218 +++++++++++++++++++++++++++++ core/base.py | 86 +++++++----- core/irc.py | 316 ++++++++++++++++++------------------------- core/loadConf.py | 59 +++++--- mods/mod_defender.py | 82 ++++++----- 6 files changed, 483 insertions(+), 283 deletions(-) create mode 100644 .gitignore create mode 100644 core/Model.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0efc5bc --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.pyenv/ +db/ +logs/ +__pycache__/ +configuration.json \ No newline at end of file diff --git a/core/Model.py b/core/Model.py new file mode 100644 index 0000000..6a07d9a --- /dev/null +++ b/core/Model.py @@ -0,0 +1,218 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Union +from core.base import Base + + +class User: + + @dataclass + class UserModel: + uid: str + nickname: str + username: str + hostname: str + umodes: str + vhost: str + isWebirc: bool + remote_ip: str + score_connexion: int + connexion_datetime: datetime = field(default=datetime.now()) + + UID_DB: list[UserModel] = [] + + def __init__(self, Base: Base) -> None: + self.log = Base.logs + pass + + def insert(self, newUser: UserModel) -> bool: + + result = False + exist = False + + for record in self.UID_DB: + if record.uid == newUser.uid: + exist = True + self.log.debug(f'{record.uid} already exist') + + if not exist: + self.UID_DB.append(newUser) + result = True + self.log.debug(f'New User Created: ({newUser})') + + if not result: + self.log.critical(f'The User Object was not inserted {newUser}') + + return result + + def update(self, uid: str, newNickname: str) -> bool: + + result = False + + for record in self.UID_DB: + if record.uid == uid: + record.nickname = newNickname + result = True + self.log.debug(f'UID ({record.uid}) has been updated with new nickname {newNickname}') + + if not result: + self.log.critical(f'The new nickname {newNickname} was not updated, uid = {uid}') + + return result + + def delete(self, uid: str) -> bool: + + result = False + + for record in self.UID_DB: + if record.uid == uid: + self.UID_DB.remove(record) + result = True + self.log.debug(f'UID ({record.uid}) has been created') + + if not result: + self.log.critical(f'The UID {uid} was not deleted') + + return result + + def get_User(self, uidornickname: str) -> Union[UserModel, None]: + + User = None + for record in self.UID_DB: + if record.uid == uidornickname: + User = record + elif record.nickname == uidornickname: + User = record + + self.log.debug(f'Search {uidornickname} -- result = {User}') + + return User + + def get_uid(self, uidornickname:str) -> Union[str, None]: + + uid = None + for record in self.UID_DB: + if record.uid == uidornickname: + uid = record.uid + if record.nickname == uidornickname: + uid = record.uid + + self.log.debug(f'The UID that you are looking for {uidornickname} has been found {uid}') + return uid + + def get_nickname(self, uidornickname:str) -> Union[str, None]: + + nickname = None + for record in self.UID_DB: + if record.nickname == uidornickname: + nickname = record.nickname + if record.uid == uidornickname: + nickname = record.nickname + self.log.debug(f'The value {uidornickname} -- {nickname}') + return nickname + + +class Admin: + + @dataclass + class AdminModel: + uid: str + nickname: str + username: str + hostname: str + umodes: str + vhost: str + level: int + connexion_datetime: datetime = field(default=datetime.now()) + + UID_ADMIN_DB: list[AdminModel] = [] + + def __init__(self, Base: Base) -> None: + self.log = Base.logs + pass + + def insert(self, newAdmin: AdminModel) -> bool: + + result = False + exist = False + + for record in self.UID_ADMIN_DB: + if record.uid == newAdmin.uid: + exist = True + self.log.debug(f'{record.uid} already exist') + + if not exist: + self.UID_ADMIN_DB.append(newAdmin) + result = True + self.log.debug(f'UID ({newAdmin.uid}) has been created') + + if not result: + self.log.critical(f'The User Object was not inserted {newAdmin}') + + return result + + def update(self, uid: str, newNickname: str) -> bool: + + result = False + + for record in self.UID_ADMIN_DB: + if record.uid == uid: + record.nickname = newNickname + result = True + self.log.debug(f'UID ({record.uid}) has been updated with new nickname {newNickname}') + + if not result: + self.log.critical(f'The new nickname {newNickname} was not updated, uid = {uid}') + + return result + + def delete(self, uid: str) -> bool: + + result = False + + for record in self.UID_ADMIN_DB: + if record.uid == uid: + self.UID_ADMIN_DB.remove(record) + result = True + self.log.debug(f'UID ({record.uid}) has been created') + + if not result: + self.log.critical(f'The UID {uid} was not deleted') + + return result + + def get_Admin(self, uidornickname: str) -> Union[AdminModel, None]: + + Admin = None + for record in self.UID_ADMIN_DB: + if record.uid == uidornickname: + Admin = record + elif record.nickname == uidornickname: + Admin = record + + self.log.debug(f'Search {uidornickname} -- result = {Admin}') + + return Admin + + def get_uid(self, uidornickname:str) -> Union[str, None]: + + uid = None + for record in self.UID_ADMIN_DB: + if record.uid == uidornickname: + uid = record.uid + if record.nickname == uidornickname: + uid = record.uid + + self.log.debug(f'The UID that you are looking for {uidornickname} has been found {uid}') + return uid + + def get_nickname(self, uidornickname:str) -> Union[str, None]: + + nickname = None + for record in self.UID_ADMIN_DB: + if record.nickname == uidornickname: + nickname = record.nickname + if record.uid == uidornickname: + nickname = record.nickname + self.log.debug(f'The value {uidornickname} -- {nickname}') + return nickname diff --git a/core/base.py b/core/base.py index a8e7639..4874023 100644 --- a/core/base.py +++ b/core/base.py @@ -1,31 +1,22 @@ import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, sys +from typing import Union +from base64 import b64decode from datetime import datetime from sqlalchemy import create_engine, Engine, Connection, CursorResult from sqlalchemy.sql import text -from core.loadConf import Config +from core.loadConf import ConfigDataModel class Base: CORE_DB_PATH = 'core' + os.sep + 'db' + os.sep # Le dossier bases de données core MODS_DB_PATH = 'mods' + os.sep + 'db' + os.sep # Le dossier bases de données des modules PYTHON_MIN_VERSION = '3.10' # Version min de python - DB_SCHEMA:list[str] = { - 'admins': 'sys_admins', - 'commandes': 'sys_commandes', - 'logs': 'sys_logs', - 'modules': 'sys_modules' - } - DEFENDER_VERSION = '' # MAJOR.MINOR.BATCH - LATEST_DEFENDER_VERSION = '' # Latest Version of Defender in git - DEFENDER_DB_PATH = 'db' + os.sep # Séparateur en fonction de l'OS - DEFENDER_DB_NAME = 'defender' # Le nom de la base de données principale - - def __init__(self, Config: Config) -> None: + def __init__(self, Config: ConfigDataModel) -> None: self.Config = Config # Assigner l'objet de configuration self.init_log_system() # Demarrer le systeme de log - self.check_for_new_version() # Verifier si une nouvelle version est disponible + self.check_for_new_version(True) # Verifier si une nouvelle version est disponible self.running_timers:list[threading.Timer] = [] # Liste des timers en cours self.running_threads:list[threading.Thread] = [] # Liste des threads en cours @@ -48,12 +39,15 @@ class Base: with open(version_filename, 'r') as version_data: current_version:dict[str, str] = json.load(version_data) - self.DEFENDER_VERSION = current_version["version"] + # self.DEFENDER_VERSION = current_version["version"] + self.Config.current_version = current_version['version'] return None def __get_latest_defender_version(self) -> None: try: + self.logs.debug(f'Looking for a new version available on Github') + print(f'===> Looking for a new version available on Github') token = '' json_url = f'https://raw.githubusercontent.com/adator85/IRC_DEFENDER_MODULES/main/version.json' headers = { @@ -68,7 +62,8 @@ class Base: response.raise_for_status() # Vérifie si la requête a réussi json_response:dict = response.json() - self.LATEST_DEFENDER_VERSION = json_response["version"] + # self.LATEST_DEFENDER_VERSION = json_response["version"] + self.Config.latest_version = json_response['version'] return None except requests.HTTPError as err: @@ -76,16 +71,20 @@ class Base: except: self.logs.warning(f'Github not available to fetch latest version') - def check_for_new_version(self) -> bool: + def check_for_new_version(self, online:bool) -> bool: try: + self.logs.debug(f'Checking for a new service version') + # Assigner la version actuelle de Defender - self.__set_current_defender_version() + self.__set_current_defender_version() # Récuperer la dernier version disponible dans github - self.__get_latest_defender_version() + if online: + self.logs.debug(f'Retrieve the latest version from Github') + self.__get_latest_defender_version() isNewVersion = False - latest_version = self.LATEST_DEFENDER_VERSION - current_version = self.DEFENDER_VERSION + latest_version = self.Config.latest_version + current_version = self.Config.current_version curr_major , curr_minor, curr_patch = current_version.split('.') last_major, last_minor, last_patch = latest_version.split('.') @@ -130,7 +129,7 @@ class Base: Returns: None: Aucun retour """ - sql_insert = f"INSERT INTO {self.DB_SCHEMA['logs']} (datetime, server_msg) VALUES (:datetime, :server_msg)" + sql_insert = f"INSERT INTO {self.Config.table_log} (datetime, server_msg) VALUES (:datetime, :server_msg)" mes_donnees = {'datetime': str(self.get_datetime()),'server_msg': f'{log_message}'} self.db_execute_query(sql_insert, mes_donnees) @@ -166,7 +165,7 @@ class Base: cmd_list[2] = '*******' cmd = ' '.join(cmd_list) - insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['commandes']} (datetime, user, commande) VALUES (:datetime, :user, :commande)" + insert_cmd_query = f"INSERT INTO {self.Config.table_commande} (datetime, user, commande) VALUES (:datetime, :user, :commande)" mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'commande': cmd} self.db_execute_query(insert_cmd_query, mes_donnees) @@ -181,7 +180,7 @@ class Base: Returns: bool: True si le module existe déja dans la base de données sinon False """ - query = f"SELECT id FROM {self.DB_SCHEMA['modules']} WHERE module = :module" + query = f"SELECT id FROM {self.Config.table_module} WHERE module = :module" mes_donnes = {'module': module_name} results = self.db_execute_query(query, mes_donnes) @@ -199,7 +198,7 @@ class Base: if not self.db_isModuleExist(module_name): self.logs.debug(f"Le module {module_name} n'existe pas alors ont le créer") - insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['modules']} (datetime, user, module) VALUES (:datetime, :user, :module)" + insert_cmd_query = f"INSERT INTO {self.Config.table_module} (datetime, user, module) VALUES (:datetime, :user, :module)" mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'module': module_name} self.db_execute_query(insert_cmd_query, mes_donnees) # self.db_close_session(self.session) @@ -214,7 +213,7 @@ class Base: Args: cmd (str): le module a enregistrer """ - insert_cmd_query = f"DELETE FROM {self.DB_SCHEMA['modules']} WHERE module = :module" + insert_cmd_query = f"DELETE FROM {self.Config.table_module} WHERE module = :module" mes_donnees = {'module': module_name} self.db_execute_query(insert_cmd_query, mes_donnees) @@ -222,14 +221,20 @@ class Base: def db_create_first_admin(self) -> None: - user = self.db_execute_query(f"SELECT id FROM {self.DB_SCHEMA['admins']}") + user = self.db_execute_query(f"SELECT id FROM {self.Config.table_admin}") if not user.fetchall(): admin = self.Config.OWNER password = self.crypt_password(self.Config.PASSWORD) - mes_donnees = {'createdOn': self.get_datetime(), 'user': admin, 'password': password, 'hostname': '*', 'vhost': '*', 'level': 5} + mes_donnees = {'createdOn': self.get_datetime(), + 'user': admin, + 'password': password, + 'hostname': '*', + 'vhost': '*', + 'level': 5 + } self.db_execute_query(f""" - INSERT INTO {self.DB_SCHEMA['admins']} + INSERT INTO {self.Config.table_admin} (createdOn, user, password, hostname, vhost, level) VALUES (:createdOn, :user, :password, :hostname, :vhost, :level)""" @@ -348,8 +353,8 @@ class Base: def db_init(self) -> tuple[Engine, Connection]: - db_directory = self.DEFENDER_DB_PATH - full_path_db = self.DEFENDER_DB_PATH + self.DEFENDER_DB_NAME + db_directory = self.Config.db_path + full_path_db = self.Config.db_path + self.Config.db_name if not os.path.exists(db_directory): os.makedirs(db_directory) @@ -361,14 +366,14 @@ class Base: def __create_db(self) -> None: - table_logs = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['logs']} ( + table_logs = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_log} ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, server_msg TEXT ) ''' - table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['commandes']} ( + table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_commande} ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, user TEXT, @@ -376,7 +381,7 @@ class Base: ) ''' - table_modules = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['modules']} ( + table_modules = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_module} ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, user TEXT, @@ -384,7 +389,7 @@ class Base: ) ''' - table_admins = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['admins']} ( + table_admins = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_admin} ( id INTEGER PRIMARY KEY AUTOINCREMENT, createdOn TEXT, user TEXT, @@ -464,6 +469,17 @@ class Base: except ValueError: return False + def decode_ip(self, ip_b64encoded: str) -> Union[str, None]: + + binary_ip = b64decode(ip_b64encoded) + try: + decoded_ip = ipaddress.ip_address(binary_ip) + + return decoded_ip.exploded + except ValueError as ve: + self.logs.critical(f'This remote ip is not valid : {ve}') + return None + def get_random(self, lenght:int) -> str: """ Retourn une chaîne aléatoire en fonction de la longueur spécifiée. diff --git a/core/irc.py b/core/irc.py index df9b802..e68c436 100644 --- a/core/irc.py +++ b/core/irc.py @@ -3,6 +3,7 @@ from ssl import SSLSocket from datetime import datetime, timedelta from typing import Union from core.loadConf import Config +from core.Model import User, Admin from core.base import Base class Irc: @@ -10,8 +11,9 @@ class Irc: def __init__(self) -> 'Irc': self.defender_connexion_datetime = datetime.now() # Date et heure de la premiere connexion de Defender - self.db_uid = {} # Definir la variable qui contiendra la liste des utilisateurs connectés au réseau - self.db_admin = {} # Definir la variable qui contiendra la liste des administrateurs + self.first_score: int = 100 + #self.db_uid = {} # Definir la variable qui contiendra la liste des utilisateurs connectés au réseau + #self.db_admin = {} # Definir la variable qui contiendra la liste des administrateurs self.db_chan = [] # Definir la variable qui contiendra la liste des salons self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés self.beat = 30 # Lancer toutes les 30 secondes des actions de nettoyages @@ -24,7 +26,7 @@ class Irc: self.CHARSET = ['utf-8', 'iso-8859-1'] # Charset utiliser pour décoder/encoder les messages self.SSL_VERSION = None # Version SSL - self.Config = Config().ConfigModel + self.Config = Config().ConfigObject # Liste des commandes internes du bot self.commands_level = { @@ -41,6 +43,8 @@ class Irc: self.commands.append(command) self.Base = Base(self.Config) + self.User = User(self.Base) + self.Admin = Admin(self.Base) self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, )) ############################################## @@ -113,7 +117,7 @@ class Irc: # Reload configuration self.Base.logs.debug('Reloading configuration') - self.Config = Config().ConfigModel + self.Config = Config().ConfigObject self.Base = Base(self.Config) self.__create_socket() @@ -186,7 +190,7 @@ class Irc: sid = self.Config.SERVEUR_ID service_id = self.Config.SERVICE_ID - version = self.Base.DEFENDER_VERSION + version = self.Config.current_version unixtime = self.Base.get_unixtime() # Envoyer un message d'identification @@ -262,7 +266,7 @@ class Irc: Returns: None: Aucun retour requis, elle charge puis c'est tout """ - result = self.Base.db_execute_query(f"SELECT module FROM {self.Base.DB_SCHEMA['modules']}") + result = self.Base.db_execute_query(f"SELECT module FROM {self.Config.table_module}") for r in result.fetchall(): self.load_module('sys', r[0], True) @@ -399,120 +403,42 @@ class Irc: except Exception as e: self.Base.logs.error(f"Something went wrong with a module you want to load : {e}") - def insert_db_uid(self, uid:str, nickname:str, username:str, hostname:str, umodes:str, vhost:str, isWebirc: bool) -> None: - - if uid in self.db_uid: - return None - - self.db_uid[uid] = { - 'nickname': nickname, - 'username': username, - 'hostname': hostname, - 'umodes': umodes, - 'vhost': vhost, - 'isWebirc': isWebirc, - 'datetime': datetime.now() - } - - self.db_uid[nickname] = { - 'uid': uid, - 'username': username, - 'hostname': hostname, - 'umodes': umodes, - 'vhost': vhost, - 'isWebirc': isWebirc, - 'datetime': datetime.now() - } - - return None - - def update_db_uid(self, uid:str, newnickname:str) -> None: - - # Récupérer l'ancien nickname - oldnickname = self.db_uid[uid]['nickname'] - - # Enregistrement du nouveau nickname - self.db_uid[newnickname] = { - 'uid': uid, - 'username': self.db_uid[uid]['username'], - 'hostname': self.db_uid[uid]['hostname'], - 'umodes': self.db_uid[uid]['umodes'], - 'vhost': self.db_uid[uid]['vhost'] - } - - # Modification du nickname dans la ligne UID - self.db_uid[uid]['nickname'] = newnickname - - # Supprimer l'ancien nickname - if oldnickname in self.db_uid: - del self.db_uid[oldnickname] - else: - self.Base.logs.debug(f"L'ancien nickname {oldnickname} n'existe pas dans UID_DB") - response = False - - self.Base.logs.debug(f"{oldnickname} changed to {newnickname}") - - return None - - def delete_db_uid(self, uid:str) -> None: - - uid_reel = self.get_uid(uid) - nickname = self.get_nickname(uid_reel) - - if uid_reel in self.db_uid: - del self.db_uid[uid] - - if nickname in self.db_uid: - del self.db_uid[nickname] - - return None - def insert_db_admin(self, uid:str, level:int) -> None: - if not uid in self.db_uid: + if self.User.get_User(uid) is None: return None + + getUser = self.User.get_User(uid) - nickname = self.db_uid[uid]['nickname'] - username = self.db_uid[uid]['username'] - hostname = self.db_uid[uid]['hostname'] - umodes = self.db_uid[uid]['umodes'] - vhost = self.db_uid[uid]['vhost'] + nickname = getUser.nickname + username = getUser.username + hostname = getUser.hostname + umodes = getUser.umodes + vhost = getUser.vhost level = int(level) - self.db_admin[uid] = { - 'nickname': nickname, - 'username': username, - 'hostname': hostname, - 'umodes': umodes, - 'vhost': vhost, - 'datetime': self.Base.get_datetime(), - 'level': level - } - - self.db_admin[nickname] = { - 'uid': uid, - 'username': username, - 'hostname': hostname, - 'umodes': umodes, - 'vhost': vhost, - 'datetime': self.Base.get_datetime(), - 'level': level - } + self.Admin.insert( + self.Admin.AdminModel( + uid=uid, + nickname=nickname, + username=username, + hostname=hostname, + umodes=umodes, + vhost=vhost, + level=level, + connexion_datetime=datetime.now() + ) + ) return None def delete_db_admin(self, uid:str) -> None: - if not uid in self.db_admin: + if self.Admin.get_Admin(uid) is None: return None - nickname_admin = self.db_admin[uid]['nickname'] - - if uid in self.db_admin: - del self.db_admin[uid] - - if nickname_admin in self.db_admin: - del self.db_admin[nickname_admin] + if not self.Admin.delete(uid): + self.Base.logs.critical(f'UID: {uid} was not deleted') return None @@ -541,7 +467,7 @@ class Irc: def create_defender_user(self, nickname:str, level: int, password:str) -> str: - nickname = self.get_nickname(nickname) + nickname = self.User.get_nickname(nickname) response = '' if level > 4: @@ -560,14 +486,14 @@ class Irc: spassword = self.Base.crypt_password(password) mes_donnees = {'admin': nickname} - query_search_user = f"SELECT id FROM {self.Base.DB_SCHEMA['admins']} WHERE user=:admin" + query_search_user = f"SELECT id FROM {self.Config.table_admin} WHERE user=:admin" r = self.Base.db_execute_query(query_search_user, mes_donnees) exist_user = r.fetchone() # On verifie si le user exist dans la base if not exist_user: mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level} - self.Base.db_execute_query(f'''INSERT INTO {self.Base.DB_SCHEMA['admins']} + self.Base.db_execute_query(f'''INSERT INTO {self.Config.table_admin} (createdOn, user, password, hostname, vhost, level) VALUES (:datetime, :user, :password, :hostname, :vhost, :level) ''', mes_donnees) @@ -581,41 +507,15 @@ class Irc: self.Base.logs.info(response) return response - def get_uid(self, uidornickname:str) -> Union[str, None]: - - uid_recherche = uidornickname - response = None - for uid, value in self.db_uid.items(): - if uid == uid_recherche: - if 'nickname' in value: - response = uid - if 'uid' in value: - response = value['uid'] - - return response - - def get_nickname(self, uidornickname:str) -> Union[str, None]: - - nickname_recherche = uidornickname - - response = None - for nickname, value in self.db_uid.items(): - if nickname == nickname_recherche: - if 'nickname' in value: - response = value['nickname'] - if 'uid' in value: - response = nickname - - return response - - def is_cmd_allowed(self,nickname:str, cmd:str) -> bool: + def is_cmd_allowed(self, nickname:str, cmd:str) -> bool: # Vérifier si le user est identifié et si il a les droits is_command_allowed = False - uid = self.get_uid(nickname) + uid = self.User.get_uid(nickname) + get_admin = self.Admin.get_Admin(uid) - if uid in self.db_admin: - admin_level = self.db_admin[uid]['level'] + if not get_admin is None: + admin_level = get_admin.level for ref_level, ref_commands in self.commands_level.items(): # print(f"LevelNo: {ref_level} - {ref_commands} - {admin_level}") @@ -651,6 +551,18 @@ class Irc: return None + def thread_check_for_new_version(self, fromuser: str) -> None: + + dnickname = self.Config.SERVICE_NICKNAME + + if self.Base.check_for_new_version(True): + self.send2socket(f':{dnickname} NOTICE {fromuser} : New Version available : {self.Config.current_version} >>> {self.Config.latest_version}') + self.send2socket(f':{dnickname} NOTICE {fromuser} : Please run (git pull origin main) in the current folder') + else: + self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender') + + return None + def cmd(self, data:list) -> None: try: @@ -713,6 +625,8 @@ class Irc: try: # if self.Config.ABUSEIPDB == 1: # self.Base.create_thread(self.abuseipdb_scan, (cmd[2], )) + self.first_connexion_ip = cmd[2] + self.first_score = cmd[3] pass # Possibilité de déclancher les bans a ce niveau. except IndexError as ie: @@ -735,10 +649,13 @@ class Irc: hsid = str(cmd[0]).replace(':','') if hsid == self.HSID: if self.INIT == 1: - if self.Base.check_for_new_version(): - version = f'{self.Base.DEFENDER_VERSION} >>> {self.Base.LATEST_DEFENDER_VERSION}' + current_version = self.Config.current_version + latest_version = self.Config.latest_version + + if current_version != latest_version: + version = f'{current_version} >>> {latest_version}' else: - version = f'{self.Base.DEFENDER_VERSION}' + version = f'{current_version}' self.send2socket(f"MODE {self.Config.SERVICE_NICKNAME} +B") self.send2socket(f"JOIN {self.Config.SERVICE_CHANLOG}") @@ -764,13 +681,11 @@ class Irc: self.Base.logs.info(f"# VERSION : {version} ") self.Base.logs.info(f"################################################") - if self.Base.check_for_new_version(): + if self.Base.check_for_new_version(False): self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} : New Version available {version}") # Initialisation terminé aprés le premier PING self.INIT = 0 - # self.send2socket(f':{self.Config.SERVICE_ID} PING :{hsid}') - # print(self.db_uid) case _: pass @@ -784,7 +699,7 @@ class Irc: # :001N1WD7L QUIT :Quit: free_znc_1 cmd.pop(0) uid_who_quit = str(cmd[0]).replace(':', '') - self.delete_db_uid(uid_who_quit) + self.User.delete(uid_who_quit) case 'PONG': # ['@msgid=aTNJhp17kcPboF5diQqkUL;time=2023-12-28T20:35:58.411Z', ':irc.deb.biz.st', 'PONG', 'irc.deb.biz.st', ':Dev-PyDefender'] @@ -798,8 +713,7 @@ class Irc: cmd.pop(0) uid = str(cmd[0]).replace(':','') newnickname = cmd[2] - - self.update_db_uid(uid, newnickname) + self.User.update(uid, newnickname) case 'SJOIN': # ['@msgid=ictnEBhHmTUHzkEeVZl6rR;time=2023-12-28T20:03:18.482Z', ':001', 'SJOIN', '1702139101', '#stats', '+nst', ':@001SB890A', '@00BAAAAAI'] @@ -808,7 +722,9 @@ class Irc: self.insert_db_chan(channel) case 'UID': - + # ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601', + # ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net', + # '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...'] if 'webirc' in cmd[0]: isWebirc = True else: @@ -820,8 +736,27 @@ class Irc: hostname = str(cmd[7]) umodes = str(cmd[10]) vhost = str(cmd[11]) + if not 'S' in umodes: + remote_ip = self.Base.decode_ip(str(cmd[13])) + else: + remote_ip = '127.0.0.1' - self.insert_db_uid(uid, nickname, username, hostname, umodes, vhost, isWebirc) + score_connexion = str(self.first_score) + + self.User.insert( + self.User.UserModel( + uid=uid, + nickname=nickname, + username=username, + hostname=hostname, + umodes=umodes, + vhost=vhost, + isWebirc=isWebirc, + remote_ip=remote_ip, + score_connexion=score_connexion, + connexion_datetime=datetime.now() + ) + ) for classe_name, classe_object in self.loaded_classes.items(): classe_object.cmd(cmd_to_send) @@ -842,7 +777,8 @@ class Irc: else: self.Base.logs.info(f'{cmd}') # user_trigger = get_user.split('!')[0] - user_trigger = self.get_nickname(get_uid_or_nickname) + # user_trigger = self.get_nickname(get_uid_or_nickname) + user_trigger = self.User.get_nickname(get_uid_or_nickname) dnickname = self.Config.SERVICE_NICKNAME pattern = fr'(:\{self.Config.SERVICE_PREFIX})(.*)$' @@ -874,7 +810,7 @@ class Irc: # Réponse a un CTCP VERSION if arg[0] == '\x01VERSION\x01': - self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Base.DEFENDER_VERSION}\x01') + self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Config.current_version}\x01') return False # Réponse a un TIME @@ -896,7 +832,7 @@ class Irc: return False cmd_to_send = convert_to_string.replace(':','') - self.Base.log_cmd(self.get_nickname(user_trigger), cmd_to_send) + self.Base.log_cmd(self.User.get_nickname(user_trigger), cmd_to_send) self._hcmds(user_trigger, arg) @@ -916,8 +852,8 @@ class Irc: def _hcmds(self, user: str, cmd:list) -> None: - fromuser = self.get_nickname(user) # Nickname qui a lancé la commande - uid = self.get_uid(fromuser) # Récuperer le uid de l'utilisateur + fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande + uid = self.User.get_uid(fromuser) # Récuperer le uid de l'utilisateur # Defender information dnickname = self.Config.SERVICE_NICKNAME # Defender nickname @@ -942,7 +878,7 @@ class Irc: case 'notallowed': try: current_command = cmd[0] - self.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["rouge"]}{current_command}{self.Config.CONFIG_COLOR["noire"]} ] - Accès Refusé à {self.get_nickname(fromuser)}') + self.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["rouge"]}{current_command}{self.Config.CONFIG_COLOR["noire"]} ] - Accès Refusé à {self.User.get_nickname(fromuser)}') self.send2socket(f':{dnickname} NOTICE {fromuser} : Accès Refusé') except IndexError as ie: self.Base.logs.error(f'{ie}') @@ -950,29 +886,29 @@ class Irc: case 'deauth': current_command = cmd[0] - uid_to_deauth = self.get_uid(fromuser) + uid_to_deauth = self.User.get_uid(fromuser) self.delete_db_admin(uid_to_deauth) - self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} est désormais déconnecter de {dnickname}") + self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais déconnecter de {dnickname}") case 'auth': # ['auth', 'adator', 'password'] current_command = cmd[0] - user_to_log = self.get_nickname(cmd[1]) + user_to_log = self.User.get_nickname(cmd[1]) password = cmd[2] if not user_to_log is None: mes_donnees = {'user': user_to_log, 'password': self.Base.crypt_password(password)} - query = f"SELECT id, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user AND password = :password" + query = f"SELECT id, level FROM {self.Config.table_admin} WHERE user = :user AND password = :password" result = self.Base.db_execute_query(query, mes_donnees) user_from_db = result.fetchone() if not user_from_db is None: - uid_user = self.get_uid(user_to_log) + uid_user = self.User.get_uid(user_to_log) self.insert_db_admin(uid_user, user_from_db[1]) - self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} est désormais connecté a {dnickname}") + self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}") self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Connexion a {dnickname} réussie!") else: - self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} a tapé un mauvais mot de pass") + self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} a tapé un mauvais mot de pass") self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Mot de passe incorrecte") else: @@ -1007,9 +943,14 @@ class Irc: self.send2socket(f':{dnickname} NOTICE {fromuser} : .editaccess [USER] [NEWPASSWORD] [NEWLEVEL]') return None - current_user = self.get_nickname(fromuser) - current_uid = self.get_uid(fromuser) - current_user_level = self.db_admin[current_uid]['level'] + get_admin = self.Admin.get_Admin(fromuser) + if get_admin is None: + self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no Admin access') + return None + + current_user = self.User.get_nickname(fromuser) + current_uid = self.User.get_uid(fromuser) + current_user_level = get_admin.level if user_new_level > 5: self.send2socket(f':{dnickname} NOTICE {fromuser} : Maximum authorized level is 5') @@ -1017,7 +958,7 @@ class Irc: # Rechercher le user dans la base de données. mes_donnees = {'user': user_to_edit} - query = f"SELECT user, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user" + query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user" result = self.Base.db_execute_query(query, mes_donnees) isUserExist = result.fetchone() @@ -1033,7 +974,7 @@ class Irc: # Le user existe dans la base de données data_to_update = {'user': user_to_edit, 'password': user_password, 'level': user_new_level} - sql_update = f"UPDATE {self.Base.DB_SCHEMA['admins']} SET level = :level, password = :password WHERE user = :user" + sql_update = f"UPDATE {self.Config.table_admin} SET level = :level, password = :password WHERE user = :user" exec_query = self.Base.db_execute_query(sql_update, data_to_update) if exec_query.rowcount > 0: self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_edit} has been modified with level {str(user_new_level)}') @@ -1059,14 +1000,20 @@ class Irc: if len(cmd) < 3: self.send2socket(f':{dnickname} NOTICE {fromuser} : .delaccess [USER] [CONFIRMUSER]') return None + + get_admin = self.Admin.get_Admin(fromuser) + + if get_admin is None: + self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no admin access') + return None - current_user = self.get_nickname(fromuser) - current_uid = self.get_uid(fromuser) - current_user_level = self.db_admin[current_uid]['level'] + current_user = self.User.get_nickname(fromuser) + current_uid = self.User.get_uid(fromuser) + current_user_level = get_admin.level # Rechercher le user dans la base de données. mes_donnees = {'user': user_to_del} - query = f"SELECT user, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user" + query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user" result = self.Base.db_execute_query(query, mes_donnees) info_user = result.fetchone() @@ -1078,7 +1025,7 @@ class Irc: return None data_to_delete = {'user': user_to_del} - sql_delete = f"DELETE FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user" + sql_delete = f"DELETE FROM {self.Config.table_admin} WHERE user = :user" exec_query = self.Base.db_execute_query(sql_delete, data_to_delete) if exec_query.rowcount > 0: self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_del} has been deleted !') @@ -1090,8 +1037,9 @@ class Irc: help = '' count_level_definition = 0 - if uid in self.db_admin: - user_level = self.db_admin[uid]['level'] + get_admin = self.Admin.get_Admin(uid) + if not get_admin is None: + user_level = get_admin.level else: user_level = 0 @@ -1223,7 +1171,7 @@ class Irc: self.Base.logs.debug(self.loaded_classes) - results = self.Base.db_execute_query(f'SELECT module FROM {self.Base.DB_SCHEMA["modules"]}') + results = self.Base.db_execute_query(f'SELECT module FROM {self.Config.table_module}') results = results.fetchall() if len(results) == 0: @@ -1253,7 +1201,7 @@ class Irc: self.send2socket(f':{dnickname} NOTICE {fromuser} : {uptime}') case 'copyright': - self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Base.DEFENDER_VERSION} Developped by adator® and dktmb® #') + self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Config.current_version} Developped by adator® and dktmb® #') case 'sentinel': # .sentinel on @@ -1273,12 +1221,10 @@ class Irc: case 'checkversion': - if self.Base.check_for_new_version(): - self.send2socket(f':{dnickname} NOTICE {fromuser} : New Version available : {self.Base.DEFENDER_VERSION} >>> {self.Base.LATEST_DEFENDER_VERSION}') - self.send2socket(f':{dnickname} NOTICE {fromuser} : Please run (git pull origin main) in the current folder') - else: - self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender') - pass + self.Base.create_thread( + self.thread_check_for_new_version, + (fromuser, ) + ) case _: pass diff --git a/core/loadConf.py b/core/loadConf.py index 0c15303..15d5f95 100644 --- a/core/loadConf.py +++ b/core/loadConf.py @@ -1,4 +1,5 @@ -import json, os +import json +from os import sep from typing import Union from dataclasses import dataclass, field @@ -45,20 +46,44 @@ class ConfigDataModel: DEBUG_LEVEL: int # Le niveau des logs DEBUG 10 | INFO 20 | WARNING 30 | ERROR 40 | CRITICAL 50 - CONFIG_COLOR: dict + CONFIG_COLOR: dict[str, str] + + table_admin: str + table_commande: str + table_log: str + table_module: str + + current_version: str + latest_version: str + db_name: str + db_path: str def __post_init__(self): # Initialiser SERVICE_ID après la création de l'objet self.SERVICE_ID:str = f"{self.SERVEUR_ID}AAAAAB" - class Config: def __init__(self): - import_config = self.__load_json_configuration() + self.ConfigObject: ConfigDataModel = self.__load_service_configuration() + return None - ConfigModel = ConfigDataModel( + def __load_json_service_configuration(self): + + conf_filename = f'core{sep}configuration.json' + with open(conf_filename, 'r') as configuration_data: + configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data) + + for key, value in configuration['CONFIG_COLOR'].items(): + configuration['CONFIG_COLOR'][key] = value.encode('utf-8').decode('unicode_escape') + + return configuration + + def __load_service_configuration(self) -> ConfigDataModel: + import_config = self.__load_json_service_configuration() + + ConfigObject: ConfigDataModel = ConfigDataModel( SERVEUR_IP=import_config["SERVEUR_IP"], SERVEUR_HOSTNAME=import_config["SERVEUR_HOSTNAME"], SERVEUR_LINK=import_config["SERVEUR_LINK"], @@ -87,19 +112,15 @@ class Config: WHITELISTED_IP=import_config["WHITELISTED_IP"], GLINE_DURATION=import_config["GLINE_DURATION"], DEBUG_LEVEL=import_config["DEBUG_LEVEL"], - CONFIG_COLOR=import_config["CONFIG_COLOR"] + CONFIG_COLOR=import_config["CONFIG_COLOR"], + table_admin='sys_admins', + table_commande='sys_commandes', + table_log='sys_logs', + table_module='sys_modules', + current_version='', + latest_version='', + db_name='defender', + db_path=f'db{sep}' ) - self.ConfigModel = ConfigModel - return None - - def __load_json_configuration(self): - - conf_filename = f'core{os.sep}configuration.json' - with open(conf_filename, 'r') as configuration_data: - configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data) - - for key, value in configuration['CONFIG_COLOR'].items(): - configuration['CONFIG_COLOR'][key] = value.encode('utf-8').decode('unicode_escape') - - return configuration + return ConfigObject diff --git a/mods/mod_defender.py b/mods/mod_defender.py index f96416e..4e3f719 100644 --- a/mods/mod_defender.py +++ b/mods/mod_defender.py @@ -21,6 +21,7 @@ class Defender(): self.Irc = ircInstance # Ajouter l'object mod_irc a la classe ( Obligatoire ) self.Config = ircInstance.Config # Ajouter la configuration a la classe ( Obligatoire ) + self.User = ircInstance.User # Importer les liste des User connectés self.Base = ircInstance.Base # Ajouter l'objet Base au module ( Obligatoire ) self.timeout = self.Config.API_TIMEOUT # API Timeout @@ -37,7 +38,7 @@ class Defender(): self.localscan_isRunning:bool = True self.reputationTimer_isRunning:bool = True - self.Irc.Base.logs.info(f'Module {self.__class__.__name__} loaded ...') + self.Base.logs.info(f'Module {self.__class__.__name__} loaded ...') # Créer les nouvelles commandes du module self.commands_level = { @@ -302,12 +303,12 @@ class Defender(): def update_db_reputation(self, uidornickname:str, newnickname:str) -> None: - uid = self.Irc.get_uid(uidornickname) + uid = self.User.get_uid(uidornickname) currentDateTime = self.Base.get_datetime() secret_code = self.Base.get_random(8) - if not uid in self.Irc.db_uid: - self.Irc.Base.logs.error(f'Etrange UID {uid}') + if self.User.get_uid(uid) is None: + self.Base.logs.error(f'Etrange UID {uid}') return None if uid in self.db_reputation: @@ -315,7 +316,7 @@ class Defender(): self.db_reputation[uid]['updated_datetime'] = currentDateTime self.db_reputation[uid]['secret_code'] = secret_code else: - self.Irc.Base.logs.error(f"L'UID {uid} n'existe pas dans REPUTATION_DB") + self.Base.logs.error(f"L'UID {uid} n'existe pas dans REPUTATION_DB") return None @@ -330,12 +331,12 @@ class Defender(): if uid in self.db_reputation: # Si le nickname existe dans le dictionnaire alors le supprimer del self.db_reputation[uid] - self.Irc.Base.logs.debug(f"Le UID {uid} a été supprimé du REPUTATION_DB") + self.Base.logs.debug(f"Le UID {uid} a été supprimé du REPUTATION_DB") def insert_db_trusted(self, uid: str, nickname:str) -> None: - uid = self.Irc.get_uid(uid) - nickname = self.Irc.get_nickname(nickname) + uid = self.User.get_uid(uid) + nickname = self.User.get_nickname(nickname) query = "SELECT id FROM def_trusted WHERE user = ?" exec_query = self.Base.db_execute_query(query, {"user": nickname}) @@ -378,13 +379,12 @@ class Defender(): int: Temps de connexion de l'utilisateur en secondes """ - get_uid = self.Irc.get_uid(uidornickname) - - if not get_uid in self.Irc.db_uid: + get_user = self.User.get_User(uidornickname) + if get_user is None: return 0 # Convertir la date enregistrée dans UID_DB en un objet {datetime} - connected_time_string = self.Irc.db_uid[get_uid]['datetime'] + connected_time_string = get_user.connexion_datetime if type(connected_time_string) == datetime: connected_time = connected_time_string else: @@ -439,15 +439,15 @@ class Defender(): self.Irc.send2socket(f":{service_id} MODE {chan} +b {jailed_nickname}!*@*") self.Irc.send2socket(f":{service_id} KICK {chan} {jailed_nickname}") - self.Irc.Base.logs.info(f"system_reputation : {jailed_nickname} à été capturé par le système de réputation") + self.Base.logs.info(f"system_reputation : {jailed_nickname} à été capturé par le système de réputation") # self.Irc.create_ping_timer(int(self.defConfig['reputation_timer']) * 60, 'Defender', 'system_reputation_timer') # self.Base.create_timer(int(self.defConfig['reputation_timer']) * 60, self.system_reputation_timer) else: - self.Irc.Base.logs.info(f"system_reputation : {jailed_nickname} à été supprimé du système de réputation car connecté via WebIrc ou il est dans la 'Trusted list'") + self.Base.logs.info(f"system_reputation : {jailed_nickname} à été supprimé du système de réputation car connecté via WebIrc ou il est dans la 'Trusted list'") self.delete_db_reputation(uid) except IndexError as e: - self.Irc.Base.logs.error(f"system_reputation : {str(e)}") + self.Base.logs.error(f"system_reputation : {str(e)}") def system_reputation_timer(self) -> None: try: @@ -488,11 +488,11 @@ class Defender(): self.Irc.send2socket(f":{service_id} MODE {chan} -b {self.db_reputation[uid]['nickname']}!*@*") # Lorsqu'un utilisateur quitte, il doit être supprimé de {UID_DB}. - self.Irc.delete_db_uid(uid) + self.User.delete(uid) self.delete_db_reputation(uid) except AssertionError as ae: - self.Irc.Base.logs.error(f'Assertion Error -> {ae}') + self.Base.logs.error(f'Assertion Error -> {ae}') def thread_reputation_timer(self) -> None: try: @@ -542,8 +542,8 @@ class Defender(): color_red = self.Config.CONFIG_COLOR['rouge'] color_bold = self.Config.CONFIG_COLOR['gras'] - get_detected_uid = self.Irc.get_uid(detected_user) - get_detected_nickname = self.Irc.get_nickname(detected_user) + get_detected_uid = self.User.get_uid(detected_user) + get_detected_nickname = self.User.get_nickname(detected_user) unixtime = self.Base.get_unixtime() get_diff_secondes = 0 @@ -961,7 +961,7 @@ class Defender(): cmd.pop(0) user_trigger = str(cmd[0]).replace(':','') channel = cmd[2] - find_nickname = self.Irc.get_nickname(user_trigger) + find_nickname = self.User.get_nickname(user_trigger) self.flood(find_nickname, channel) case 'UID': @@ -1130,8 +1130,8 @@ class Defender(): case 'code': try: release_code = cmd[1] - jailed_nickname = self.Irc.get_nickname(fromuser) - jailed_UID = self.Irc.get_uid(fromuser) + jailed_nickname = self.User.get_nickname(fromuser) + jailed_UID = self.User.get_uid(fromuser) if not jailed_UID in self.db_reputation: self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : No code is requested ...") return False @@ -1160,6 +1160,7 @@ class Defender(): self.Irc.send2socket(f":{service_id} SAPART {jailed_nickname} {jailed_salon}") self.Irc.send2socket(f":{service_id} SAJOIN {jailed_nickname} {welcome_salon}") self.Irc.send2socket(f":{link} REPUTATION {jailed_IP} {int(reputation_seuil) + 1}") + self.User.get_User(jailed_UID).score_connexion = reputation_seuil + 1 self.Irc.send2socket(f":{service_id} PRIVMSG {jailed_nickname} :[{color_green} MOT DE PASS CORRECT {color_black}] : You have now the right to enjoy the network !") else: @@ -1640,31 +1641,24 @@ class Defender(): case 'info': try: nickoruid = cmd[1] - uid_query = None - nickname_query = None + UserObject = self.User.get_User(nickoruid) - if not self.Irc.get_nickname(nickoruid) is None: - nickname_query = self.Irc.get_nickname(nickoruid) - - if not self.Irc.get_uid(nickoruid) is None: - uid_query = self.Irc.get_uid(nickoruid) - - if nickname_query is None and uid_query is None: - self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist") + if not UserObject is None: + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : UID : {UserObject.uid}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : NICKNAME : {UserObject.nickname}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : USERNAME : {UserObject.username}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : HOSTNAME : {UserObject.hostname}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : IP : {UserObject.remote_ip}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : REPUTATION : {UserObject.score_connexion}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : VHOST : {UserObject.vhost}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : MODES : {UserObject.umodes}') + self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : CONNECTION TIME : {UserObject.connexion_datetime}') else: - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : UID : {uid_query}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : NICKNAME : {self.Irc.db_uid[uid_query]["nickname"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : USERNAME : {self.Irc.db_uid[uid_query]["username"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : HOSTNAME : {self.Irc.db_uid[uid_query]["hostname"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : VHOST : {self.Irc.db_uid[uid_query]["vhost"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : MODES : {self.Irc.db_uid[uid_query]["umodes"]}') - self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : CONNECTION TIME : {self.Irc.db_uid[uid_query]["datetime"]}') + self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist") + except KeyError as ke: self.Base.logs.warning(f"Key error info user : {ke}") case 'show_users': - for uid, infousers in self.Irc.db_uid.items(): - # print(uid + " " + str(infousers)) - for info in infousers: - if info == 'nickname': - self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :UID : {uid} - isWebirc: {infousers['isWebirc']} - {info}: {infousers[info]}") + for db_user in self.User.UID_DB: + self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :UID : {db_user.uid} - isWebirc: {db_user.isWebirc} - Nickname: {db_user.nickname}")