From 21a2619f4934770f1490670e87a225dcd1868a12 Mon Sep 17 00:00:00 2001 From: adator <85586985+adator85@users.noreply.github.com> Date: Sun, 10 Aug 2025 02:31:50 +0200 Subject: [PATCH] Updating mod_clone by adding action on groups. reloading the module is now using Settings.set_cache and get_cache --- core/base.py | 2 +- core/classes/commands.py | 59 ++++ core/classes/protocols/unreal6.py | 6 - core/classes/settings.py | 33 ++ core/definition.py | 7 + core/irc.py | 85 ++++- core/loader.py | 8 +- .../clone.py => mods/clone/clone_manager.py | 56 +++- mods/clone/mod_clone.py | 303 ++++++------------ mods/clone/schemas.py | 22 ++ mods/clone/threads.py | 39 +++ mods/clone/utils.py | 198 ++++++++++++ version.json | 2 +- 13 files changed, 587 insertions(+), 233 deletions(-) create mode 100644 core/classes/commands.py rename core/classes/clone.py => mods/clone/clone_manager.py (74%) create mode 100644 mods/clone/schemas.py create mode 100644 mods/clone/threads.py create mode 100644 mods/clone/utils.py diff --git a/core/base.py b/core/base.py index e62689c..a39915b 100644 --- a/core/base.py +++ b/core/base.py @@ -186,7 +186,7 @@ class Base: # Sort to reload submodules before parent modules for name, module in sorted(modules_to_reload, key=lambda x: x[0], reverse=True): try: - if 'mod_' not in name: + if 'mod_' not in name and 'schemas' not in name: importlib.reload(module) self.logs.debug(f'[LOAD_MODULE] Module {module} success') diff --git a/core/classes/commands.py b/core/classes/commands.py new file mode 100644 index 0000000..6a47c75 --- /dev/null +++ b/core/classes/commands.py @@ -0,0 +1,59 @@ +from typing import TYPE_CHECKING, Optional +from core.definition import MCommand + +if TYPE_CHECKING: + from core.base import Base + +class Command: + + DB_COMMANDS: list['MCommand'] = [] + + def __init__(self, base: 'Base'): + self.Base = base + + def build(self, new_command_obj: MCommand) -> bool: + + command = self.get_command(new_command_obj.command_name, new_command_obj.module_name) + if command is None: + self.DB_COMMANDS.append(new_command_obj) + return True + + # Update command if it exist + # Removing the object + if self.drop_command(command.command_name, command.module_name): + # Add the new object + self.DB_COMMANDS.append(new_command_obj) + return True + + return False + + def get_command(self, command_name: str, module_name: str) -> Optional[MCommand]: + + for command in self.DB_COMMANDS: + if command.command_name.lower() == command_name and command.module_name == module_name: + return command + + return None + + def drop_command(self, command_name: str, module_name: str) -> bool: + + cmd = self.get_command(command_name, module_name) + if cmd is not None: + self.DB_COMMANDS.remove(cmd) + return True + + return False + + def get_ordered_commands(self) -> list[MCommand]: + return sorted(self.DB_COMMANDS, key=lambda c: (c.command_level, c.module_name)) + + def get_commands_by_level(self, level: int = 0) -> Optional[list[MCommand]]: + + cmd_list = self.get_ordered_commands() + new_list: list[MCommand] = [] + + for cmd in cmd_list: + if cmd.command_level <= level: + new_list.append(cmd) + + return new_list \ No newline at end of file diff --git a/core/classes/protocols/unreal6.py b/core/classes/protocols/unreal6.py index 7987adc..6a47350 100644 --- a/core/classes/protocols/unreal6.py +++ b/core/classes/protocols/unreal6.py @@ -337,7 +337,6 @@ class Unrealircd6: def send_quit(self, uid: str, reason: str, print_log: True) -> None: """Send quit message - Delete uid from User object - - Delete uid from Clone object - Delete uid from Reputation object Args: @@ -345,16 +344,12 @@ class Unrealircd6: reason (str): The reason for the quit """ user_obj = self.__Irc.User.get_User(uidornickname=uid) - clone_obj = self.__Irc.Clone.get_clone(uidornickname=uid) reputationObj = self.__Irc.Reputation.get_Reputation(uidornickname=uid) if not user_obj is None: self.send2socket(f":{user_obj.uid} QUIT :{reason}", print_log=print_log) self.__Irc.User.delete(user_obj.uid) - if not clone_obj is None: - self.__Irc.Clone.delete(clone_obj.uid) - if not reputationObj is None: self.__Irc.Reputation.delete(reputationObj.uid) @@ -569,7 +564,6 @@ class Unrealircd6: self.__Irc.User.delete(uid_who_quit) self.__Irc.Client.delete(uid_who_quit) self.__Irc.Reputation.delete(uid_who_quit) - self.__Irc.Clone.delete(uid_who_quit) return None diff --git a/core/classes/settings.py b/core/classes/settings.py index 5132a45..46350df 100644 --- a/core/classes/settings.py +++ b/core/classes/settings.py @@ -1,7 +1,14 @@ +'''This class should never be reloaded. +''' from threading import Timer, Thread, RLock from socket import socket +from typing import Any class Settings: + """This Class will never be reloaded. + Means that the variables are available during + the whole life of the app + """ RUNNING_TIMERS: list[Timer] = [] RUNNING_THREADS: list[Thread] = [] @@ -13,3 +20,29 @@ class Settings: PROTOCTL_USER_MODES: list[str] = [] PROTOCTL_PREFIX: list[str] = [] + + __CACHE: dict[str, Any] = {} + """Use set_cache or get_cache instead""" + + def set_cache(self, key: str, value_to_cache: Any): + """When you want to store a variable + Ex. + ```python + set_cache('MY_KEY', {'key1': 'value1', 'key2', 'value2'}) + ``` + Args: + key (str): The key you want to add. + value_to_cache (Any): The Value you want to store. + """ + self.__CACHE[key] = value_to_cache + + def get_cache(self, key) -> Any: + """It returns the value associated to the key and finally it removes the entry""" + if self.__CACHE.get(key): + return self.__CACHE.pop(key) + + return None + + def get_cache_size(self) -> int: + return len(self.__CACHE) + \ No newline at end of file diff --git a/core/definition.py b/core/definition.py index ef71fad..af8f262 100644 --- a/core/definition.py +++ b/core/definition.py @@ -334,3 +334,10 @@ class MClone(MainModel): umodes: str = None remote_ip: str = '127.0.0.1' group: str = 'Default' + +@dataclass +class MCommand(MainModel): + module_name: str = None + command_name: str = None + description: str = None + command_level: int = 0 diff --git a/core/irc.py b/core/irc.py index 7c17e10..df33bcf 100644 --- a/core/irc.py +++ b/core/irc.py @@ -8,9 +8,10 @@ import time import traceback from ssl import SSLSocket from datetime import datetime, timedelta -from typing import Union +from typing import Optional, Union from core.loader import Loader from core.classes.protocol import Protocol +from core.classes.commands import Command class Irc: _instance = None @@ -67,9 +68,6 @@ class Irc: # Use Channel Instance self.Channel = self.Loader.Channel - # Use Clones Instance - self.Clone = self.Loader.Clone - # Use Reputation Instance self.Reputation = self.Loader.Reputation @@ -83,7 +81,11 @@ class Irc: self.first_connexion_ip: str = None # Define the dict that will contain all loaded modules - self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés + self.loaded_classes:dict[str, 'Irc'] = {} + + # Load Commands Utils + self.Commands = self.Loader.Commands + """Command utils""" # Global full module commands that contains level, module name, commands and description self.module_commands: dict[int, dict[str, dict[str, str]]] = {} @@ -341,9 +343,71 @@ class Irc: self.module_commands.setdefault(level, {}).setdefault(module_name, {}).update({command_name: command_description}) self.module_commands_list.append(command_name) + # Build Model. + self.Commands.build(self.Loader.Definition.MCommand(module_name, command_name, command_description, level)) + return None - def generate_help_menu(self, nickname: str) -> None: + def generate_help_menu(self, nickname: str, module: Optional[str] = None) -> None: + + # Check if the nickname is an admin + p = self.Protocol + admin_obj = self.Admin.get_Admin(nickname) + dnickname = self.Config.SERVICE_NICKNAME + color_bold = self.Config.COLORS.bold + color_nogc = self.Config.COLORS.nogc + color_blue = self.Config.COLORS.blue + color_black = self.Config.COLORS.black + color_underline = self.Config.COLORS.underline + current_level = 0 + count = 0 + if admin_obj is not None: + current_level = admin_obj.level + + p.send_notice(nick_from=dnickname,nick_to=nickname, msg=f" ***************** LISTE DES COMMANDES *****************") + header = f" {'Level':<8}| {'Command':<25}| {'Module':<15}| {'Description':<35}" + line = "-"*75 + p.send_notice(nick_from=dnickname,nick_to=nickname, msg=header) + p.send_notice(nick_from=dnickname,nick_to=nickname, msg=f" {line}") + for cmd in self.Commands.get_commands_by_level(current_level): + if module is None or cmd.module_name.lower() == module.lower(): + p.send_notice( + nick_from=dnickname, + nick_to=nickname, + msg=f" {color_black}{cmd.command_level:<8}{color_nogc}| {cmd.command_name:<25}| {cmd.module_name:<15}| {cmd.description:<35}" + ) + + return + + for level, modules in self.module_commands.items(): + if level > current_level: + break + + if count > 0: + p.send_notice(nick_from=dnickname, nick_to=nickname, msg=" ") + + p.send_notice( + nick_from=dnickname, + nick_to=nickname, + msg=f"{color_blue}{color_bold}Level {level}:{color_nogc}" + ) + + for module_name, commands in modules.items(): + if module is None or module.lower() == module_name.lower(): + p.send_notice( + nick_from=dnickname, + nick_to=nickname, + msg=f"{color_black} {color_underline}Module: {module_name}{color_nogc}" + ) + for command, description in commands.items(): + p.send_notice(nick_from=dnickname, nick_to=nickname, msg=f" {command:<20}: {description}") + + count += 1 + + p.send_notice(nick_from=dnickname,nick_to=nickname,msg=f" ***************** FIN DES COMMANDES *****************") + return None + + def generate_help_menu_bakcup(self, nickname: str) -> None: # Check if the nickname is an admin admin_obj = self.Admin.get_Admin(nickname) @@ -578,6 +642,7 @@ class Irc: channel=self.Config.SERVICE_CHANLOG ) self.Base.db_delete_module(module_name) + traceback.print_exc() def unload_module(self, mod_name: str) -> bool: """Unload a module @@ -1387,8 +1452,12 @@ class Irc: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} ") case 'help': - - self.generate_help_menu(nickname=fromuser) + # Syntax. !help [module_name] + module_name = str(cmd[1]) if len(cmd) == 2 else None + self.generate_help_menu(nickname=fromuser, module=module_name) + + for com in self.Commands.get_ordered_commands(): + print(com) case 'load': try: diff --git a/core/loader.py b/core/loader.py index d897c04..d9ebdf5 100644 --- a/core/loader.py +++ b/core/loader.py @@ -1,4 +1,4 @@ -from core.classes import user, admin, client, channel, clone, reputation, settings +from core.classes import user, admin, client, channel, reputation, settings, commands import core.definition as df import core.base as baseModule import core.classes.config as confModule @@ -15,7 +15,7 @@ class Loader: self.BaseModule: baseModule = baseModule # Load Classes - self.Settings: settings = settings.Settings() + self.Settings: settings.Settings = settings.Settings() self.Config: df.MConfig = self.ConfModule.Configuration().ConfigObject @@ -29,6 +29,6 @@ class Loader: self.Channel: channel.Channel = channel.Channel(self.Base) - self.Clone: clone.Clone = clone.Clone(self.Base) - self.Reputation: reputation.Reputation = reputation.Reputation(self.Base) + + self.Commands: commands.Command = commands.Command(self.Base) diff --git a/core/classes/clone.py b/mods/clone/clone_manager.py similarity index 74% rename from core/classes/clone.py rename to mods/clone/clone_manager.py index 4689309..9b53a69 100644 --- a/core/classes/clone.py +++ b/mods/clone/clone_manager.py @@ -1,14 +1,16 @@ +from typing import Optional, TYPE_CHECKING from core.definition import MClone -from typing import Any, Optional -from core.base import Base -class Clone: +if TYPE_CHECKING: + from mods.clone.mod_clone import Clone + +class CloneManager: UID_CLONE_DB: list[MClone] = [] - def __init__(self, base: Base): + def __init__(self, uplink: 'Clone'): - self.Logs = base.logs + self.Logs = uplink.Logs def insert(self, new_clone_object: MClone) -> bool: """Create new Clone object @@ -61,7 +63,7 @@ class Clone: return True - def exists(self, nickname: str) -> bool: + def nickname_exists(self, nickname: str) -> bool: """Check if the nickname exist Args: @@ -91,6 +93,21 @@ class Clone: return False + def group_exists(self, groupname: str) -> bool: + """Verify if a group exist + + Args: + groupname (str): The group name + + Returns: + bool: _description_ + """ + for clone in self.UID_CLONE_DB: + if clone.group.strip().lower() == groupname.strip().lower(): + return True + + return False + def get_clone(self, uidornickname: str) -> Optional[MClone]: """Get MClone object or None @@ -108,6 +125,24 @@ class Clone: return None + def get_clones_from_groupname(self, groupname: str) -> list[MClone]: + """Get list of clone objects by group name + + Args: + groupname (str): The group name + + Returns: + list[MClone]: List of clones in the group + """ + group_of_clone: list[MClone] = [] + + if self.group_exists(groupname): + for clone in self.UID_CLONE_DB: + if clone.group.strip().lower() == groupname.strip().lower(): + group_of_clone.append(clone) + + return group_of_clone + def get_uid(self, uidornickname: str) -> Optional[str]: """Get the UID of the clone starting from the UID or the Nickname @@ -125,15 +160,6 @@ class Clone: return None - def get_clone_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]: - - clone_obj = self.get_clone(uidornickname=uidornickname) - - if clone_obj is None: - return None - - return clone_obj.to_dict() - def kill(self, nickname:str) -> bool: response = False diff --git a/mods/clone/mod_clone.py b/mods/clone/mod_clone.py index 0eabb59..d87706e 100644 --- a/mods/clone/mod_clone.py +++ b/mods/clone/mod_clone.py @@ -1,16 +1,16 @@ -from dataclasses import dataclass -import random, faker, time, logging -from typing import TYPE_CHECKING +import time, logging +from typing import TYPE_CHECKING, Optional +from faker import Faker +import mods.clone.utils as utils +import mods.clone.threads as thds +import mods.clone.schemas as schemas +from mods.clone.clone_manager import CloneManager if TYPE_CHECKING: from core.irc import Irc class Clone(): - @dataclass - class ModConfModel: - clone_nicknames: list[str] - def __init__(self, ircInstance: 'Irc') -> None: # Module name (Mandatory) @@ -36,12 +36,29 @@ class Clone(): # Add Channel object to the module (Mandatory) self.Channel = ircInstance.Channel - - # Add clone object to the module (Optionnal) - self.Clone = ircInstance.Clone - + + # Add global definitions self.Definition = ircInstance.Loader.Definition + # The Global Settings + self.Settings = ircInstance.Loader.Settings + + self.Schemas = schemas + + self.Utils = utils + + self.Threads = thds + + self.Faker: Optional['Faker'] = self.Utils.create_faker_object('en_GB') + + self.Clone = CloneManager(self) + + metadata = self.Settings.get_cache('UID_CLONE_DB') + + if metadata is not None: + self.Clone.UID_CLONE_DB = metadata + self.Logs.debug(f"Cache Size = {self.Settings.get_cache_size()}") + # Créer les nouvelles commandes du module self.Irc.build_command(1, self.module_name, 'clone', 'Connect, join, part, kill and say clones') @@ -57,10 +74,6 @@ class Clone(): self.__create_tables() self.stop = False - logging.getLogger('faker').setLevel(logging.CRITICAL) - - self.fakeEN = faker.Faker('en_GB') - self.fakeFR = faker.Faker('fr_FR') # Load module configuration (Mandatory) self.__load_module_configuration() @@ -99,9 +112,7 @@ class Clone(): """ try: # Variable qui va contenir les options de configuration du module Defender - self.ModConfig = self.ModConfModel( - clone_nicknames=[] - ) + self.ModConfig = self.Schemas.ModConfModel() # Sync the configuration with core configuration (Mandatory) # self.Base.db_sync_core_config(self.module_name, self.ModConfig) @@ -115,6 +126,8 @@ class Clone(): """Cette methode sera executée a chaque désactivation ou rechargement de module """ + # Store Clones DB into the global Settings to retrieve it after the reload. + self.Settings.set_cache('UID_CLONE_DB', self.Clone.UID_CLONE_DB) self.Channel.db_query_channel(action='del', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL) self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -nts") @@ -123,175 +136,40 @@ class Clone(): return None - def generate_vhost(self) -> str: - - fake = self.fakeEN - - rand_1 = fake.random_elements(['A','B','C','D','E','F','0','1','2','3','4','5','6','7','8','9'], unique=True, length=8) - rand_2 = fake.random_elements(['A','B','C','D','E','F','0','1','2','3','4','5','6','7','8','9'], unique=True, length=8) - rand_3 = fake.random_elements(['A','B','C','D','E','F','0','1','2','3','4','5','6','7','8','9'], unique=True, length=8) - - vhost = ''.join(rand_1) + '.' + ''.join(rand_2) + '.' + ''.join(rand_3) + '.IP' - return vhost - - def generate_clones(self, group: str = 'Default', auto_remote_ip: bool = False) -> None: + def cmd(self, data:list): try: + if not data or len(data) < 2: + return - fakeEN = self.fakeEN - fakeFR = self.fakeFR - unixtime = self.Base.get_unixtime() + cmd = data.copy() if isinstance(data, list) else list(data).copy() + index, command = self.Irc.Protocol.get_ircd_protocol_poisition(cmd) + if index == -1: + return - chaine = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' - generate_uid = fakeEN.random_sample(chaine, 6) - uid = self.Config.SERVEUR_ID + ''.join(generate_uid) + match command: - umodes = self.Config.CLONE_UMODES - - # Generate Username - chaine = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' - new_username = fakeEN.random_sample(chaine, 9) - username = ''.join(new_username) - - # Create realname XX F|M Department - gender = fakeEN.random_choices(['F','M'], 1) - gender = ''.join(gender) - - if gender == 'F': - nickname = fakeEN.first_name_female() - elif gender == 'M': - nickname = fakeEN.first_name_male() - else: - nickname = fakeEN.first_name() - - age = random.randint(20, 60) - department = fakeFR.department_name() - realname = f'{age} {gender} {department}' - - decoded_ip = fakeEN.ipv4_private() if auto_remote_ip else '127.0.0.1' - hostname = fakeEN.hostname() - - vhost = self.generate_vhost() - - checkNickname = self.Clone.exists(nickname=nickname) - checkUid = self.Clone.uid_exists(uid=uid) - - while checkNickname: - caracteres = '0123456789' - randomize = ''.join(random.choice(caracteres) for _ in range(2)) - nickname = nickname + str(randomize) - checkNickname = self.Clone.exists(nickname=nickname) - - while checkUid: - chaine = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' - generate_uid = fakeEN.random_sample(chaine, 6) - uid = self.Config.SERVEUR_ID + ''.join(generate_uid) - checkUid = self.Clone.uid_exists(uid=uid) - - clone = self.Definition.MClone( - connected=False, - nickname=nickname, - username=username, - realname=realname, - hostname=hostname, - umodes=umodes, - uid=uid, - remote_ip=decoded_ip, - vhost=vhost, - group=group, - channels=[] - ) - - self.Clone.insert(clone) - - return None - - except AttributeError as ae: - self.Logs.error(f'Attribute Error : {ae}') - except Exception as err: - self.Logs.error(f"General Error: {err}") - - def thread_connect_clones(self, number_of_clones:int , group: str = 'Default', auto_remote_ip: bool = False, interval: float = 0.2) -> None: - - for i in range(0, number_of_clones): - self.generate_clones(group=group, auto_remote_ip=auto_remote_ip) - - for clone in self.Clone.UID_CLONE_DB: - - if self.stop: - print(f"Stop creating clones ...") - self.stop = False - break - - if not clone.connected: - self.Protocol.send_uid(clone.nickname, clone.username, clone.hostname, clone.uid, clone.umodes, clone.vhost, clone.remote_ip, clone.realname, print_log=False) - self.Protocol.send_join_chan(uidornickname=clone.uid, channel=self.Config.CLONE_CHANNEL, password=self.Config.CLONE_CHANNEL_PASSWORD, print_log=False) - - time.sleep(interval) - clone.connected = True - - def thread_kill_clones(self, fromuser: str) -> None: - - clone_to_kill: list[str] = [] - for clone in self.Clone.UID_CLONE_DB: - clone_to_kill.append(clone.uid) - - for clone_uid in clone_to_kill: - self.Protocol.send_quit(clone_uid, 'Gooood bye', print_log=False) - - del clone_to_kill - - return None - - def cmd(self, data:list) -> None: - try: - service_id = self.Config.SERVICE_ID # Defender serveur id - cmd = list(data).copy() - - if len(cmd) < 2: - return None - - match cmd[1]: - - case 'REPUTATION': - pass - - if len(cmd) < 3: - return None - - match cmd[2]: case 'PRIVMSG': - # print(cmd) - uid_sender = self.User.clean_uid(cmd[1]) - senderObj = self.User.get_User(uid_sender) + return self.Utils.handle_on_privmsg(self, cmd) - if senderObj.hostname in self.Config.CLONE_LOG_HOST_EXEMPT: - return None + case 'QUIT': + return - if not senderObj is None: - senderMsg = ' '.join(cmd[4:]) - clone_obj = self.Clone.get_clone(cmd[3]) - - if clone_obj is None: - return None - - if clone_obj.uid != self.Config.SERVICE_ID: - final_message = f"{senderObj.nickname}!{senderObj.username}@{senderObj.hostname} > {senderMsg.lstrip(':')}" - self.Protocol.send_priv_msg( - nick_from=clone_obj.uid, - msg=final_message, - channel=self.Config.CLONE_CHANNEL - ) + case _: + return except Exception as err: - self.Logs.error(f'General Error: {err}') + self.Logs.error(f'General Error: {err}', exc_info=True) - def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: + def hcmds(self, user: str, channel: any, cmd: list, fullcmd: list = []) -> None: try: + + if len(cmd) < 1: + return + command = str(cmd[0]).lower() fromuser = user - - dnickname = self.Config.SERVICE_NICKNAME # Defender nickname + dnickname = self.Config.SERVICE_NICKNAME match command: @@ -303,6 +181,7 @@ class Clone(): self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | nickname] #channel") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | nickname] #channel") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list") + return None option = str(cmd[1]).lower() @@ -317,8 +196,8 @@ class Clone(): connection_interval = int(cmd[4]) if len(cmd) == 5 else 0.2 self.Base.create_thread( - func=self.thread_connect_clones, - func_args=(number_of_clones, group, False, connection_interval) + func=self.Threads.thread_connect_clones, + func_args=(self, number_of_clones, group, False, connection_interval) ) except Exception as err: @@ -328,18 +207,28 @@ class Clone(): case 'kill': try: - # clone kill [all | nickname] + # clone kill [ALL | group name | nickname] self.stop = True - clone_name = str(cmd[2]) - clone_to_kill: list[str] = [] + option = str(cmd[2]) - if clone_name.lower() == 'all': - self.Base.create_thread(func=self.thread_kill_clones, func_args=(fromuser, )) + if option.lower() == 'all': + self.Base.create_thread(func=self.Threads.thread_kill_clones, func_args=(self, )) + + elif self.Clone.group_exists(option): + list_of_clones_in_group = self.Clone.get_clones_from_groupname(option) + + if len(list_of_clones_in_group) > 0: + self.Logs.debug(f"[Clone Kill Group] - Killing {len(list_of_clones_in_group)} clones in the group {option}") + + for clone in list_of_clones_in_group: + self.Protocol.send_quit(clone.uid, "Now i am leaving irc but i'll come back soon ...", print_log=False) + self.Clone.delete(clone.uid) else: - clone_obj = self.Clone.get_clone(clone_name) + clone_obj = self.Clone.get_clone(option) if not clone_obj is None: self.Protocol.send_quit(clone_obj.uid, 'Goood bye', print_log=False) + self.Clone.delete(clone_obj.uid) except Exception as err: self.Logs.error(f'{err}') @@ -348,19 +237,28 @@ class Clone(): case 'join': try: - # clone join [all | nickname] #channel - clone_name = str(cmd[2]) + # clone join [all | group name | nickname] #channel + option = str(cmd[2]) clone_channel_to_join = str(cmd[3]) - if clone_name.lower() == 'all': + if option.lower() == 'all': for clone in self.Clone.UID_CLONE_DB: self.Protocol.send_join_chan(uidornickname=clone.uid, channel=clone_channel_to_join, print_log=False) + elif self.Clone.group_exists(option): + list_of_clones_in_group = self.Clone.get_clones_from_groupname(option) + + if len(list_of_clones_in_group) > 0: + self.Logs.debug(f"[Clone Join Group] - Joining {len(list_of_clones_in_group)} clones from group {option} in the channel {clone_channel_to_join}") + + for clone in list_of_clones_in_group: + self.Protocol.send_join_chan(uidornickname=clone.nickname, channel=clone_channel_to_join, print_log=False) + else: - if self.Clone.exists(clone_name): - if not self.Clone.get_uid(clone_name) is None: - self.Protocol.send_join_chan(uidornickname=clone_name, channel=clone_channel_to_join, print_log=False) + if self.Clone.nickname_exists(option): + clone_uid = self.Clone.get_clone(option).uid + self.Protocol.send_join_chan(uidornickname=clone_uid, channel=clone_channel_to_join, print_log=False) except Exception as err: self.Logs.error(f'{err}') @@ -369,18 +267,27 @@ class Clone(): case 'part': try: - # clone part [all | nickname] #channel - clone_name = str(cmd[2]) + # clone part [all | nickname] #channel + option = str(cmd[2]) clone_channel_to_part = str(cmd[3]) - if clone_name.lower() == 'all': + if option.lower() == 'all': for clone in self.Clone.UID_CLONE_DB: self.Protocol.send_part_chan(uidornickname=clone.uid, channel=clone_channel_to_part, print_log=False) + elif self.Clone.group_exists(option): + list_of_clones_in_group = self.Clone.get_clones_from_groupname(option) + + if len(list_of_clones_in_group) > 0: + self.Logs.debug(f"[Clone Part Group] - Part {len(list_of_clones_in_group)} clones from group {option} from the channel {clone_channel_to_part}") + + for clone in list_of_clones_in_group: + self.Protocol.send_part_chan(uidornickname=clone.uid, channel=clone_channel_to_part, print_log=False) + else: - if self.Clone.exists(clone_name): - clone_uid = self.Clone.get_uid(clone_name) + if self.Clone.nickname_exists(option): + clone_uid = self.Clone.get_uid(option) if not clone_uid is None: self.Protocol.send_part_chan(uidornickname=clone_uid, channel=clone_channel_to_part, print_log=False) @@ -407,7 +314,7 @@ class Clone(): final_message = ' '.join(cmd[4:]) - if clone_channel is None or not self.Clone.exists(clone_name): + if clone_channel is None or not self.Clone.nickname_exists(clone_name): self.Protocol.send_notice( nick_from=dnickname, nick_to=fromuser, @@ -415,7 +322,7 @@ class Clone(): ) return None - if self.Clone.exists(clone_name): + if self.Clone.nickname_exists(clone_name): self.Protocol.send_priv_msg(nick_from=clone_name, msg=final_message, channel=clone_channel) except Exception as err: @@ -428,12 +335,12 @@ class Clone(): case _: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone connect NUMBER GROUP_NAME INTERVAL") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill [all | nickname]") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | nickname] #channel") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | nickname] #channel") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill [all | group name | nickname]") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | group name | nickname] #channel") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | group name | nickname] #channel") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list") except IndexError as ie: self.Logs.error(f'Index Error: {ie}') except Exception as err: - self.Logs.error(f'Index Error: {err}') + self.Logs.error(f'General Error: {err}') diff --git a/mods/clone/schemas.py b/mods/clone/schemas.py new file mode 100644 index 0000000..d45fafe --- /dev/null +++ b/mods/clone/schemas.py @@ -0,0 +1,22 @@ +from core.definition import MainModel, dataclass, field + +@dataclass +class ModConfModel(MainModel): + clone_nicknames: list[str] = field(default_factory=list) + +@dataclass +class MClone(MainModel): + """Model Clone""" + connected: bool = False + uid: str = None + nickname: str = None + username: str = None + realname: str = None + channels: list = field(default_factory=list) + vhost: str = None + hostname: str = 'localhost' + umodes: str = None + remote_ip: str = '127.0.0.1' + group: str = 'Default' + +DB_CLONES: list[MClone] = [] \ No newline at end of file diff --git a/mods/clone/threads.py b/mods/clone/threads.py new file mode 100644 index 0000000..60e40e4 --- /dev/null +++ b/mods/clone/threads.py @@ -0,0 +1,39 @@ +from typing import TYPE_CHECKING +from time import sleep + +if TYPE_CHECKING: + from mods.clone.mod_clone import Clone + +def thread_connect_clones(uplink: 'Clone', + number_of_clones:int , + group: str = 'Default', + auto_remote_ip: bool = False, + interval: float = 0.2 + ): + + for i in range(0, number_of_clones): + uplink.Utils.create_new_clone(uplink, uplink.Faker, group=group, auto_remote_ip=auto_remote_ip) + + for clone in uplink.Clone.UID_CLONE_DB: + + if uplink.stop: + print(f"Stop creating clones ...") + uplink.stop = False + break + + if not clone.connected: + uplink.Protocol.send_uid(clone.nickname, clone.username, clone.hostname, clone.uid, clone.umodes, clone.vhost, clone.remote_ip, clone.realname, print_log=False) + uplink.Protocol.send_join_chan(uidornickname=clone.uid, channel=uplink.Config.CLONE_CHANNEL, password=uplink.Config.CLONE_CHANNEL_PASSWORD, print_log=False) + + sleep(interval) + clone.connected = True + +def thread_kill_clones(uplink: 'Clone'): + + clone_to_kill = uplink.Clone.UID_CLONE_DB.copy() + + for clone in clone_to_kill: + uplink.Protocol.send_quit(clone.uid, 'Gooood bye', print_log=False) + uplink.Clone.delete(clone.uid) + + del clone_to_kill diff --git a/mods/clone/utils.py b/mods/clone/utils.py new file mode 100644 index 0000000..a636b68 --- /dev/null +++ b/mods/clone/utils.py @@ -0,0 +1,198 @@ +import logging +import random +from typing import Optional, TYPE_CHECKING +from faker import Faker + +logging.getLogger('faker').setLevel(logging.CRITICAL) + +if TYPE_CHECKING: + from mods.clone.mod_clone import Clone + +def create_faker_object(faker_local: Optional[str] = 'en_GB') -> Faker: + """Create a new faker object + + Args: + faker_local (Optional[str], optional): _description_. Defaults to 'en_GB'. + + Returns: + Faker: The Faker Object + """ + if faker_local not in ['en_GB', 'fr_FR']: + faker_local = 'en_GB' + + return Faker(faker_local) + +def generate_uid_for_clone(faker_instance: 'Faker', server_id: str) -> str: + chaine = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' + return server_id + ''.join(faker_instance.random_sample(chaine, 6)) + +def generate_vhost_for_clone(faker_instance: 'Faker') -> str: + """Generate new vhost for the clone + + Args: + faker_instance (Faker): The Faker instance + + Returns: + str: _description_ + """ + rand_1 = faker_instance.random_elements(['A','B','C','D','E','F','0','1','2','3','4','5','6','7','8','9'], unique=True, length=8) + rand_2 = faker_instance.random_elements(['A','B','C','D','E','F','0','1','2','3','4','5','6','7','8','9'], unique=True, length=8) + rand_3 = faker_instance.random_elements(['A','B','C','D','E','F','0','1','2','3','4','5','6','7','8','9'], unique=True, length=8) + + vhost = ''.join(rand_1) + '.' + ''.join(rand_2) + '.' + ''.join(rand_3) + '.IP' + return vhost + +def generate_username_for_clone(faker_instance: 'Faker') -> str: + """Generate vhosts for clones + + Returns: + str: The vhost + """ + chaine = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' + return ''.join(faker_instance.random_sample(chaine, 9)) + +def generate_realname_for_clone(faker_instance: 'Faker') -> tuple[int, str, str]: + """Generate realname for clone + Ex: XX F|M Department + Args: + faker_instance (Faker): _description_ + + Returns: + tuple: Age | Gender | Department + """ + # Create realname XX F|M Department + gender = faker_instance.random_choices(['F','M'], 1) + gender = ''.join(gender) + age = random.randint(20, 60) + if faker_instance.locales[0] == 'fr_FR': + department = faker_instance.department_name() + else: + department = faker_instance.city() + + return (age, gender, department) + +def generate_nickname_for_clone(faker_instance: 'Faker', gender: Optional[str] = 'AUTO') -> str: + """Generate nickname for clone + + Args: + faker_instance (Faker): The Faker Instance + gender (str): The Gender.Default F + + Returns: + str: Nickname Based on the Gender + """ + if gender.upper() == 'AUTO' or gender.upper() not in ['F', 'M']: + # Generate new gender + gender = faker_instance.random_choices(['F','M'], 1) + gender = ''.join(gender) + + if gender.upper() == 'F': + return faker_instance.first_name_female() + elif gender.upper() == 'M': + return faker_instance.first_name_male() + +def generate_ipv4_for_clone(faker_instance: 'Faker', auto: bool = True) -> str: + """Generate remote ipv4 for clone + + Args: + faker_instance (Faker): The Faker Instance + auto (bool): Set auto generation of ip or 127.0.0.1 will be returned + + Returns: + str: Remote IPV4 + """ + return faker_instance.ipv4_private() if auto else '127.0.0.1' + +def generate_hostname_for_clone(faker_instance: 'Faker') -> str: + """Generate hostname for clone + + Args: + faker_instance (Faker): The Faker Instance + + Returns: + str: New hostname + """ + return faker_instance.hostname() + +def create_new_clone(uplink: 'Clone', faker_instance: 'Faker', group: str = 'Default', auto_remote_ip: bool = False) -> bool: + """Create a new Clone object in the DB_CLONES. + + Args: + faker_instance (Faker): The Faker instance + + Returns: + bool: True if it was created + """ + faker = faker_instance + + uid = generate_uid_for_clone(faker, uplink.Config.SERVEUR_ID) + umodes = uplink.Config.CLONE_UMODES + + # Generate Username + username = generate_username_for_clone(faker) + + # Generate realname (XX F|M Department) + age, gender, department = generate_realname_for_clone(faker) + realname = f'{age} {gender} {department}' + + # Generate nickname + nickname = generate_nickname_for_clone(faker, gender) + + # Generate decoded ipv4 and hostname + decoded_ip = generate_ipv4_for_clone(faker, auto_remote_ip) + hostname = generate_hostname_for_clone(faker) + vhost = generate_vhost_for_clone(faker) + + checkNickname = uplink.Clone.nickname_exists(nickname) + checkUid = uplink.Clone.uid_exists(uid=uid) + + while checkNickname: + caracteres = '0123456789' + randomize = ''.join(random.choice(caracteres) for _ in range(2)) + nickname = nickname + str(randomize) + checkNickname = uplink.Clone.nickname_exists(nickname) + + while checkUid: + uid = generate_uid_for_clone(faker, uplink.Config.SERVEUR_ID) + checkUid = uplink.Clone.uid_exists(uid=uid) + + clone = uplink.Definition.MClone( + connected=False, + nickname=nickname, + username=username, + realname=realname, + hostname=hostname, + umodes=umodes, + uid=uid, + remote_ip=decoded_ip, + vhost=vhost, + group=group, + channels=[] + ) + + uplink.Clone.insert(clone) + + return True + +def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]): + + uid_sender = uplink.User.clean_uid(srvmsg[1]) + senderObj = uplink.User.get_User(uid_sender) + + if senderObj.hostname in uplink.Config.CLONE_LOG_HOST_EXEMPT: + return + + if not senderObj is None: + senderMsg = ' '.join(srvmsg[4:]) + clone_obj = uplink.Clone.get_clone(srvmsg[3]) + + if clone_obj is None: + return + + if clone_obj.uid != uplink.Config.SERVICE_ID: + final_message = f"{senderObj.nickname}!{senderObj.username}@{senderObj.hostname} > {senderMsg.lstrip(':')}" + uplink.Protocol.send_priv_msg( + nick_from=clone_obj.uid, + msg=final_message, + channel=uplink.Config.CLONE_CHANNEL + ) diff --git a/version.json b/version.json index 265b584..a4a7194 100644 --- a/version.json +++ b/version.json @@ -1,5 +1,5 @@ { - "version": "6.1.4", + "version": "6.2.0", "requests": "2.32.3", "psutil": "6.0.0",