update rest of modules to fit requirements

This commit is contained in:
adator
2025-11-01 22:00:08 +01:00
parent 2fbe75b83e
commit 769ab8b632
9 changed files with 187 additions and 351 deletions

View File

@@ -31,6 +31,9 @@ class IModule(ABC):
# Add Global Configuration to the module (Mandatory)
self.Config = uplink.Config
# Add Settings to the module (Mandatory)
self.Settings = uplink.Settings
# Add Base object to the module (Mandatory)
self.Base = uplink.Base
@@ -46,6 +49,9 @@ class IModule(ABC):
# Add Client object to the module (Mandatory)
self.Client = uplink.Client
# Add Admin object to the module (Mandatory)
self.Admin = uplink.Admin
# Add Channel object to the module (Mandatory)
self.Channel = uplink.Channel
@@ -58,9 +64,6 @@ class IModule(ABC):
# Inspect child classes
self.inspect_class()
# Init the ModConfig model object.
self.ModConfig:ModConfModel = ModConfModel()
self.create_tables()
# Sync the configuration with core configuration (Mandatory)

View File

@@ -78,6 +78,7 @@ class Module:
msg=f"[{red}MODULE ERROR{nogc}] Module {module_name} is facing issues ! {attr}",
channel=self.__Config.SERVICE_CHANLOG
)
self.__Logs.error(msg=attr, exc_info=True)
return False
if not hasattr(create_instance_of_the_class, 'cmd'):

View File

