Updating mod_clone by adding action on groups. reloading the module is now using Settings.set_cache and get_cache

This commit is contained in:
adator
2025-08-10 02:31:50 +02:00
parent 1686c4a0b5
commit 21a2619f49
13 changed files with 587 additions and 233 deletions

View File

@@ -186,7 +186,7 @@ class Base:
# Sort to reload submodules before parent modules
for name, module in sorted(modules_to_reload, key=lambda x: x[0], reverse=True):
try:
if 'mod_' not in name:
if 'mod_' not in name and 'schemas' not in name:
importlib.reload(module)
self.logs.debug(f'[LOAD_MODULE] Module {module} success')

View File

@@ -1,146 +0,0 @@
from core.definition import MClone
from typing import Any, Optional
from core.base import Base
class Clone:
UID_CLONE_DB: list[MClone] = []
def __init__(self, base: Base):
self.Logs = base.logs
def insert(self, new_clone_object: MClone) -> bool:
"""Create new Clone object
Args:
newCloneObject (CloneModel): New CloneModel object
Returns:
bool: True if inserted
"""
result = False
exist = False
for record in self.UID_CLONE_DB:
if record.nickname == new_clone_object.nickname:
# If the user exist then return False and do not go further
exist = True
self.Logs.warning(f'Nickname {record.nickname} already exist')
return result
if record.uid == new_clone_object.uid:
exist = True
self.Logs.warning(f'UID: {record.uid} already exist')
return result
if not exist:
self.UID_CLONE_DB.append(new_clone_object)
result = True
if not result:
self.Logs.critical(f'The Clone Object was not inserted {new_clone_object}')
return result
def delete(self, uidornickname: str) -> bool:
"""Delete the Clone Object starting from the nickname or the UID
Args:
uidornickname (str): UID or nickname of the clone
Returns:
bool: True if deleted
"""
clone_obj = self.get_clone(uidornickname=uidornickname)
if clone_obj is None:
return False
self.UID_CLONE_DB.remove(clone_obj)
return True
def exists(self, nickname: str) -> bool:
"""Check if the nickname exist
Args:
nickname (str): Nickname of the clone
Returns:
bool: True if the nickname exist
"""
clone = self.get_clone(nickname)
if isinstance(clone, MClone):
return True
return False
def uid_exists(self, uid: str) -> bool:
"""Check if the nickname exist
Args:
uid (str): uid of the clone
Returns:
bool: True if the nickname exist
"""
clone = self.get_clone(uid)
if isinstance(clone, MClone):
return True
return False
def get_clone(self, uidornickname: str) -> Optional[MClone]:
"""Get MClone object or None
Args:
uidornickname (str): The UID or the Nickname
Returns:
Union[MClone, None]: Return MClone object or None
"""
for clone in self.UID_CLONE_DB:
if clone.uid == uidornickname:
return clone
if clone.nickname == uidornickname:
return clone
return None
def get_uid(self, uidornickname: str) -> Optional[str]:
"""Get the UID of the clone starting from the UID or the Nickname
Args:
uidornickname (str): UID or Nickname
Returns:
str|None: Return the UID
"""
for record in self.UID_CLONE_DB:
if record.uid == uidornickname:
return record.uid
if record.nickname == uidornickname:
return record.uid
return None
def get_clone_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]:
clone_obj = self.get_clone(uidornickname=uidornickname)
if clone_obj is None:
return None
return clone_obj.to_dict()
def kill(self, nickname:str) -> bool:
response = False
for clone in self.UID_CLONE_DB:
if clone.nickname == nickname:
clone.alive = False # Kill the clone
response = True
return response

59
core/classes/commands.py Normal file
View File

