From 4c327940ddea1ce45a6fcc3129db74c07188bbd4 Mon Sep 17 00:00:00 2001 From: adator <85586985+adator85@users.noreply.github.com> Date: Tue, 19 Aug 2025 03:06:56 +0200 Subject: [PATCH] Adding new classes, todo: fix jsonrpc module --- core/classes/config.py | 21 +- core/irc.py | 21 +- core/loader.py | 2 +- core/logs.py | 10 +- defender.py | 3 +- mods/command/mod_command.py | 431 +++--------------------------------- mods/command/utils.py | 237 ++++++++++++++++++++ mods/jsonrpc/mod_jsonrpc.py | 96 +++++--- version.json | 2 +- 9 files changed, 373 insertions(+), 450 deletions(-) create mode 100644 mods/command/utils.py diff --git a/core/classes/config.py b/core/classes/config.py index 1958200..ca2f7ce 100644 --- a/core/classes/config.py +++ b/core/classes/config.py @@ -3,13 +3,13 @@ from sys import exit from os import sep from typing import Union from core.definition import MConfig - +from logging import Logger class Configuration: - def __init__(self) -> None: - + def __init__(self, logs: Logger) -> None: + self.Logs = logs self.ConfigObject: MConfig = self.__load_service_configuration() return None @@ -22,18 +22,18 @@ class Configuration: return configuration except FileNotFoundError as fe: - print(f'FileNotFound: {fe}') - print('Configuration file not found please create config/configuration.json') + self.Logs.error(f'FileNotFound: {fe}') + self.Logs.error('Configuration file not found please create config/configuration.json') exit(0) except KeyError as ke: - print(f'Key Error: {ke}') - print('The key must be defined in core/configuration.json') + self.Logs.error(f'Key Error: {ke}') + self.Logs.error('The key must be defined in core/configuration.json') def __load_service_configuration(self) -> MConfig: try: import_config = self.__load_json_service_configuration() - Model_keys = MConfig().__dict__ + Model_keys = MConfig().to_dict() model_key_list: list = [] json_config_key_list: list = [] @@ -46,12 +46,13 @@ class Configuration: for json_conf in json_config_key_list: if not json_conf in model_key_list: import_config.pop(json_conf, None) - print(f"\!/ The key {json_conf} is not expected, it has been removed from the system ! please remove it from configuration.json file \!/") + self.Logs.warning(f"[!] The key {json_conf} is not expected, it has been removed from the system ! please remove it from configuration.json file [!]") ConfigObject: MConfig = MConfig( **import_config ) return ConfigObject + except TypeError as te: - print(te) \ No newline at end of file + self.Logs.error(te) \ No newline at end of file diff --git a/core/irc.py b/core/irc.py index 6995e25..2ab5a67 100644 --- a/core/irc.py +++ b/core/irc.py @@ -233,9 +233,10 @@ class Irc: self.Logs.warning('--* Waiting for socket to close ...') # Reload configuration + self.Loader.Logs = self.Loader.LoggingModule.ServiceLogging().get_logger() self.Logs.debug('Reloading configuration') - self.Config = self.Loader.ConfModule.Configuration().ConfigObject - self.Base = self.Loader.BaseModule.Base(self.Config, self.Settings) + self.Config = self.Loader.ConfModule.Configuration(self.Logs).ConfigObject + self.Base = self.Loader.BaseModule.Base(self.Loader) self.Protocol = Protocol(self.Config.SERVEUR_PROTOCOL, ircInstance).Protocol self.init_service_user() @@ -1488,18 +1489,23 @@ class Irc: self.User.UID_DB.clear() # Clear User Object self.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object - self.Base.delete_logger(self.Config.LOGGING_NAME) + self.Base.garbage_collector_thread() self.Protocol.send_squit(server_id=self.Config.SERVEUR_ID, server_link=self.Config.SERVEUR_LINK, reason=final_reason) self.Logs.info(f'Redémarrage du server {dnickname}') self.loaded_classes.clear() self.Config.DEFENDER_RESTART = 1 # Set restart status to 1 saying that the service will restart self.Config.DEFENDER_INIT = 1 # set init to 1 saying that the service will be re initiated + self.Loader.ServiceLogging.remove_logger() case 'rehash': + self.Loader.ServiceLogging.remove_logger() + self.Loader.ServiceLogging = self.Loader.LoggingModule.ServiceLogging() + self.Loader.Logs = self.Loader.ServiceLogging.get_logger() + need_a_restart = ["SERVEUR_ID"] restart_flag = False - Config_bakcup = self.Config.__dict__.copy() + Config_bakcup = self.Config.to_dict().copy() serveur_id = self.Config.SERVEUR_ID service_nickname = self.Config.SERVICE_NICKNAME hsid = self.Config.HSID @@ -1519,7 +1525,7 @@ class Irc: importlib.reload(mod_definition) importlib.reload(mod_config) - self.Config = self.Loader.ConfModule.Configuration().ConfigObject + self.Config = self.Loader.ConfModule.Configuration(self.Loader.Logs).ConfigObject self.Config.HSID = hsid self.Config.DEFENDER_INIT = defender_init self.Config.DEFENDER_RESTART = defender_restart @@ -1529,7 +1535,7 @@ class Irc: importlib.reload(mod_base) conf_bkp_dict: dict = Config_bakcup - config_dict: dict = self.Config.__dict__ + config_dict: dict = self.Config.to_dict() for key, value in conf_bkp_dict.items(): if config_dict[key] != value and key != 'COLORS': @@ -1548,9 +1554,6 @@ class Irc: self.Config.SERVEUR_ID = serveur_id self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME, msg='You need to restart defender !', channel=self.Config.SERVICE_CHANLOG) - self.Loader.ServiceLogging.remove_logger() - self.Loader.ServiceLogging = self.Loader.LoggingModule.ServiceLogging() - self.Loader.Logs = self.Loader.ServiceLogging.get_logger() self.Base = self.Loader.BaseModule.Base(self.Loader) importlib.reload(mod_unreal6) diff --git a/core/loader.py b/core/loader.py index f502417..6a75451 100644 --- a/core/loader.py +++ b/core/loader.py @@ -28,7 +28,7 @@ class Loader: self.Settings: settings.Settings = settings.Settings() - self.Config: df.MConfig = self.ConfModule.Configuration().ConfigObject + self.Config: df.MConfig = self.ConfModule.Configuration(self.Logs).ConfigObject self.Base: base_module.Base = self.BaseModule.Base(self) diff --git a/core/logs.py b/core/logs.py index b100bdc..bef6ab2 100644 --- a/core/logs.py +++ b/core/logs.py @@ -1,5 +1,6 @@ import logging from os import path, makedirs, sep +from typing import Optional class ServiceLogging: @@ -25,10 +26,13 @@ class ServiceLogging: return logs_obj - def remove_logger(self) -> None: + def remove_logger(self, logger_name: Optional[str] = None) -> None: + + if logger_name is None: + logger_name = self.LOGGING_NAME # Récupérer le logger - logger = logging.getLogger(self.LOGGING_NAME) + logger = logging.getLogger(logger_name) # Retirer tous les gestionnaires du logger et les fermer for handler in logger.handlers[:]: # Utiliser une copie de la liste @@ -37,7 +41,7 @@ class ServiceLogging: handler.close() # Supprimer le logger du dictionnaire global - logging.Logger.manager.loggerDict.pop(self.LOGGING_NAME, None) + logging.Logger.manager.loggerDict.pop(logger_name, None) return None diff --git a/defender.py b/defender.py index cbe0869..9e5e200 100644 --- a/defender.py +++ b/defender.py @@ -1,7 +1,7 @@ from core import installation ############################################# -# @Version : 6 # +# @Version : 6.2 # # Requierements : # # Python3.10 or higher # # SQLAlchemy, requests, psutil # @@ -14,7 +14,6 @@ try: from core.loader import Loader from core.irc import Irc - # loader = Loader() ircInstance = Irc(Loader()) ircInstance.init_irc(ircInstance) diff --git a/mods/command/mod_command.py b/mods/command/mod_command.py index 06ff30e..75d77c6 100644 --- a/mods/command/mod_command.py +++ b/mods/command/mod_command.py @@ -1,5 +1,6 @@ from typing import Optional, TYPE_CHECKING from dataclasses import dataclass +import mods.command.utils as utils if TYPE_CHECKING: from core.irc import Irc @@ -49,6 +50,9 @@ class Command: # Add Channel object to the module (Mandatory) self.Channel = ircInstance.Channel + # Module Utils + self.mod_utils = utils + self.Irc.build_command(1, self.module_name, 'join', 'Join a channel') self.Irc.build_command(1, self.module_name, 'assign', 'Assign a user to a role or task') self.Irc.build_command(1, self.module_name, 'part', 'Leave a channel') @@ -312,94 +316,8 @@ class Command: match command: case 'automode': - # automode set nickname [+/-mode] #channel - # automode set adator +o #channel try: - option: str = str(cmd[1]).lower() - allowed_modes: list[str] = self.Loader.Settings.PROTOCTL_PREFIX # ['q','a','o','h','v'] - - match option: - case 'set': - - if len(cmd) < 5: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}") - return None - - # userObj: MUser = self.User.get_User(str(cmd[2])) - nickname = str(cmd[2]) - mode = str(cmd[3]) - chan: str = str(cmd[4]).lower() if self.Channel.is_valid_channel(cmd[4]) else None - sign = mode[0] if mode.startswith( ('+', '-')) else None - clean_mode = mode[1:] if len(mode) > 0 else None - - if sign is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You must provide the flag mode + or -") - return None - - if clean_mode not in allowed_modes: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}") - return None - - if chan is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}") - return None - - db_data: dict[str, str] = {"nickname": nickname, "channel": chan} - db_query = self.Base.db_execute_query(query="SELECT id FROM command_automode WHERE nickname = :nickname and channel = :channel", params=db_data) - db_result = db_query.fetchone() - - if db_result is not None: - if sign == '+': - db_data = {"updated_on": self.MainUtils.get_sdatetime(), "nickname": nickname, "channel": chan, "mode": mode} - db_result = self.Base.db_execute_query(query="UPDATE command_automode SET mode = :mode, updated_on = :updated_on WHERE nickname = :nickname and channel = :channel", - params=db_data) - if db_result.rowcount > 0: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} edited for {nickname} in {chan}") - elif sign == '-': - db_data = {"nickname": nickname, "channel": chan, "mode": f"+{clean_mode}"} - db_result = self.Base.db_execute_query(query="DELETE FROM command_automode WHERE nickname = :nickname and channel = :channel and mode = :mode", - params=db_data) - if db_result.rowcount > 0: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} deleted for {nickname} in {chan}") - else: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"The mode [{mode}] has not been found for {nickname} in channel {chan}") - - return None - - # Instert a new automode - if sign == '+': - db_data = {"created_on": self.MainUtils.get_sdatetime(), "updated_on": self.MainUtils.get_sdatetime(), "nickname": nickname, "channel": chan, "mode": mode} - db_query = self.Base.db_execute_query( - query="INSERT INTO command_automode (created_on, updated_on, nickname, channel, mode) VALUES (:created_on, :updated_on, :nickname, :channel, :mode)", - params=db_data - ) - - if db_query.rowcount > 0: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} applied to {nickname} in {chan}") - if self.Channel.is_user_present_in_channel(chan, self.User.get_uid(nickname)): - self.Protocol.send2socket(f":{service_id} MODE {chan} {mode} {nickname}") - else: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AUTOMODE {mode} cannot be added to {nickname} in {chan} because it doesn't exist") - - case 'list': - db_query: CursorResult = self.Base.db_execute_query("SELECT nickname, channel, mode FROM command_automode") - db_results: Sequence[Row] = db_query.fetchall() - - if not db_results: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, - msg="There is no automode to display.") - - for db_result in db_results: - db_nickname, db_channel, db_mode = db_result - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, - msg=f"Nickname: {db_nickname} | Channel: {db_channel} | Mode: {db_mode}") - - case _: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} SET [nickname] [+/-mode] [#channel]") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} LIST") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"[AUTOMODES AVAILABLE] are {' / '.join(allowed_modes)}") - + self.mod_utils.set_automode(self, cmd, fromuser) except IndexError: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} SET [nickname] [+/-mode] [#channel]") self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} LIST") @@ -409,196 +327,65 @@ class Command: case 'deopall': try: - self.Protocol.send2socket(f":{service_id} SVSMODE {fromchannel} -o") - - except IndexError as ie: - self.Logs.warning(f'_hcmd OP: {str(ie)}') + self.mod_utils.set_deopall(self, fromchannel) except Exception as err: - self.Logs.warning(f'Unknown Error: {str(err)}') + self.Logs.error(f'Unknown Error: {str(err)}') case 'devoiceall': try: - self.Protocol.send2socket(f":{service_id} SVSMODE {fromchannel} -v") - - except IndexError as e: - self.Logs.warning(f'_hcmd OP: {str(e)}') + self.mod_utils.set_devoiceall(self, fromchannel) except Exception as err: - self.Logs.warning(f'Unknown Error: {str(err)}') + self.Logs.error(f'Unknown Error: {str(err)}') case 'voiceall': try: - chan_info = self.Channel.get_channel(fromchannel) - set_mode = 'v' - mode:str = '' - users:str = '' - uids_split = [chan_info.uids[i:i + 6] for i in range(0, len(chan_info.uids), 6)] - - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +{set_mode} {dnickname}") - for uid in uids_split: - for i in range(0, len(uid)): - mode += set_mode - users += f'{self.User.get_nickname(self.MainUtils.clean_uid(uid[i]))} ' - if i == len(uid) - 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +{mode} {users}") - mode = '' - users = '' - except IndexError as e: - self.Logs.warning(f'_hcmd OP: {str(e)}') + self.mod_utils.set_mode_to_all(self, fromchannel, '+', 'v') except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'opall': try: - chan_info = self.Channel.get_channel(fromchannel) - set_mode = 'o' - mode:str = '' - users:str = '' - uids_split = [chan_info.uids[i:i + 6] for i in range(0, len(chan_info.uids), 6)] - - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +{set_mode} {dnickname}") - for uid in uids_split: - for i in range(0, len(uid)): - mode += set_mode - users += f'{self.User.get_nickname(self.MainUtils.clean_uid(uid[i]))} ' - if i == len(uid) - 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +{mode} {users}") - mode = '' - users = '' - except IndexError as e: - self.Logs.warning(f'_hcmd OP: {str(e)}') + self.mod_utils.set_mode_to_all(self, fromchannel, '+', 'o') except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'op': - # /mode #channel +o user - # .op #channel user - # /msg dnickname op #channel user - # [':adator', 'PRIVMSG', '#services', ':.o', '#services', 'dktmb'] try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} op [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{dnickname} MODE {fromchannel} +o {fromuser}") - return True - - # deop nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +o {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +o {nickname}") - + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '+o') except IndexError as e: - self.Logs.warning(f'_hcmd OP: {str(e)}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} op [#SALON] [NICKNAME]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'deop': - # /mode #channel -o user - # .deop #channel user try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} deop [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -o {fromuser}") - return True - - # deop nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -o {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -o {nickname}") - + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '-o') except IndexError as e: - self.Logs.warning(f'_hcmd DEOP: {str(e)}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} deop [#SALON] [NICKNAME]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'owner': - # /mode #channel +q user - # .owner #channel user try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} owner [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +q {fromuser}") - return True - - # owner nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +q {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +q {nickname}") + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '+q') except IndexError as e: - self.Logs.warning(f'_hcmd OWNER: {str(e)}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} owner [#SALON] [NICKNAME]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'deowner': - # /mode #channel -q user - # .deowner #channel user try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} deowner [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -q {fromuser}") - return True - - # deowner nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -q {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -q {nickname}") + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '-q') except IndexError as e: - self.Logs.warning(f'_hcmd DEOWNER: {str(e)}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} deowner [#SALON] [NICKNAME]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'protect': - # /mode #channel +a user - # .protect #channel user try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +a {fromuser}") - return True - - # deowner nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +a {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +a {nickname}") + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '+a') except IndexError as e: self.Logs.warning(f'_hcmd DEOWNER: {str(e)}') @@ -607,25 +394,8 @@ class Command: self.Logs.warning(f'Unknown Error: {str(err)}') case 'deprotect': - # /mode #channel -a user - # .deprotect #channel user try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -a {fromuser}") - return True - - # deowner nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -a {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -a {nickname}") + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '-a') except IndexError as e: self.Logs.warning(f'_hcmd DEOWNER: {str(e)}') @@ -634,125 +404,42 @@ class Command: self.Logs.warning(f'Unknown Error: {str(err)}') case 'halfop': - # /mode #channel +h user - # .halfop #channel user try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} halfop [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +h {fromuser}") - return True - - # deop nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +h {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +h {nickname}") + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '+h') except IndexError as e: - self.Logs.warning(f'_hcmd halfop: {str(e)}') - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} halfop [#SALON] [NICKNAME]") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} {command} [#SALON] [NICKNAME]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'dehalfop': - # /mode #channel -h user - # .dehalfop #channel user try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} dehalfop [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -h {fromuser}") - return True - - # dehalfop nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -h {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -h {nickname}") + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '-h') except IndexError as e: - self.Logs.warning(f'_hcmd DEHALFOP: {str(e)}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} dehalfop [#SALON] [NICKNAME]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'voice': - # /mode #channel +v user - # .voice #channel user try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} voice [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +v {fromuser}") - return True - - # voice nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +v {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +v {nickname}") - + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '+v') except IndexError as e: - self.Logs.warning(f'_hcmd VOICE: {str(e)}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} voice [#SALON] [NICKNAME]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'devoice': - # /mode #channel -v user - # .devoice #channel user try: - if fromchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} devoice [#SALON] [NICKNAME]") - return False - - if len(cmd) == 1: - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -v {fromuser}") - return True - - # dehalfop nickname - if len(cmd) == 2: - nickname = cmd[1] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -v {nickname}") - return True - - nickname = cmd[2] - self.Protocol.send2socket(f":{service_id} MODE {fromchannel} -v {nickname}") - + self.mod_utils.set_operation(self, cmd, fromchannel, fromuser, '-v') except IndexError as e: - self.Logs.warning(f'_hcmd DEVOICE: {str(e)}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} devoice [#SALON] [NICKNAME]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'ban': - # .ban #channel nickname try: - sentchannel = str(cmd[1]) if self.Channel.is_valid_channel(cmd[1]) else None - if sentchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON] [NICKNAME]") - return False - - nickname = cmd[2] - - self.Protocol.send2socket(f":{service_id} MODE {sentchannel} +b {nickname}!*@*") - self.Logs.debug(f'{fromuser} has banned {nickname} from {sentchannel}') + self.mod_utils.set_ban(self, cmd, '+', fromuser) except IndexError as e: self.Logs.warning(f'_hcmd BAN: {str(e)}') self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON] [NICKNAME]") @@ -760,97 +447,43 @@ class Command: self.Logs.warning(f'Unknown Error: {str(err)}') case 'unban': - # .unban #channel nickname try: - sentchannel = str(cmd[1]) if self.Channel.is_valid_channel(cmd[1]) else None - if sentchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} ban [#SALON] [NICKNAME]") - return False - nickname = cmd[2] - - self.Protocol.send2socket(f":{service_id} MODE {sentchannel} -b {nickname}!*@*") - self.Logs.debug(f'{fromuser} has unbanned {nickname} from {sentchannel}') - + self.mod_utils.set_ban(self, cmd, '-', fromuser) except IndexError as e: self.Logs.warning(f'_hcmd UNBAN: {str(e)}') - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} unban [#SALON] [NICKNAME]") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON] [NICKNAME]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'kick': - # .kick #channel nickname reason try: - sentchannel = str(cmd[1]) if self.Channel.is_valid_channel(cmd[1]) else None - if sentchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} ban [#SALON] [NICKNAME]") - return False - nickname = cmd[2] - final_reason = ' '.join(cmd[3:]) - - self.Protocol.send2socket(f":{service_id} KICK {sentchannel} {nickname} {final_reason}") - self.Logs.debug(f'{fromuser} has kicked {nickname} from {sentchannel} : {final_reason}') - + self.mod_utils.set_kick(self, cmd, fromuser) except IndexError as e: self.Logs.warning(f'_hcmd KICK: {str(e)}') - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} kick [#SALON] [NICKNAME] [REASON]") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON] [NICKNAME] [REASON]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'kickban': - # .kickban #channel nickname reason try: - sentchannel = str(cmd[1]) if self.Channel.is_valid_channel(cmd[1]) else None - if sentchannel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} ban [#SALON] [NICKNAME]") - return False - nickname = cmd[2] - final_reason = ' '.join(cmd[3:]) - - self.Protocol.send2socket(f":{service_id} KICK {sentchannel} {nickname} {final_reason}") - self.Protocol.send2socket(f":{service_id} MODE {sentchannel} +b {nickname}!*@*") - self.Logs.debug(f'{fromuser} has kicked and banned {nickname} from {sentchannel} : {final_reason}') - + self.mod_utils.set_kickban(self, cmd, fromuser) except IndexError as e: self.Logs.warning(f'_hcmd KICKBAN: {str(e)}') - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} kickban [#SALON] [NICKNAME] [REASON]") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON] [NICKNAME] [REASON]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'join' | 'assign': - try: - sent_channel = str(cmd[1]) if self.Channel.is_valid_channel(cmd[1]) else None - if sent_channel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{self.Config.SERVICE_PREFIX}JOIN #channel") - return False - - # self.Protocol.send2socket(f':{service_id} JOIN {sent_channel}') - self.Protocol.send_join_chan(uidornickname=dnickname,channel=sent_channel) - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" {dnickname} JOINED {sent_channel}") - self.Channel.db_query_channel('add', self.module_name, sent_channel) - + self.mod_utils.set_assign_channel_to_service(self, cmd, fromuser) except IndexError as ie: - self.Logs.error(f'{ie}') + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON]") except Exception as err: self.Logs.warning(f'Unknown Error: {str(err)}') case 'part' | 'unassign': - try: - sent_channel = str(cmd[1]) if self.Channel.is_valid_channel(cmd[1]) else None - if sent_channel is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{self.Config.SERVICE_PREFIX}PART #channel") - return False - - if sent_channel == dchanlog: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" {dnickname} CAN'T LEFT {sent_channel} AS IT IS LOG CHANNEL") - return False - - self.Protocol.send_part_chan(uidornickname=dnickname, channel=sent_channel) - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" {dnickname} LEFT {sent_channel}") - - self.Channel.db_query_channel('del', self.module_name, sent_channel) - + self.mod_utils.set_unassign_channel_to_service(self, cmd, fromuser) except IndexError as ie: self.Logs.error(f'{ie}') except Exception as err: diff --git a/mods/command/utils.py b/mods/command/utils.py new file mode 100644 index 0000000..c01e70b --- /dev/null +++ b/mods/command/utils.py @@ -0,0 +1,237 @@ +from typing import TYPE_CHECKING, Literal, Optional + +if TYPE_CHECKING: + from mods.command.mod_command import Command + + +def set_automode(uplink: 'Command', cmd: list[str], client: str) -> None: + + command: str = str(cmd[0]).lower() + option: str = str(cmd[1]).lower() + allowed_modes: list[str] = uplink.Loader.Settings.PROTOCTL_PREFIX # ['q','a','o','h','v'] + dnickname = uplink.Config.SERVICE_NICKNAME + service_id = uplink.Config.SERVICE_ID + fromuser = client + + match option: + case 'set': + if len(cmd) < 5: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]") + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}") + return None + + nickname = str(cmd[2]) + mode = str(cmd[3]) + chan: str = str(cmd[4]).lower() if uplink.Channel.is_valid_channel(cmd[4]) else None + sign = mode[0] if mode.startswith( ('+', '-')) else None + clean_mode = mode[1:] if len(mode) > 0 else None + + if sign is None: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You must provide the flag mode + or -") + return None + + if clean_mode not in allowed_modes: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}") + return None + + if chan is None: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}") + return None + + db_data: dict[str, str] = {"nickname": nickname, "channel": chan} + db_query = uplink.Base.db_execute_query(query="SELECT id FROM command_automode WHERE nickname = :nickname and channel = :channel", params=db_data) + db_result = db_query.fetchone() + + if db_result is not None: + if sign == '+': + db_data = {"updated_on": uplink.MainUtils.get_sdatetime(), "nickname": nickname, "channel": chan, "mode": mode} + db_result = uplink.Base.db_execute_query(query="UPDATE command_automode SET mode = :mode, updated_on = :updated_on WHERE nickname = :nickname and channel = :channel", + params=db_data) + if db_result.rowcount > 0: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} edited for {nickname} in {chan}") + elif sign == '-': + db_data = {"nickname": nickname, "channel": chan, "mode": f"+{clean_mode}"} + db_result = uplink.Base.db_execute_query(query="DELETE FROM command_automode WHERE nickname = :nickname and channel = :channel and mode = :mode", + params=db_data) + if db_result.rowcount > 0: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} deleted for {nickname} in {chan}") + else: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"The mode [{mode}] has not been found for {nickname} in channel {chan}") + + return None + + # Instert a new automode + if sign == '+': + db_data = {"created_on": uplink.MainUtils.get_sdatetime(), "updated_on": uplink.MainUtils.get_sdatetime(), "nickname": nickname, "channel": chan, "mode": mode} + db_query = uplink.Base.db_execute_query( + query="INSERT INTO command_automode (created_on, updated_on, nickname, channel, mode) VALUES (:created_on, :updated_on, :nickname, :channel, :mode)", + params=db_data + ) + + if db_query.rowcount > 0: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} applied to {nickname} in {chan}") + if uplink.Channel.is_user_present_in_channel(chan, uplink.User.get_uid(nickname)): + uplink.Protocol.send2socket(f":{service_id} MODE {chan} {mode} {nickname}") + else: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AUTOMODE {mode} cannot be added to {nickname} in {chan} because it doesn't exist") + + case 'list': + db_query = uplink.Base.db_execute_query("SELECT nickname, channel, mode FROM command_automode") + db_results = db_query.fetchall() + + if not db_results: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, + msg="There is no automode to display.") + + for db_result in db_results: + db_nickname, db_channel, db_mode = db_result + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, + msg=f"Nickname: {db_nickname} | Channel: {db_channel} | Mode: {db_mode}") + + case _: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} SET [nickname] [+/-mode] [#channel]") + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} LIST") + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"[AUTOMODES AVAILABLE] are {' / '.join(allowed_modes)}") + +def set_deopall(uplink: 'Command', channel_name: str) -> None: + + service_id = uplink.Config.SERVICE_ID + uplink.Protocol.send2socket(f":{service_id} SVSMODE {channel_name} -o") + return None + +def set_devoiceall(uplink: 'Command', channel_name: str) -> None: + + service_id = uplink.Config.SERVICE_ID + uplink.Protocol.send2socket(f":{service_id} SVSMODE {channel_name} -v") + return None + +def set_mode_to_all(uplink: 'Command', channel_name: str, action: Literal['+', '-'], pmode: str) -> None: + + chan_info = uplink.Channel.get_channel(channel_name) + service_id = uplink.Config.SERVICE_ID + dnickname = uplink.Config.SERVICE_NICKNAME + set_mode = pmode + mode:str = '' + users:str = '' + uids_split = [chan_info.uids[i:i + 6] for i in range(0, len(chan_info.uids), 6)] + + uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {action}{set_mode} {dnickname}") + for uid in uids_split: + for i in range(0, len(uid)): + mode += set_mode + users += f'{uplink.User.get_nickname(uplink.MainUtils.clean_uid(uid[i]))} ' + if i == len(uid) - 1: + uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {action}{mode} {users}") + mode = '' + users = '' + +def set_operation(uplink: 'Command', cmd: list[str], channel_name: Optional[str], client: str, mode: str) -> None: + + dnickname = uplink.Config.SERVICE_NICKNAME + service_id = uplink.Config.SERVICE_ID + if channel_name is None: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=client, msg=f" Right command : /msg {dnickname} {mode} [#SALON] [NICKNAME]") + return False + + if len(cmd) == 1: + uplink.Protocol.send2socket(f":{dnickname} MODE {channel_name} {mode} {client}") + return None + + # deop nickname + if len(cmd) == 2: + nickname = cmd[1] + uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}") + return None + + nickname = cmd[2] + uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}") + return None + +def set_ban(uplink: 'Command', cmd: list[str], action: Literal['+', '-'], client: str) -> None: + + command = str(cmd[0]) + dnickname = uplink.Config.SERVICE_NICKNAME + service_id = uplink.Config.SERVICE_ID + sentchannel = str(cmd[1]) if uplink.Channel.is_valid_channel(cmd[1]) else None + + if sentchannel is None: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=client, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON] [NICKNAME]") + return None + + nickname = cmd[2] + + uplink.Protocol.send2socket(f":{service_id} MODE {sentchannel} {action}b {nickname}!*@*") + uplink.Logs.debug(f'{client} has banned {nickname} from {sentchannel}') + return None + +def set_kick(uplink: 'Command', cmd: list[str], client: str) -> None: + + command = str(cmd[0]) + dnickname = uplink.Config.SERVICE_NICKNAME + service_id = uplink.Config.SERVICE_ID + + sentchannel = str(cmd[1]) if uplink.Channel.is_valid_channel(cmd[1]) else None + if sentchannel is None: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=client, msg=f" Right command : /msg {dnickname} {command} [#SALON] [NICKNAME]") + return False + + nickname = cmd[2] + final_reason = ' '.join(cmd[3:]) + + uplink.Protocol.send2socket(f":{service_id} KICK {sentchannel} {nickname} {final_reason}") + uplink.Logs.debug(f'{client} has kicked {nickname} from {sentchannel} : {final_reason}') + return None + +def set_kickban(uplink: 'Command', cmd: list[str], client: str) -> None: + + command = str(cmd[0]) + dnickname = uplink.Config.SERVICE_NICKNAME + service_id = uplink.Config.SERVICE_ID + + sentchannel = str(cmd[1]) if uplink.Channel.is_valid_channel(cmd[1]) else None + if sentchannel is None: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=client, msg=f" Right command : /msg {dnickname} {command} [#SALON] [NICKNAME]") + return False + nickname = cmd[2] + final_reason = ' '.join(cmd[3:]) + + uplink.Protocol.send2socket(f":{service_id} KICK {sentchannel} {nickname} {final_reason}") + uplink.Protocol.send2socket(f":{service_id} MODE {sentchannel} +b {nickname}!*@*") + uplink.Logs.debug(f'{client} has kicked and banned {nickname} from {sentchannel} : {final_reason}') + +def set_assign_channel_to_service(uplink: 'Command', cmd: list[str], client: str) -> None: + + command = str(cmd[0]) + dnickname = uplink.Config.SERVICE_NICKNAME + sent_channel = str(cmd[1]) if uplink.Channel.is_valid_channel(cmd[1]) else None + if sent_channel is None: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=client, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON]") + return None + + # self.Protocol.send2socket(f':{service_id} JOIN {sent_channel}') + uplink.Protocol.send_join_chan(uidornickname=dnickname,channel=sent_channel) + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=client, msg=f" Has joined {sent_channel}") + uplink.Channel.db_query_channel('add', uplink.module_name, sent_channel) + + return None + +def set_unassign_channel_to_service(uplink: 'Command', cmd: list[str], client: str) -> None: + + command = str(cmd[0]) + dnickname = uplink.Config.SERVICE_NICKNAME + dchanlog = uplink.Config.SERVICE_CHANLOG + + sent_channel = str(cmd[1]) if uplink.Channel.is_valid_channel(cmd[1]) else None + if sent_channel is None: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=client, msg=f" Right command : /msg {dnickname} {command.upper()} [#SALON]") + return None + + if sent_channel == dchanlog: + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=client, msg=f"[!] CAN'T LEFT {sent_channel} AS IT IS LOG CHANNEL [!]") + return None + + uplink.Protocol.send_part_chan(uidornickname=dnickname, channel=sent_channel) + uplink.Protocol.send_notice(nick_from=dnickname, nick_to=client, msg=f" Has left {sent_channel}") + + uplink.Channel.db_query_channel('del', uplink.module_name, sent_channel) + return None \ No newline at end of file diff --git a/mods/jsonrpc/mod_jsonrpc.py b/mods/jsonrpc/mod_jsonrpc.py index e7181cd..1f12134 100644 --- a/mods/jsonrpc/mod_jsonrpc.py +++ b/mods/jsonrpc/mod_jsonrpc.py @@ -1,4 +1,6 @@ +import gc import logging +from time import sleep from typing import TYPE_CHECKING from dataclasses import dataclass from unrealircd_rpc_py.Live import LiveWebsocket @@ -44,6 +46,7 @@ class Jsonrpc(): # Create module commands (Mandatory) self.Irc.build_command(1, self.module_name, 'jsonrpc', 'Activate the JSON RPC Live connection [ON|OFF]') self.Irc.build_command(1, self.module_name, 'jruser', 'Get Information about a user using JSON RPC') + self.Irc.build_command(1, self.module_name, 'jrinstances', 'Get number of instances') # Init the module self.__init_module() @@ -51,10 +54,13 @@ class Jsonrpc(): # Log the module self.Logs.debug(f'Module {self.module_name} loaded ...') + def compter_instances(self, cls) -> int: + return sum(1 for obj in gc.get_objects() if isinstance(obj, cls)) + def __init_module(self) -> None: logging.getLogger('websockets').setLevel(logging.WARNING) - logging.getLogger('unrealircd-rpc-py').setLevel(logging.CRITICAL) + logging.getLogger('unrealircd-rpc-py').setLevel(logging.DEBUG) # Create you own tables (Mandatory) # self.__create_tables() @@ -68,11 +74,12 @@ class Jsonrpc(): username=self.Config.JSONRPC_USER, password=self.Config.JSONRPC_PASSWORD, callback_object_instance=self, - callback_method_or_function_name='callback_sent_to_irc' + callback_method_or_function_name='callback_sent_to_irc', + debug_level=10 ) if self.UnrealIrcdRpcLive.get_error.code != 0: - self.Logs.error(self.UnrealIrcdRpcLive.get_error.code, self.UnrealIrcdRpcLive.get_error.message) + self.Logs.error(f"{self.UnrealIrcdRpcLive.get_error.message} ({self.UnrealIrcdRpcLive.get_error.code})") self.Protocol.send_priv_msg( nick_from=self.Config.SERVICE_NICKNAME, msg=f"[{self.Config.COLORS.red}ERROR{self.Config.COLORS.nogc}] {self.UnrealIrcdRpcLive.get_error.message}", @@ -88,7 +95,8 @@ class Jsonrpc(): ) if self.Rpc.get_error.code != 0: - self.Logs.error(self.Rpc.get_error.code, self.Rpc.get_error.message) + + self.Logs.error(f"{self.Rpc.get_error.message} ({self.Rpc.get_error.code})") self.Protocol.send_priv_msg( nick_from=self.Config.SERVICE_NICKNAME, msg=f"[{self.Config.COLORS.red}ERROR{self.Config.COLORS.nogc}] {self.Rpc.get_error.message}", @@ -132,10 +140,14 @@ class Jsonrpc(): red = self.Config.COLORS.red if hasattr(response, 'result'): + if response.method == 'log.unsubscribe': + print(f">>>>>>>>>>>>>>>>> {response}") + return None + if isinstance(response.result, bool) and response.result: self.Protocol.send_priv_msg( nick_from=self.Config.SERVICE_NICKNAME, - msg=f"[{bold}{green}JSONRPC{nogc}{bold}] Event activated", + msg=f"[{bold}{green}JSONRPC{nogc}{bold}] JSONRPC Event activated on {self.Config.JSONRPC_URL}", channel=dchanlog) return None @@ -155,15 +167,37 @@ class Jsonrpc(): def thread_start_jsonrpc(self): + if not hasattr(self, 'UnrealIrcdRpcLive'): + return None + if self.UnrealIrcdRpcLive.get_error.code == 0: - self.UnrealIrcdRpcLive.subscribe(["all"]) self.subscribed = True + self.UnrealIrcdRpcLive.subscribe(["all"]) else: self.Protocol.send_priv_msg( nick_from=self.Config.SERVICE_NICKNAME, msg=f"[{self.Config.COLORS.red}ERROR{self.Config.COLORS.nogc}] {self.UnrealIrcdRpcLive.get_error.message}", channel=self.Config.SERVICE_CHANLOG ) + + if not self.subscribed: + self.Protocol.send_priv_msg( + nick_from=self.Config.SERVICE_NICKNAME, + msg=f"[{self.Config.COLORS.green}JSONRPC{self.Config.COLORS.nogc}] Stream is OFF", + channel=self.Config.SERVICE_CHANLOG + ) + else: + self.Protocol.send_priv_msg( + nick_from=self.Config.SERVICE_NICKNAME, + msg=f"[{self.Config.COLORS.red}JSONRPC{self.Config.COLORS.nogc}] Stream has crashed!", + channel=self.Config.SERVICE_CHANLOG + ) + + def thread_stop_jsonrpc(self) -> None: + self.subscribed = False + self.UnrealIrcdRpcLive.unsubscribe() + self.Logs.debug("[JSONRPC UNLOAD] Unsubscribe from the stream!") + self.__update_configuration('jsonrpc', 0) def __load_module_configuration(self) -> None: """### Load Module Configuration @@ -189,9 +223,31 @@ class Jsonrpc(): """ self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value) + def restart_jsonrpc(self) -> None: + self.Logs.debug("[JSONRPC THREAD] Rescue the connection of JSONRPC") + you_can_run_jsonrpc = False + limit = 10 + inc = 0 + + for thread in self.Base.running_threads: + if thread.name == 'thread_start_jsonrpc': + while thread.is_alive(): + sleep(2) + inc += 1 + if inc > limit: + break + + if not thread.is_alive(): + you_can_run_jsonrpc = True + break + + if you_can_run_jsonrpc: + self.Logs.debug("[JSONRPC THREAD] Success re run jsonrpc") + self.Base.create_thread(self.thread_start_jsonrpc, run_once=True) + def unload(self) -> None: - if self.UnrealIrcdRpcLive.get_error.code != -1: - self.UnrealIrcdRpcLive.unsubscribe() + self.Base.create_thread(func=self.thread_stop_jsonrpc, run_once=True) + self.Logs.debug(f"Unloading {self.module_name}") return None def cmd(self, data:list) -> None: @@ -243,7 +299,8 @@ class Jsonrpc(): self.__update_configuration('jsonrpc', 1) case 'off': - self.UnrealIrcdRpcLive.unsubscribe() + self.subscribed = False + self.Base.create_thread(func=self.thread_stop_jsonrpc, run_once=True) self.__update_configuration('jsonrpc', 0) except IndexError as ie: @@ -300,22 +357,11 @@ class Jsonrpc(): except IndexError as ie: self.Logs.error(ie) - case 'ia': + case 'jrinstances': try: - - self.Base.create_thread(self.thread_ask_ia, ('',)) - - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" This is a notice to the sender ...") - self.Protocol.send_priv_msg(nick_from=dnickname, msg="This is private message to the sender ...", nick_to=fromuser) - - if not fromchannel is None: - self.Protocol.send_priv_msg(nick_from=dnickname, msg="This is channel message to the sender ...", channel=fromchannel) - - # How to update your module configuration - self.__update_configuration('param_exemple2', 7) - - # Log if you want the result - self.Logs.debug(f"Test logs ready") - + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"GC Collect: {gc.collect()}") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance LiveWebsock: {self.compter_instances(LiveWebsocket)}") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance Loader: {self.compter_instances(Loader)}") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre de toute les instances: {len(gc.get_objects())}") except Exception as err: self.Logs.error(f"Unknown Error: {err}") \ No newline at end of file diff --git a/version.json b/version.json index a4a7194..085f338 100644 --- a/version.json +++ b/version.json @@ -3,7 +3,7 @@ "requests": "2.32.3", "psutil": "6.0.0", - "unrealircd_rpc_py": "2.0.1", + "unrealircd_rpc_py": "2.0.2", "sqlalchemy": "2.0.35", "faker": "30.1.0" } \ No newline at end of file