@@ -1,48 +1,40 @@
from typing import TYPE_CHECKING
from dataclasses import dataclass
from core.classes.interfaces.imodule import IModule
import mods.defender.schemas as schemas
import mods.defender.utils as utils
import mods.defender.threads as thds
from core.utils import tr
if TYPE_CHECKING:
from core.irc import Irc
class Defender(IModule):
class Defender:
@dataclass
class ModConfModel(schemas.ModConfModel):
...
def __init__(self, irc_instance: 'Irc') -> None:
def create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
Returns:
None: Aucun retour n'es attendu
"""
# Add Irc Object to the module (Mandatory)
self.Irc = irc_instance
# table_autoop = '''CREATE TABLE IF NOT EXISTS defender_autoop (
# id INTEGER PRIMARY KEY AUTOINCREMENT,
# datetime TEXT,
# nickname TEXT,
# channel TEXT
# )
# '''
# Add Loader Object to the module (Mandatory)
self.Loader = irc_instance.Loader
# self.Base.db_execute_query(table_autoop)
# self.Base.db_execute_query(table_config)
# self.Base.db_execute_query(table_trusted)
return None
# Add server protocol Object to the module (Mandatory)
self.Protocol = irc_instance.Protocol
# Add Global Configuration to the module (Mandatory)
self.Config = irc_instance.Config
# Add Base object to the module (Mandatory)
self.Base = irc_instance.Base
# Add logs object to the module (Mandatory)
self.Logs = irc_instance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = irc_instance.User
# Add Channel object to the module (Mandatory)
self.Channel = irc_instance.Channel
# Add Settings object to save objects when reloading modules (Mandatory)
self.Settings = irc_instance.Settings
# Add Reputation object to the module (Optional)
self.Reputation = irc_instance.Reputation
def load(self):
# Add module schemas
self.Schemas = schemas
@@ -50,6 +42,9 @@ class Defender:
# Add utils functions
self.Utils = utils
# Variable qui va contenir les options de configuration du module Defender
self.ModConfig: schemas.ModConfModel = self.ModConfModel()
# Create module commands (Mandatory)
self.Irc.build_command(0, self.module_name, 'code', 'Display the code or key for access')
self.Irc.build_command(1, self.module_name, 'info', 'Provide information about the channel or server')
@@ -62,38 +57,16 @@ class Defender:
self.Irc.build_command(3, self.module_name, 'show_reputation', 'Display reputation information')
self.Irc.build_command(3, self.module_name, 'sentinel', 'Monitor and guard the channel or server')
# Init the module (Mandatory)
self.__init_module()
# Log the module
self.Logs.debug(f'-- Module {self.module_name} V2 loaded ...')
def __init_module(self) -> None:
# Create you own tables if needed (Mandatory)
self.__create_tables()
# Load module configuration (Mandatory)
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
self.timeout = self.Config.API_TIMEOUT
# Listes qui vont contenir les ip a scanner avec les différentes API
self.Schemas.DB_ABUSEIPDB_USERS = []
self.Schemas.DB_FREEIPAPI_USERS = []
self.Schemas.DB_CLOUDFILT_USERS = []
self.Schemas.DB_PSUTIL_USERS = []
self.Schemas.DB_LOCALSCAN_USERS = []
self.Schemas.DB_ABUSEIPDB_USERS = self.Schemas.DB_FREEIPAPI_USERS = self.Schemas.DB_CLOUDFILT_USERS = []
self.Schemas.DB_PSUTIL_USERS = self.Schemas.DB_LOCALSCAN_USERS = []
# Variables qui indique que les threads sont en cours d'éxecutions
self.abuseipdb_isRunning:bool = True
self.freeipapi_isRunning:bool = True
self.cloudfilt_isRunning:bool = True
self.psutil_isRunning:bool = True
self.localscan_isRunning:bool = True
self.reputationTimer_isRunning:bool = True
self.autolimit_isRunning: bool = True
self.abuseipdb_isRunning = self.freeipapi_isRunning= self.cloudfilt_isRunning = True
self.psutil_isRunning = self.localscan_isRunning = self.reputationTimer_isRunning = True
self.autolimit_isRunning = True
# Variable qui va contenir les users
self.flood_system = {}
@@ -121,46 +94,6 @@ class Defender:
self.Protocol.send_sjoin(self.Config.SALON_JAIL)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.SALON_JAIL} +o {self.Config.SERVICE_NICKNAME}")
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
Returns:
None: Aucun retour n'es attendu
"""
# table_autoop = '''CREATE TABLE IF NOT EXISTS defender_autoop (
# id INTEGER PRIMARY KEY AUTOINCREMENT,
# datetime TEXT,
# nickname TEXT,
# channel TEXT
# )
# '''
# self.Base.db_execute_query(table_autoop)
# self.Base.db_execute_query(table_config)
# self.Base.db_execute_query(table_trusted)
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
# Variable qui va contenir les options de configuration du module Defender
self.ModConfig = self.Schemas.ModConfModel()
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
return None
def __update_configuration(self, param_key: str, param_value: str):
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def __onload(self):
abuseipdb = self.Settings.get_cache('ABUSEIPDB')
@@ -431,7 +364,7 @@ class Defender:
match arg:
case 'on':
if self.ModConfig.autolimit == 0:
self.__update_configuration('autolimit', 1)
self.update_configuration('autolimit', 1)
self.autolimit_isRunning = True
self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, ))
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Activated", channel=self.Config.SERVICE_CHANLOG)
@@ -440,7 +373,7 @@ class Defender:
case 'off':
if self.ModConfig.autolimit == 1:
self.__update_configuration('autolimit', 0)
self.update_configuration('autolimit', 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Deactivated", channel=self.Config.SERVICE_CHANLOG)
else:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already Deactivated", channel=self.Config.SERVICE_CHANLOG)
@@ -449,8 +382,8 @@ class Defender:
amount = int(cmd[2])
interval = int(cmd[3])
self.__update_configuration('autolimit_amount', amount)
self.__update_configuration('autolimit_interval', interval)
self.update_configuration('autolimit_amount', amount)
self.update_configuration('autolimit_interval', interval)
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Amount set to ({amount}) | Interval set to ({interval})",
@@ -489,7 +422,7 @@ class Defender:
return False
# self.update_db_configuration('reputation', 1)
self.__update_configuration(key, 1)
self.update_configuration(key, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
@@ -515,7 +448,7 @@ class Defender:
)
return False
self.__update_configuration(key, 0)
self.update_configuration(key, 0)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -604,7 +537,7 @@ class Defender:
return False
# self.update_db_configuration(key, 1)
self.__update_configuration(key, 1)
self.update_configuration(key, 1)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -622,7 +555,7 @@ class Defender:
return False
# self.update_db_configuration(key, 0)
self.__update_configuration(key, 0)
self.update_configuration(key, 0)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -635,7 +568,7 @@ class Defender:
key = 'reputation_seuil'
# self.update_db_configuration(key, reputation_seuil)
self.__update_configuration(key, reputation_seuil)
self.update_configuration(key, reputation_seuil)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -647,7 +580,7 @@ class Defender:
case 'timer':
reputation_timer = int(cmd[3])
key = 'reputation_timer'
self.__update_configuration(key, reputation_timer)
self.update_configuration(key, reputation_timer)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -659,7 +592,7 @@ class Defender:
case 'score_after_release':
reputation_score_after_release = int(cmd[3])
key = 'reputation_score_after_release'
self.__update_configuration(key, reputation_score_after_release)
self.update_configuration(key, reputation_score_after_release)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -671,7 +604,7 @@ class Defender:
case 'security_group':
reputation_sg = int(cmd[3])
key = 'reputation_sg'
self.__update_configuration(key, reputation_sg)
self.update_configuration(key, reputation_sg)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -733,7 +666,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -741,7 +674,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -751,7 +684,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -759,7 +692,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -769,7 +702,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -777,7 +710,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -787,7 +720,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -795,7 +728,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -805,7 +738,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -813,7 +746,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -846,7 +779,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Already activated", channel=dchanlog)
return False
self.__update_configuration(key, 1)
self.update_configuration(key, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
@@ -855,7 +788,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}FLOOD{self.Config.COLORS.black} ] : Already Deactivated", channel=dchanlog)
return False
self.__update_configuration(key, 0)
self.update_configuration(key, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -867,7 +800,7 @@ class Defender:
case 'flood_message':
key = 'flood_message'
set_value = int(cmd[3])
self.__update_configuration(key, set_value)
self.update_configuration(key, set_value)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood message set to {set_value} by {fromuser}",
@@ -876,7 +809,7 @@ class Defender:
case 'flood_time':
key = 'flood_time'
set_value = int(cmd[3])
self.__update_configuration(key, set_value)
self.update_configuration(key, set_value)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood time set to {set_value} by {fromuser}",
@@ -885,7 +818,7 @@ class Defender:
case 'flood_timer':
key = 'flood_timer'
set_value = int(cmd[3])
self.__update_configuration(key, set_value)
self.update_configuration(key, set_value)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood timer set to {set_value} by {fromuser}",
@@ -922,6 +855,7 @@ class Defender:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_message ==> {self.ModConfig.flood_message}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_time ==> {self.ModConfig.flood_time}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_timer ==> {self.ModConfig.flood_timer}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.flood == 1 else color_red}Sentinel{nogc}] ==> {self.ModConfig.sentinel}')
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")
@@ -963,14 +897,27 @@ class Defender:
channel_to_dont_quit = [self.Config.SALON_JAIL, self.Config.SERVICE_CHANLOG]
if activation == 'on':
result = self.Base.db_execute_query(f"SELECT distinct channel_name FROM {self.Config.TABLE_CHANNEL}")
channels = result.fetchall()
channel_in_db = [channel[0] for channel in channels]
channel_to_dont_quit.extend(channel_in_db)
self.update_configuration('sentinel', 1)
for chan in self.Channel.UID_CHANNEL_DB:
if chan.name not in channel_to_dont_quit:
self.Protocol.send_join_chan(uidornickname=dnickname, channel=chan.name)
self.Protocol.send_priv_msg(dnickname, f"Sentinel mode activated on {channel}", channel=chan.name)
return None
if activation == 'off':
result = self.Base.db_execute_query(f"SELECT distinct channel_name FROM {self.Config.TABLE_CHANNEL}")
channels = result.fetchall()
channel_in_db = [channel[0] for channel in channels]
channel_to_dont_quit.extend(channel_in_db)
self.update_configuration('sentinel', 0)
for chan in self.Channel.UID_CHANNEL_DB:
if chan.name not in channel_to_dont_quit:
self.Protocol.send_part_chan(uidornickname=dnickname, channel=chan.name)
self.Protocol.send_priv_msg(dnickname, f"Sentinel mode deactivated on {channel}", channel=chan.name)
self.join_saved_channels()
return None