@@ -0,0 +1,59 @@
from typing import TYPE_CHECKING, Optional
from core.definition import MCommand
if TYPE_CHECKING:
from core.base import Base
class Command:
DB_COMMANDS: list['MCommand'] = []
def __init__(self, base: 'Base'):
self.Base = base
def build(self, new_command_obj: MCommand) -> bool:
command = self.get_command(new_command_obj.command_name, new_command_obj.module_name)
if command is None:
self.DB_COMMANDS.append(new_command_obj)
return True
# Update command if it exist
# Removing the object
if self.drop_command(command.command_name, command.module_name):
# Add the new object
self.DB_COMMANDS.append(new_command_obj)
return True
return False
def get_command(self, command_name: str, module_name: str) -> Optional[MCommand]:
for command in self.DB_COMMANDS:
if command.command_name.lower() == command_name and command.module_name == module_name:
return command
return None
def drop_command(self, command_name: str, module_name: str) -> bool:
cmd = self.get_command(command_name, module_name)
if cmd is not None:
self.DB_COMMANDS.remove(cmd)
return True
return False
def get_ordered_commands(self) -> list[MCommand]:
return sorted(self.DB_COMMANDS, key=lambda c: (c.command_level, c.module_name))
def get_commands_by_level(self, level: int = 0) -> Optional[list[MCommand]]:
cmd_list = self.get_ordered_commands()
new_list: list[MCommand] = []
for cmd in cmd_list:
if cmd.command_level <= level:
new_list.append(cmd)
return new_list

View File

@@ -337,7 +337,6 @@ class Unrealircd6:
def send_quit(self, uid: str, reason: str, print_log: True) -> None:
"""Send quit message
- Delete uid from User object
- Delete uid from Clone object
- Delete uid from Reputation object
Args:
@@ -345,16 +344,12 @@ class Unrealircd6:
reason (str): The reason for the quit
"""
user_obj = self.__Irc.User.get_User(uidornickname=uid)
clone_obj = self.__Irc.Clone.get_clone(uidornickname=uid)
reputationObj = self.__Irc.Reputation.get_Reputation(uidornickname=uid)
if not user_obj is None:
self.send2socket(f":{user_obj.uid} QUIT :{reason}", print_log=print_log)
self.__Irc.User.delete(user_obj.uid)
if not clone_obj is None:
self.__Irc.Clone.delete(clone_obj.uid)
if not reputationObj is None:
self.__Irc.Reputation.delete(reputationObj.uid)
@@ -569,7 +564,6 @@ class Unrealircd6:
self.__Irc.User.delete(uid_who_quit)
self.__Irc.Client.delete(uid_who_quit)
self.__Irc.Reputation.delete(uid_who_quit)
self.__Irc.Clone.delete(uid_who_quit)
return None

View File

@@ -1,7 +1,14 @@
'''This class should never be reloaded.
'''
from threading import Timer, Thread, RLock
from socket import socket
from typing import Any
class Settings:
"""This Class will never be reloaded.
Means that the variables are available during
the whole life of the app
"""
RUNNING_TIMERS: list[Timer] = []
RUNNING_THREADS: list[Thread] = []
@@ -13,3 +20,29 @@ class Settings:
PROTOCTL_USER_MODES: list[str] = []
PROTOCTL_PREFIX: list[str] = []
__CACHE: dict[str, Any] = {}
"""Use set_cache or get_cache instead"""
def set_cache(self, key: str, value_to_cache: Any):
"""When you want to store a variable
Ex.
```python
set_cache('MY_KEY', {'key1': 'value1', 'key2', 'value2'})
```
Args:
key (str): The key you want to add.
value_to_cache (Any): The Value you want to store.
"""
self.__CACHE[key] = value_to_cache
def get_cache(self, key) -> Any:
"""It returns the value associated to the key and finally it removes the entry"""
if self.__CACHE.get(key):
return self.__CACHE.pop(key)
return None
def get_cache_size(self) -> int:
return len(self.__CACHE)

View File

@@ -334,3 +334,10 @@ class MClone(MainModel):
umodes: str = None
remote_ip: str = '127.0.0.1'
group: str = 'Default'
@dataclass
class MCommand(MainModel):
module_name: str = None
command_name: str = None
description: str = None
command_level: int = 0

View File