View File

@@ -20,6 +20,7 @@ class ModConfModel(MainModel):
autolimit: int = 0
autolimit_amount: int = 3
autolimit_interval: int = 3
sentinel: int = 0
@dataclass
class FloodUser(MainModel):

View File

@@ -62,6 +62,14 @@ def handle_on_mode(uplink: 'Defender', srvmsg: list[str]):
def handle_on_privmsg(uplink: 'Defender', srvmsg: list[str]):
# ['@mtag....',':python', 'PRIVMSG', '#defender', ':zefzefzregreg', 'regg', 'aerg']
sender = srvmsg[1].replace(':','')
channel = srvmsg[3]
message = srvmsg[4:]
message[0] = message[0].replace(':', '')
if uplink.ModConfig.sentinel == 1 and srvmsg[3] != uplink.Config.SERVICE_CHANLOG:
uplink.Protocol.send_priv_msg(uplink.Config.SERVICE_NICKNAME, f"{sender} say on {channel}: {' '.join(message)}", uplink.Config.SERVICE_CHANLOG)
action_on_flood(uplink, srvmsg)
return None
@@ -385,7 +393,7 @@ def action_apply_reputation_santions(uplink: 'Defender') -> None:
salon_jail = gconfig.SALON_JAIL
uid_to_clean = []
if reputation_flag == 0 or reputation_timer == 0:
if reputation_flag == 0 or reputation_timer == 0 or not irc.Reputation.UID_REPUTATION_DB:
return None
for user in irc.Reputation.UID_REPUTATION_DB:

View File

@@ -1,18 +1,13 @@
import logging
import asyncio
from unrealircd_rpc_py.objects.Definition import LiveRPCResult
from core.classes.interfaces.imodule import IModule
import mods.jsonrpc.utils as utils
import mods.jsonrpc.threads as thds
from time import sleep
from typing import TYPE_CHECKING
from dataclasses import dataclass
from unrealircd_rpc_py.ConnectionFactory import ConnectionFactory
from unrealircd_rpc_py.LiveConnectionFactory import LiveConnectionFactory
if TYPE_CHECKING:
from core.irc import Irc
class Jsonrpc():
class Jsonrpc(IModule):
@dataclass
class ModConfModel:
@@ -20,121 +15,6 @@ class Jsonrpc():
"""
jsonrpc: int = 0
def __init__(self, ircInstance: 'Irc') -> None:
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module (Mandatory)
self.Irc = ircInstance
# Add Protocol to the module (Mandatory)
self.Protocol = ircInstance.Protocol
# Add Global Configuration to the module (Mandatory)
self.Config = ircInstance.Config
# Add Base object to the module (Mandatory)
self.Base = ircInstance.Base
# Add Main Utils (Mandatory)
self.MainUtils = ircInstance.Utils
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User
# Add Channel object to the module (Mandatory)
self.Channel = ircInstance.Channel
# Is RPC Active?
self.is_streaming = False
# Module Utils
self.Utils = utils
# Module threads
self.Threads = thds
# Run Garbage collector.
self.Base.create_timer(10, self.MainUtils.run_python_garbage_collector)
# Create module commands (Mandatory)
self.Irc.build_command(1, self.module_name, 'jsonrpc', 'Activate the JSON RPC Live connection [ON|OFF]')
self.Irc.build_command(1, self.module_name, 'jruser', 'Get Information about a user using JSON RPC')
self.Irc.build_command(1, self.module_name, 'jrinstances', 'Get number of instances')
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.module_name} loaded ...')
def __init_module(self) -> None:
logging.getLogger('websockets').setLevel(logging.WARNING)
logging.getLogger('unrealircd-rpc-py').setLevel(logging.CRITICAL)
logging.getLogger('unrealircd-liverpc-py').setLevel(logging.CRITICAL)
# Create you own tables (Mandatory)
# self.__create_tables()
# Load module configuration and sync with core one (Mandatory)
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
try:
self.Rpc = ConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
self.LiveRpc = LiveConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
sync_unixsocket = {'path_to_socket_file': self.Config.JSONRPC_PATH_TO_SOCKET_FILE}
sync_http = {'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD}
live_unixsocket = {'path_to_socket_file': self.Config.JSONRPC_PATH_TO_SOCKET_FILE,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'}
live_http = {'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'}
sync_param = sync_unixsocket if self.Config.JSONRPC_METHOD == 'unixsocket' else sync_http
live_param = live_unixsocket if self.Config.JSONRPC_METHOD == 'unixsocket' else live_http
self.Rpc.setup(sync_param)
self.LiveRpc.setup(live_param)
if self.ModConfig.jsonrpc == 1:
self.Base.create_thread(func=self.Threads.thread_subscribe, func_args=(self, ), run_once=True)
return None
except Exception as err:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}JSONRPC ERROR{self.Config.COLORS.nogc}] {err.__str__()}",
channel=self.Config.SERVICE_CHANLOG
)
self.Logs.error(f"JSONRPC ERROR: {err.__str__()}")
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
Returns:
None: Aucun retour n'es attendu
"""
table_logs = '''CREATE TABLE IF NOT EXISTS test_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
self.Base.db_execute_query(table_logs)
return None
def callback_sent_to_irc(self, response: LiveRPCResult) -> None:
dnickname = self.Config.SERVICE_NICKNAME
@@ -169,29 +49,63 @@ class Jsonrpc():
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
try:
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel(jsonrpc=0)
def create_tables(self) -> None:
return None
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
def load(self) -> None:
logging.getLogger('websockets').setLevel(logging.WARNING)
logging.getLogger('unrealircd-rpc-py').setLevel(logging.CRITICAL)
logging.getLogger('unrealircd-liverpc-py').setLevel(logging.CRITICAL)
self.ModConfig = self.ModConfModel(jsonrpc=0)
# Is RPC Active?
self.is_streaming = False
# Module Utils
self.Utils = utils
# Module threads
self.Threads = thds
# Run Garbage collector.
self.Base.create_timer(10, self.MainUtils.run_python_garbage_collector)
# Create module commands (Mandatory)
self.Irc.build_command(1, self.module_name, 'jsonrpc', 'Activate the JSON RPC Live connection [ON|OFF]')
self.Irc.build_command(1, self.module_name, 'jruser', 'Get Information about a user using JSON RPC')
self.Irc.build_command(1, self.module_name, 'jrinstances', 'Get number of instances')
try:
self.Rpc = ConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
self.LiveRpc = LiveConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
sync_unixsocket = {'path_to_socket_file': self.Config.JSONRPC_PATH_TO_SOCKET_FILE}
sync_http = {'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD}
live_unixsocket = {'path_to_socket_file': self.Config.JSONRPC_PATH_TO_SOCKET_FILE,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'}
live_http = {'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'}
sync_param = sync_unixsocket if self.Config.JSONRPC_METHOD == 'unixsocket' else sync_http
live_param = live_unixsocket if self.Config.JSONRPC_METHOD == 'unixsocket' else live_http
self.Rpc.setup(sync_param)
self.LiveRpc.setup(live_param)
if self.ModConfig.jsonrpc == 1:
self.Base.create_thread(func=self.Threads.thread_subscribe, func_args=(self, ), run_once=True)
return None
except TypeError as te:
self.Logs.critical(te)
def update_configuration(self, param_key: str, param_value: str) -> None:
"""Update the local and core configuration
Args:
param_key (str): The parameter key
param_value (str): The parameter value
"""
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
except Exception as err:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}JSONRPC ERROR{self.Config.COLORS.nogc}] {err.__str__()}",
channel=self.Config.SERVICE_CHANLOG
)
self.Logs.error(f"JSONRPC ERROR: {err.__str__()}")
def unload(self) -> None:
if self.is_streaming:

View File

@@ -45,6 +45,7 @@ class Test(IModule):
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel(param_exemple1='str', param_exemple2=1)
def unload(self) -> None:
self.Irc.Commands.drop_command_by_module(self.module_name)
return None

View File

@@ -1,95 +1,29 @@
"""
File : mod_votekick.py
Version : 1.0.0
Version : 1.0.2
Description : Manages votekick sessions for multiple channels.
Handles activation, ongoing vote checks, and cleanup.
Author : adator
Created : 2025-08-16
Last Updated: 2025-08-16
Last Updated: 2025-11-01
-----------------------------------------
"""
from dataclasses import dataclass
import re
from core.classes.interfaces.imodule import IModule
import mods.votekick.schemas as schemas
import mods.votekick.utils as utils
from mods.votekick.votekick_manager import VotekickManager
import mods.votekick.threads as thds
from typing import TYPE_CHECKING, Any, Optional
if TYPE_CHECKING:
from core.irc import Irc
class Votekick(IModule):
@dataclass
class ModConfModel(schemas.VoteChannelModel):
...
class Votekick:
def __init__(self, uplink: 'Irc') -> None:
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module
self.Irc = uplink
# Add Loader Object to the module (Mandatory)
self.Loader = uplink.Loader
# Add server protocol Object to the module (Mandatory)
self.Protocol = uplink.Protocol
# Add Global Configuration to the module
self.Config = uplink.Config
# Add Base object to the module
self.Base = uplink.Base
# Add logs object to the module
self.Logs = uplink.Logs
# Add User object to the module
self.User = uplink.User
# Add Channel object to the module
self.Channel = uplink.Channel
# Add Utils.
self.Utils = uplink.Utils
# Add Utils module
self.ModUtils = utils
# Add Schemas module
self.Schemas = schemas
# Add Threads module
self.Threads = thds
# Add VoteKick Manager
self.VoteKickManager = VotekickManager(self)
metadata = uplink.Loader.Settings.get_cache('VOTEKICK')
if metadata is not None:
self.VoteKickManager.VOTE_CHANNEL_DB = metadata
# self.VOTE_CHANNEL_DB = metadata
# Créer les nouvelles commandes du module
self.Irc.build_command(1, self.module_name, 'vote', 'The kick vote module')
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'-- Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Add admin object to retrieve admin users
self.Admin = self.Irc.Admin
self.__create_tables()
self.ModUtils.join_saved_channels(self)
return None
def __create_tables(self) -> None:
def create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
@@ -115,10 +49,37 @@ class Votekick:
self.Base.db_execute_query(table_vote)
return None
def load(self) -> None:
self.ModConfig = self.ModConfModel()
# Add VoteKick Manager
self.VoteKickManager = VotekickManager(self)
# Add Utils module
self.ModUtils = utils
# Add Schemas module
self.Schemas = schemas
# Add Threads module
self.Threads = thds
self.ModUtils.join_saved_channels(self)
metadata = self.Settings.get_cache('VOTEKICK')
if metadata is not None:
self.VoteKickManager.VOTE_CHANNEL_DB = metadata
# self.VOTE_CHANNEL_DB = metadata
# Créer les nouvelles commandes du module
self.Irc.build_command(1, self.module_name, 'vote', 'The kick vote module')
def unload(self) -> None:
try:
# Cache the local DB with current votes.
self.Loader.Settings.set_cache('VOTEKICK', self.VoteKickManager.VOTE_CHANNEL_DB)
self.Settings.set_cache('VOTEKICK', self.VoteKickManager.VOTE_CHANNEL_DB)
for chan in self.VoteKickManager.VOTE_CHANNEL_DB:
self.Protocol.send_part_chan(uidornickname=self.Config.SERVICE_ID, channel=chan.channel_name)

View File

@@ -11,7 +11,7 @@ class VotekickManager:
def __init__(self, uplink: 'Votekick'):
self.uplink = uplink
self.Logs = uplink.Logs
self.Utils = uplink.Utils
self.Utils = uplink.MainUtils
def activate_new_channel(self, channel_name: str) -> bool:
"""Activate a new channel in the votekick systeme