@@ -8,9 +8,10 @@ import time
import traceback
from ssl import SSLSocket
from datetime import datetime, timedelta
from typing import Union
from typing import Optional, Union
from core.loader import Loader
from core.classes.protocol import Protocol
from core.classes.commands import Command
class Irc:
_instance = None
@@ -67,9 +68,6 @@ class Irc:
# Use Channel Instance
self.Channel = self.Loader.Channel
# Use Clones Instance
self.Clone = self.Loader.Clone
# Use Reputation Instance
self.Reputation = self.Loader.Reputation
@@ -83,7 +81,11 @@ class Irc:
self.first_connexion_ip: str = None
# Define the dict that will contain all loaded modules
self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés
self.loaded_classes:dict[str, 'Irc'] = {}
# Load Commands Utils
self.Commands = self.Loader.Commands
"""Command utils"""
# Global full module commands that contains level, module name, commands and description
self.module_commands: dict[int, dict[str, dict[str, str]]] = {}
@@ -341,9 +343,71 @@ class Irc:
self.module_commands.setdefault(level, {}).setdefault(module_name, {}).update({command_name: command_description})
self.module_commands_list.append(command_name)
# Build Model.
self.Commands.build(self.Loader.Definition.MCommand(module_name, command_name, command_description, level))
return None
def generate_help_menu(self, nickname: str) -> None:
def generate_help_menu(self, nickname: str, module: Optional[str] = None) -> None:
# Check if the nickname is an admin
p = self.Protocol
admin_obj = self.Admin.get_Admin(nickname)
dnickname = self.Config.SERVICE_NICKNAME
color_bold = self.Config.COLORS.bold
color_nogc = self.Config.COLORS.nogc
color_blue = self.Config.COLORS.blue
color_black = self.Config.COLORS.black
color_underline = self.Config.COLORS.underline
current_level = 0
count = 0
if admin_obj is not None:
current_level = admin_obj.level
p.send_notice(nick_from=dnickname,nick_to=nickname, msg=f" ***************** LISTE DES COMMANDES *****************")
header = f" {'Level':<8}| {'Command':<25}| {'Module':<15}| {'Description':<35}"
line = "-"*75
p.send_notice(nick_from=dnickname,nick_to=nickname, msg=header)
p.send_notice(nick_from=dnickname,nick_to=nickname, msg=f" {line}")
for cmd in self.Commands.get_commands_by_level(current_level):
if module is None or cmd.module_name.lower() == module.lower():
p.send_notice(
nick_from=dnickname,
nick_to=nickname,
msg=f" {color_black}{cmd.command_level:<8}{color_nogc}| {cmd.command_name:<25}| {cmd.module_name:<15}| {cmd.description:<35}"
)
return
for level, modules in self.module_commands.items():
if level > current_level:
break
if count > 0:
p.send_notice(nick_from=dnickname, nick_to=nickname, msg=" ")
p.send_notice(
nick_from=dnickname,
nick_to=nickname,
msg=f"{color_blue}{color_bold}Level {level}:{color_nogc}"
)
for module_name, commands in modules.items():
if module is None or module.lower() == module_name.lower():
p.send_notice(
nick_from=dnickname,
nick_to=nickname,
msg=f"{color_black} {color_underline}Module: {module_name}{color_nogc}"
)
for command, description in commands.items():
p.send_notice(nick_from=dnickname, nick_to=nickname, msg=f" {command:<20}: {description}")
count += 1
p.send_notice(nick_from=dnickname,nick_to=nickname,msg=f" ***************** FIN DES COMMANDES *****************")
return None
def generate_help_menu_bakcup(self, nickname: str) -> None:
# Check if the nickname is an admin
admin_obj = self.Admin.get_Admin(nickname)
@@ -578,6 +642,7 @@ class Irc:
channel=self.Config.SERVICE_CHANLOG
)
self.Base.db_delete_module(module_name)
traceback.print_exc()
def unload_module(self, mod_name: str) -> bool:
"""Unload a module
@@ -1387,8 +1452,12 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} <account>")
case 'help':
self.generate_help_menu(nickname=fromuser)
# Syntax. !help [module_name]
module_name = str(cmd[1]) if len(cmd) == 2 else None
self.generate_help_menu(nickname=fromuser, module=module_name)
for com in self.Commands.get_ordered_commands():
print(com)
case 'load':
try:

View File

@@ -1,4 +1,4 @@
from core.classes import user, admin, client, channel, clone, reputation, settings
from core.classes import user, admin, client, channel, reputation, settings, commands
import core.definition as df
import core.base as baseModule
import core.classes.config as confModule
@@ -15,7 +15,7 @@ class Loader:
self.BaseModule: baseModule = baseModule
# Load Classes
self.Settings: settings = settings.Settings()
self.Settings: settings.Settings = settings.Settings()
self.Config: df.MConfig = self.ConfModule.Configuration().ConfigObject
@@ -29,6 +29,6 @@ class Loader:
self.Channel: channel.Channel = channel.Channel(self.Base)
self.Clone: clone.Clone = clone.Clone(self.Base)
self.Reputation: reputation.Reputation = reputation.Reputation(self.Base)
self.Commands: commands.Command = commands.Command(self.Base)