Moving some methods to utils.py, creating new logs class

This commit is contained in:
adator
2025-08-17 22:41:51 +02:00
parent 8d23827e1e
commit 25262d4049
24 changed files with 653 additions and 601 deletions

View File

@@ -32,8 +32,9 @@ class Base:
self.Config = loader.Config
self.Settings = loader.Settings
self.Utils = loader.Utils
self.logs = loader.Logs
self.init_log_system() # Demarrer le systeme de log
# self.init_log_system() # Demarrer le systeme de log
self.check_for_new_version(True) # Verifier si une nouvelle version est disponible
# Liste des timers en cours
@@ -140,18 +141,6 @@ class Base:
except Exception as err:
self.logs.error(f'General Error: {err}')
def get_unixtime(self) -> int:
"""
Cette fonction retourne un UNIXTIME de type 12365456
Return: Current time in seconds since the Epoch (int)
"""
cet_offset = timezone(timedelta(hours=2))
now_cet = datetime.now(cet_offset)
unixtime_cet = int(now_cet.timestamp())
unixtime = int( time.time() )
return unixtime
def get_all_modules(self) -> list[str]:
"""Get list of all main modules
using this pattern mod_*.py
@@ -203,73 +192,6 @@ class Base:
return None
def init_log_system(self) -> None:
# Create folder if not available
logs_directory = f'logs{self.Config.OS_SEP}'
if not os.path.exists(f'{logs_directory}'):
os.makedirs(logs_directory)
# Init logs object
self.logs = logging.getLogger(self.Config.LOGGING_NAME)
self.logs.setLevel(self.Config.DEBUG_LEVEL)
# Add Handlers
file_hanlder = logging.FileHandler(f'logs{self.Config.OS_SEP}defender.log',encoding='UTF-8')
file_hanlder.setLevel(self.Config.DEBUG_LEVEL)
stdout_handler = logging.StreamHandler()
stdout_handler.setLevel(50)
# Define log format
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(funcName)s - %(message)s')
# Apply log format
file_hanlder.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
# Add handler to logs
self.logs.addHandler(file_hanlder)
self.logs.addHandler(stdout_handler)
# Apply the filter
self.logs.addFilter(self.replace_filter)
# self.logs.Logger('defender').addFilter(self.replace_filter)
self.logs.info('#################### STARTING DEFENDER ####################')
return None
def replace_filter(self, record: logging.LogRecord) -> bool:
response = True
filters: list[str] = ['PING',
f':{self.Config.SERVICE_PREFIX}auth']
# record.msg = record.getMessage().replace("PING", "[REDACTED]")
if self.Settings.CONSOLE:
print(record.getMessage())
for f in filters:
if f in record.getMessage():
response = False
return response # Retourne True pour permettre l'affichage du message
def delete_logger(self, logger_name: str) -> None:
# Récupérer le logger
logger = logging.getLogger(logger_name)
# Retirer tous les gestionnaires du logger et les fermer
for handler in logger.handlers[:]: # Utiliser une copie de la liste
logger.removeHandler(handler)
handler.close()
# Supprimer le logger du dictionnaire global
logging.Logger.manager.loggerDict.pop(logger_name, None)
return None
def log_cmd(self, user_cmd: str, cmd: str) -> None:
"""Enregistre les commandes envoyées par les utilisateurs
@@ -830,15 +752,6 @@ class Base:
self.logs.critical(f'General Error: {err}')
return None
def get_random(self, lenght:int) -> str:
"""
Retourn une chaîne aléatoire en fonction de la longueur spécifiée.
"""
caracteres = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
randomize = ''.join(random.choice(caracteres) for _ in range(lenght))
return randomize
def execute_periodic_action(self) -> None:
if not self.periodic_func:
@@ -878,23 +791,3 @@ class Base:
self.logs.debug(f'Method to execute : {str(self.periodic_func)}')
return None
def clean_uid(self, uid:str) -> Optional[str]:
"""Clean UID by removing @ / % / + / ~ / * / :
Args:
uid (str): The UID to clean
Returns:
str: Clean UID without any sign
"""
try:
if uid is None:
return None
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = re.sub(pattern, '', uid)
return parsed_UID
except TypeError as te:
self.logs.error(f'Type Error: {te}')

View File

@@ -1,13 +1,16 @@
from typing import Optional
from typing import TYPE_CHECKING, Optional
from core.base import Base
from core.definition import MAdmin
if TYPE_CHECKING:
from core.loader import Loader
class Admin:
UID_ADMIN_DB: list[MAdmin] = []
def __init__(self, base: Base) -> None:
self.Logs = base.logs
def __init__(self, loader: 'Loader') -> None:
self.Logs = loader.Logs
def insert(self, new_admin: MAdmin) -> bool:
"""Insert a new admin object model

View File

@@ -13,7 +13,7 @@ class Channel:
def __init__(self, loader: 'Loader') -> None:
self.Logs = loader.Base.logs
self.Logs = loader.Logs
self.Base = loader.Base
self.Utils = loader.Utils
@@ -102,7 +102,7 @@ class Channel:
return result
for userid in chan_obj.uids:
if self.Base.clean_uid(userid) == self.Base.clean_uid(uid):
if self.Utils.clean_uid(userid) == self.Utils.clean_uid(uid):
chan_obj.uids.remove(userid)
result = True
@@ -126,7 +126,7 @@ class Channel:
for record in self.UID_CHANNEL_DB:
for user_id in record.uids:
if self.Base.clean_uid(user_id) == self.Base.clean_uid(uid):
if self.Utils.clean_uid(user_id) == self.Utils.clean_uid(uid):
record.uids.remove(user_id)
result = True
@@ -177,9 +177,9 @@ class Channel:
if chan is None:
return False
clean_uid = self.Base.clean_uid(uid=uid)
clean_uid = self.Utils.clean_uid(uid=uid)
for chan_uid in chan.uids:
if self.Base.clean_uid(chan_uid) == clean_uid:
if self.Utils.clean_uid(chan_uid) == clean_uid:
return True
return False

View File

@@ -2,17 +2,17 @@ from re import sub
from typing import Any, Optional, Union, TYPE_CHECKING
if TYPE_CHECKING:
from core.base import Base
from core.loader import Loader
from core.definition import MClient
class Client:
CLIENT_DB: list['MClient'] = []
def __init__(self, base: 'Base'):
def __init__(self, loader: 'Loader'):
self.Logs = base.logs
self.Base = base
self.Logs = loader.Logs
self.Base = loader.Base
def insert(self, new_client: 'MClient') -> bool:
"""Insert a new User object

View File

@@ -2,14 +2,14 @@ from typing import TYPE_CHECKING, Optional
from core.definition import MCommand
if TYPE_CHECKING:
from core.base import Base
from core.loader import Loader
class Command:
DB_COMMANDS: list['MCommand'] = []
def __init__(self, base: 'Base'):
self.Base = base
def __init__(self, loader: 'Loader'):
self.Base = loader.Base
def build(self, new_command_obj: MCommand) -> bool:

View File

@@ -15,8 +15,9 @@ class Inspircd:
self.__Config = ircInstance.Config
self.__Base = ircInstance.Base
self.__Utils = ircInstance.Loader.Utils
self.__Logs = ircInstance.Loader.Logs
self.__Base.logs.info(f"** Loading protocol [{__name__}]")
self.__Logs.info(f"** Loading protocol [{__name__}]")
def send2socket(self, message: str, print_log: bool = True) -> None:
"""Envoit les commandes à envoyer au serveur.
@@ -28,24 +29,24 @@ class Inspircd:
with self.__Base.lock:
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[0]))
if print_log:
self.__Base.logs.debug(f'<< {message}')
self.__Logs.debug(f'<< {message}')
except UnicodeDecodeError as ude:
self.__Base.logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}')
self.__Logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}')
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace'))
except UnicodeEncodeError as uee:
self.__Base.logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}')
self.__Logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}')
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace'))
except AssertionError as ae:
self.__Base.logs.warning(f'Assertion Error {ae} - message: {message}')
self.__Logs.warning(f'Assertion Error {ae} - message: {message}')
except SSLEOFError as soe:
self.__Base.logs.error(f"SSLEOFError: {soe} - {message}")
self.__Logs.error(f"SSLEOFError: {soe} - {message}")
except SSLError as se:
self.__Base.logs.error(f"SSLError: {se} - {message}")
self.__Logs.error(f"SSLError: {se} - {message}")
except OSError as oe:
self.__Base.logs.error(f"OSError: {oe} - {message}")
self.__Logs.error(f"OSError: {oe} - {message}")
except AttributeError as ae:
self.__Base.logs.critical(f"Attribute Error: {ae}")
self.__Logs.critical(f"Attribute Error: {ae}")
def send_priv_msg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None):
"""Sending PRIVMSG to a channel or to a nickname by batches
@@ -62,7 +63,7 @@ class Inspircd:
User_to = self.__Irc.User.get_User(nick_to) if nick_to is None else None
if User_from is None:
self.__Base.logs.error(f"The sender nickname [{nick_from}] do not exist")
self.__Logs.error(f"The sender nickname [{nick_from}] do not exist")
return None
if not channel is None:
@@ -75,7 +76,7 @@ class Inspircd:
batch = str(msg)[i:i+batch_size]
self.send2socket(f":{nick_from} PRIVMSG {User_to.uid} :{batch}")
except Exception as err:
self.__Base.logs.error(f"General Error: {err}")
self.__Logs.error(f"General Error: {err}")
def send_notice(self, nick_from: str, nick_to: str, msg: str) -> None:
"""Sending NOTICE by batches
@@ -91,7 +92,7 @@ class Inspircd:
User_to = self.__Irc.User.get_User(nick_to)
if User_from is None or User_to is None:
self.__Base.logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist")
self.__Logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist")
return None
for i in range(0, len(str(msg)), batch_size):
@@ -99,9 +100,9 @@ class Inspircd:
self.send2socket(f":{User_from.uid} NOTICE {User_to.uid} :{batch}")
except Exception as err:
self.__Base.logs.error(f"General Error: {err}")
self.__Logs.error(f"General Error: {err}")
def link(self):
def send_link(self):
"""Créer le link et envoyer les informations nécessaires pour la
connexion au serveur.
"""
@@ -123,7 +124,7 @@ class Inspircd:
service_id = self.__Config.SERVICE_ID
version = self.__Config.CURRENT_VERSION
unixtime = self.__Base.get_unixtime()
unixtime = self.__Utils.get_unixtime()
self.send2socket(f"CAPAB START 1206")
@@ -133,7 +134,7 @@ class Inspircd:
self.send2socket(f"BURST {unixtime}")
self.send2socket(f":{server_id} ENDBURST")
self.__Base.logs.debug(f'>> {__name__} Link information sent to the server')
self.__Logs.debug(f'>> {__name__} Link information sent to the server')
def gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
# TKL + G user host set_by expire_timestamp set_at_timestamp :reason
@@ -142,12 +143,12 @@ class Inspircd:
return None
def set_nick(self, newnickname: str) -> None:
def send_set_nick(self, newnickname: str) -> None:
self.send2socket(f":{self.__Config.SERVICE_NICKNAME} NICK {newnickname}")
return None
def squit(self, server_id: str, server_link: str, reason: str) -> None:
def send_squit(self, server_id: str, server_link: str, reason: str) -> None:
if not reason:
reason = 'Service Shutdown'
@@ -155,26 +156,26 @@ class Inspircd:
self.send2socket(f":{server_id} SQUIT {server_link} :{reason}")
return None
def ungline(self, nickname:str, hostname: str) -> None:
def send_ungline(self, nickname:str, hostname: str) -> None:
self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - G {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}")
return None
def kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
def send_kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
# TKL + k user host set_by expire_timestamp set_at_timestamp :reason
self.send2socket(f":{self.__Config.SERVEUR_ID} TKL + k {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}")
return None
def sjoin(self, channel: str) -> None:
def send_sjoin(self, channel: str) -> None:
if not self.__Irc.Channel.is_valid_channel(channel):
self.__Base.logs.error(f"The channel [{channel}] is not valid")
self.__Logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Base.get_unixtime()} {channel} + :{self.__Config.SERVICE_ID}")
self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Utils.get_unixtime()} {channel} + :{self.__Config.SERVICE_ID}")
# Add defender to the channel uids list
self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[self.__Config.SERVICE_ID]))
@@ -202,7 +203,7 @@ class Inspircd:
self.__Irc.Reputation.delete(reputationObj.uid)
if not self.__Irc.Channel.delete_user_from_all_channel(uid):
self.__Base.logs.error(f"The UID [{uid}] has not been deleted from all channels")
self.__Logs.error(f"The UID [{uid}] has not been deleted from all channels")
return None
@@ -221,9 +222,9 @@ class Inspircd:
print_log (bool, optional): print logs if true. Defaults to True.
"""
# {self.Config.SERVEUR_ID} UID
# {clone.nickname} 1 {self.Base.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname}
# {clone.nickname} 1 {self.__Utils.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname}
try:
unixtime = self.__Base.get_unixtime()
unixtime = self.__Utils.get_unixtime()
encoded_ip = self.__Base.encode_ip(remote_ip)
# Create the user
@@ -242,7 +243,7 @@ class Inspircd:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def send_join_chan(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
"""Joining a channel
@@ -261,7 +262,7 @@ class Inspircd:
return None
if not self.__Irc.Channel.is_valid_channel(channel):
self.__Base.logs.error(f"The channel [{channel}] is not valid")
self.__Logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{userObj.uid} JOIN {channel} {passwordChannel}", print_log=print_log)
@@ -282,11 +283,11 @@ class Inspircd:
userObj = self.__Irc.User.get_User(uidornickname)
if userObj is None:
self.__Base.logs.error(f"The user [{uidornickname}] is not valid")
self.__Logs.error(f"The user [{uidornickname}] is not valid")
return None
if not self.__Irc.Channel.is_valid_channel(channel):
self.__Base.logs.error(f"The channel [{channel}] is not valid")
self.__Logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{userObj.uid} PART {channel}", print_log=print_log)
@@ -295,7 +296,7 @@ class Inspircd:
self.__Irc.Channel.delete_user_from_channel(channel, userObj.uid)
return None
def unkline(self, nickname:str, hostname: str) -> None:
def send_unkline(self, nickname:str, hostname: str) -> None:
self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - K {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}")
@@ -322,14 +323,14 @@ class Inspircd:
# TODO : User object should be able to update user modes
if self.__Irc.User.update_mode(userObj.uid, userMode):
return None
# self.__Base.logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]")
# self.__Logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]")
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_quit(self, serverMsg: list[str]) -> None:
"""Handle quit coming from a server
@@ -350,9 +351,9 @@ class Inspircd:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_squit(self, serverMsg: list[str]) -> None:
"""Handle squit coming from a server
@@ -409,9 +410,9 @@ class Inspircd:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_sjoin(self, serverMsg: list[str]) -> None:
"""Handle sjoin coming from a server
@@ -444,7 +445,7 @@ class Inspircd:
# Boucle qui va ajouter l'ensemble des users (UID)
for i in range(start_boucle, len(serverMsg)):
parsed_UID = str(serverMsg[i])
clean_uid = self.__Irc.User.clean_uid(parsed_UID)
clean_uid = self.__Utils.clean_uid(parsed_UID)
if not clean_uid is None and len(clean_uid) == 9:
list_users.append(parsed_UID)
@@ -458,9 +459,9 @@ class Inspircd:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_part(self, serverMsg: list[str]) -> None:
"""Handle part coming from a server
@@ -479,9 +480,9 @@ class Inspircd:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server
@@ -542,9 +543,9 @@ class Inspircd:
)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server
@@ -562,7 +563,7 @@ class Inspircd:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_version(self, serverMsg: list[str]) -> None:
"""Sending Server Version to the server
@@ -574,7 +575,7 @@ class Inspircd:
# Réponse a un CTCP VERSION
try:
nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1]))
nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1]))
dnickname = self.__Config.SERVICE_NICKNAME
arg = serverMsg[4].replace(':', '')
@@ -586,7 +587,7 @@ class Inspircd:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_time(self, serverMsg: list[str]) -> None:
"""Sending TIME answer to a requestor
@@ -598,7 +599,7 @@ class Inspircd:
# Réponse a un CTCP VERSION
try:
nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1]))
nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1]))
dnickname = self.__Config.SERVICE_NICKNAME
arg = serverMsg[4].replace(':', '')
current_datetime = self.__Utils.get_sdatetime()
@@ -611,7 +612,7 @@ class Inspircd:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_ping(self, serverMsg: list[str]) -> None:
"""Sending a PING answer to requestor
@@ -623,7 +624,7 @@ class Inspircd:
# Réponse a un CTCP VERSION
try:
nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1]))
nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1]))
dnickname = self.__Config.SERVICE_NICKNAME
arg = serverMsg[4].replace(':', '')
@@ -632,7 +633,7 @@ class Inspircd:
if arg == '\x01PING':
recieved_unixtime = int(serverMsg[5].replace('\x01',''))
current_unixtime = self.__Base.get_unixtime()
current_unixtime = self.__Utils.get_unixtime()
ping_response = current_unixtime - recieved_unixtime
# self.__Irc.send2socket(f':{dnickname} NOTICE {nickname} :\x01PING {ping_response} secs\x01')
@@ -644,7 +645,7 @@ class Inspircd:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_version_msg(self, serverMsg: list[str]) -> None:
"""Handle version coming from the server
@@ -654,7 +655,7 @@ class Inspircd:
"""
try:
# ['@label=0073', ':0014E7P06', 'VERSION', 'PyDefender']
getUser = self.__Irc.User.get_User(self.__Irc.User.clean_uid(serverMsg[1]))
getUser = self.__Irc.User.get_User(self.__Utils.clean_uid(serverMsg[1]))
if getUser is None:
return None
@@ -669,4 +670,4 @@ class Inspircd:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")

View File

@@ -1,6 +1,6 @@
from re import match, findall, search
from datetime import datetime
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Optional
from ssl import SSLEOFError, SSLError
if TYPE_CHECKING:
@@ -17,20 +17,31 @@ class Unrealircd6:
self.__Base = ircInstance.Base
self.__Settings = ircInstance.Base.Settings
self.__Utils = ircInstance.Loader.Utils
self.__Logs = ircInstance.Loader.Logs
self.known_protocol: set[str] = {'SJOIN', 'UID', 'MD', 'QUIT', 'SQUIT',
'EOS', 'PRIVMSG', 'MODE', 'UMODE2',
'VERSION', 'REPUTATION', 'SVS2MODE',
'SLOG', 'NICK', 'PART', 'PONG'}
'SLOG', 'NICK', 'PART', 'PONG',
'PROTOCTL', 'SERVER', 'SMOD', 'TKL', 'NETINFO'}
self.__Base.logs.info(f"** Loading protocol [{__name__}]")
self.__Logs.info(f"** Loading protocol [{__name__}]")
def get_ircd_protocol_poisition(self, cmd: list[str]) -> tuple[int, Optional[str]]:
"""Get the position of known commands
Args:
cmd (list[str]): The server response
Returns:
tuple[int, Optional[str]]: The position and the command.
"""
for index, token in enumerate(cmd):
if token.upper() in self.known_protocol:
return index, token.upper()
self.__Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}")
return (-1, None)
def send2socket(self, message: str, print_log: bool = True) -> None:
@@ -43,24 +54,24 @@ class Unrealircd6:
with self.__Base.lock:
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[0]))
if print_log:
self.__Base.logs.debug(f'<< {message}')
self.__Logs.debug(f'<< {message}')
except UnicodeDecodeError as ude:
self.__Base.logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}')
self.__Logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}')
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace'))
except UnicodeEncodeError as uee:
self.__Base.logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}')
self.__Logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}')
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace'))
except AssertionError as ae:
self.__Base.logs.warning(f'Assertion Error {ae} - message: {message}')
self.__Logs.warning(f'Assertion Error {ae} - message: {message}')
except SSLEOFError as soe:
self.__Base.logs.error(f"SSLEOFError: {soe} - {message}")
self.__Logs.error(f"SSLEOFError: {soe} - {message}")
except SSLError as se:
self.__Base.logs.error(f"SSLError: {se} - {message}")
self.__Logs.error(f"SSLError: {se} - {message}")
except OSError as oe:
self.__Base.logs.error(f"OSError: {oe} - {message}")
self.__Logs.error(f"OSError: {oe} - {message}")
except AttributeError as ae:
self.__Base.logs.critical(f"Attribute Error: {ae}")
self.__Logs.critical(f"Attribute Error: {ae}")
def send_priv_msg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None):
"""Sending PRIVMSG to a channel or to a nickname by batches
@@ -77,7 +88,7 @@ class Unrealircd6:
User_to = self.__Irc.User.get_User(nick_to) if not nick_to is None else None
if User_from is None:
self.__Base.logs.error(f"The sender nickname [{nick_from}] do not exist")
self.__Logs.error(f"The sender nickname [{nick_from}] do not exist")
return None
if not channel is None:
@@ -91,8 +102,8 @@ class Unrealircd6:
self.send2socket(f":{nick_from} PRIVMSG {User_to.uid} :{batch}")
except Exception as err:
self.__Base.logs.error(f"General Error: {err}")
self.__Base.logs.error(f"General Error: {nick_from} - {channel} - {nick_to}")
self.__Logs.error(f"General Error: {err}")
self.__Logs.error(f"General Error: {nick_from} - {channel} - {nick_to}")
def send_notice(self, nick_from: str, nick_to: str, msg: str) -> None:
"""Sending NOTICE by batches
@@ -108,7 +119,7 @@ class Unrealircd6:
User_to = self.__Irc.User.get_User(nick_to)
if User_from is None or User_to is None:
self.__Base.logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist")
self.__Logs.error(f"The sender [{nick_from}] or the Reciever [{nick_to}] do not exist")
return None
for i in range(0, len(str(msg)), batch_size):
@@ -116,9 +127,9 @@ class Unrealircd6:
self.send2socket(f":{User_from.uid} NOTICE {User_to.uid} :{batch}")
except Exception as err:
self.__Base.logs.error(f"General Error: {err}")
self.__Logs.error(f"General Error: {err}")
def parse_server_msg(self, server_msg: list[str]) -> Union[str, None]:
def parse_server_msg(self, server_msg: list[str]) -> Optional[str]:
"""Parse the server message and return the command
Args:
@@ -152,7 +163,7 @@ class Unrealircd6:
return None
def link(self):
def send_link(self):
"""Créer le link et envoyer les informations nécessaires pour la
connexion au serveur.
"""
@@ -175,7 +186,7 @@ class Unrealircd6:
service_id = self.__Config.SERVICE_ID
version = self.__Config.CURRENT_VERSION
unixtime = self.__Base.get_unixtime()
unixtime = self.__Utils.get_unixtime()
self.send2socket(f":{server_id} PASS :{password}", print_log=False)
self.send2socket(f":{server_id} PROTOCTL SID NOQUIT NICKv2 SJOIN SJ3 NICKIP TKLEXT2 NEXTBANS CLK EXTSWHOIS MLOCK MTAGS")
@@ -185,20 +196,20 @@ class Unrealircd6:
self.send2socket(f":{server_id} SERVER {link} 1 :{info}")
self.send2socket(f":{server_id} {nickname} :Reserved for services")
self.send2socket(f":{server_id} UID {nickname} 1 {unixtime} {username} {host} {service_id} * {smodes} * * fwAAAQ== :{realname}")
self.sjoin(chan)
self.send_sjoin(chan)
self.send2socket(f":{server_id} TKL + Q * {nickname} {host} 0 {unixtime} :Reserved for services")
self.send2socket(f":{service_id} MODE {chan} {cmodes}")
self.__Base.logs.debug(f'>> {__name__} Link information sent to the server')
self.__Logs.debug(f'>> {__name__} Link information sent to the server')
def gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
def send_gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
# TKL + G user host set_by expire_timestamp set_at_timestamp :reason
self.send2socket(f":{self.__Config.SERVEUR_ID} TKL + G {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}")
return None
def set_nick(self, newnickname: str) -> None:
def send_set_nick(self, newnickname: str) -> None:
"""Change nickname of the server
\n This method will also update the User object
Args:
@@ -210,7 +221,7 @@ class Unrealircd6:
self.__Irc.User.update_nickname(userObj.uid, newnickname)
return None
def squit(self, server_id: str, server_link: str, reason: str) -> None:
def send_squit(self, server_id: str, server_link: str, reason: str) -> None:
if not reason:
reason = 'Service Shutdown'
@@ -218,36 +229,36 @@ class Unrealircd6:
self.send2socket(f":{server_id} SQUIT {server_link} :{reason}")
return None
def ungline(self, nickname:str, hostname: str) -> None:
def send_ungline(self, nickname:str, hostname: str) -> None:
self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - G {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}")
return None
def kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
def send_kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
# TKL + k user host set_by expire_timestamp set_at_timestamp :reason
self.send2socket(f":{self.__Config.SERVEUR_ID} TKL + k {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}")
return None
def unkline(self, nickname:str, hostname: str) -> None:
def send_unkline(self, nickname:str, hostname: str) -> None:
self.send2socket(f":{self.__Config.SERVEUR_ID} TKL - K {nickname} {hostname} {self.__Config.SERVICE_NICKNAME}")
return None
def sjoin(self, channel: str) -> None:
def send_sjoin(self, channel: str) -> None:
"""Server will join a channel with pre defined umodes
Args:
channel (str): Channel to join
"""
if not self.__Irc.Channel.is_valid_channel(channel):
self.__Base.logs.error(f"The channel [{channel}] is not valid")
self.__Logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Base.get_unixtime()} {channel} {self.__Config.SERVICE_UMODES} :{self.__Config.SERVICE_ID}")
self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Utils.get_unixtime()} {channel} {self.__Config.SERVICE_UMODES} :{self.__Config.SERVICE_ID}")
self.send2socket(f":{self.__Config.SERVICE_ID} MODE {channel} {self.__Config.SERVICE_UMODES} {self.__Config.SERVICE_ID}")
# Add defender to the channel uids list
@@ -277,7 +288,7 @@ class Unrealircd6:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def send_sajoin(self, nick_to_sajoin: str, channel_name: str) -> None:
"""_summary_
@@ -314,7 +325,7 @@ class Unrealircd6:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def send_svs_mode(self, nickname: str, user_mode: str) -> None:
try:
@@ -333,7 +344,7 @@ class Unrealircd6:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def send_quit(self, uid: str, reason: str, print_log: True) -> None:
"""Send quit message
@@ -355,7 +366,7 @@ class Unrealircd6:
self.__Irc.Reputation.delete(reputationObj.uid)
if not self.__Irc.Channel.delete_user_from_all_channel(uid):
self.__Base.logs.error(f"The UID [{uid}] has not been deleted from all channels")
self.__Logs.error(f"The UID [{uid}] has not been deleted from all channels")
return None
@@ -374,9 +385,9 @@ class Unrealircd6:
print_log (bool, optional): print logs if true. Defaults to True.
"""
# {self.Config.SERVEUR_ID} UID
# {clone.nickname} 1 {self.Base.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname}
# {clone.nickname} 1 {self.__Utils.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname}
try:
unixtime = self.__Base.get_unixtime()
unixtime = self.__Utils.get_unixtime()
encoded_ip = self.__Base.encode_ip(remote_ip)
# Create the user
@@ -395,7 +406,7 @@ class Unrealircd6:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def send_join_chan(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
"""Joining a channel
@@ -414,7 +425,7 @@ class Unrealircd6:
return None
if not self.__Irc.Channel.is_valid_channel(channel):
self.__Base.logs.error(f"The channel [{channel}] is not valid")
self.__Logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{userObj.uid} JOIN {channel} {passwordChannel}", print_log=print_log)
@@ -450,11 +461,11 @@ class Unrealircd6:
userObj = self.__Irc.User.get_User(uidornickname)
if userObj is None:
self.__Base.logs.error(f"The user [{uidornickname}] is not valid")
self.__Logs.error(f"The user [{uidornickname}] is not valid")
return None
if not self.__Irc.Channel.is_valid_channel(channel):
self.__Base.logs.error(f"The channel [{channel}] is not valid")
self.__Logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{userObj.uid} PART {channel}", print_log=print_log)
@@ -467,7 +478,7 @@ class Unrealircd6:
channel = self.__Irc.Channel.is_valid_channel(channel_name)
if not channel:
self.__Base.logs.error(f'The channel [{channel_name}] is not correct')
self.__Logs.error(f'The channel [{channel_name}] is not correct')
return None
self.send2socket(f":{self.__Config.SERVICE_NICKNAME} MODE {channel_name} {channel_mode}")
@@ -505,9 +516,9 @@ class Unrealircd6:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_mode(self, serverMsg: list[str]) -> None:
"""Handle mode coming from a server
@@ -541,14 +552,14 @@ class Unrealircd6:
# TODO : User object should be able to update user modes
if self.__Irc.User.update_mode(userObj.uid, userMode):
return None
# self.__Base.logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]")
# self.__Logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]")
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_quit(self, serverMsg: list[str]) -> None:
"""Handle quit coming from a server
@@ -569,9 +580,9 @@ class Unrealircd6:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_squit(self, serverMsg: list[str]) -> None:
"""Handle squit coming from a server
@@ -651,9 +662,9 @@ class Unrealircd6:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_sjoin(self, serverMsg: list[str]) -> None:
"""Handle sjoin coming from a server
@@ -690,7 +701,7 @@ class Unrealircd6:
# Boucle qui va ajouter l'ensemble des users (UID)
for i in range(start_boucle, len(serverMsg_copy)):
parsed_UID = str(serverMsg_copy[i])
clean_uid = self.__Irc.User.clean_uid(parsed_UID)
clean_uid = self.__Utils.clean_uid(parsed_UID)
if not clean_uid is None and len(clean_uid) == 9:
list_users.append(clean_uid)
@@ -704,9 +715,9 @@ class Unrealircd6:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_part(self, serverMsg: list[str]) -> None:
"""Handle part coming from a server
@@ -725,9 +736,9 @@ class Unrealircd6:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_eos(self, serverMsg: list[str]) -> None:
"""Handle EOS coming from a server
@@ -759,16 +770,16 @@ class Unrealircd6:
print(f"# VERSION : {version} ")
print(f"################################################")
self.__Base.logs.info(f"################### DEFENDER ###################")
self.__Base.logs.info(f"# SERVICE CONNECTE ")
self.__Base.logs.info(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
self.__Base.logs.info(f"# PORT : {self.__Config.SERVEUR_PORT} ")
self.__Base.logs.info(f"# SSL : {self.__Config.SERVEUR_SSL} ")
self.__Base.logs.info(f"# SSL VER : {self.__Config.SSL_VERSION} ")
self.__Base.logs.info(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
self.__Base.logs.info(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
self.__Base.logs.info(f"# VERSION : {version} ")
self.__Base.logs.info(f"################################################")
self.__Logs.info(f"################### DEFENDER ###################")
self.__Logs.info(f"# SERVICE CONNECTE ")
self.__Logs.info(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
self.__Logs.info(f"# PORT : {self.__Config.SERVEUR_PORT} ")
self.__Logs.info(f"# SSL : {self.__Config.SERVEUR_SSL} ")
self.__Logs.info(f"# SSL VER : {self.__Config.SSL_VERSION} ")
self.__Logs.info(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
self.__Logs.info(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
self.__Logs.info(f"# VERSION : {version} ")
self.__Logs.info(f"################################################")
if self.__Base.check_for_new_version(False):
self.send_priv_msg(
@@ -791,11 +802,11 @@ class Unrealircd6:
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Key Error: {ie}")
self.__Logs.error(f"{__name__} - Key Error: {ie}")
except KeyError as ke:
self.__Base.logs.error(f"{__name__} - Key Error: {ke}")
self.__Logs.error(f"{__name__} - Key Error: {ke}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_reputation(self, serverMsg: list[str]) -> None:
"""Handle REPUTATION coming from a server
@@ -826,7 +837,7 @@ class Unrealircd6:
self.__Irc.first_score = 0
self.Logs.error(f'Value Error {__name__}: {ve}')
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server
@@ -887,9 +898,9 @@ class Unrealircd6:
)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_privmsg(self, serverMsg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server
@@ -910,11 +921,11 @@ class Unrealircd6:
if cmd[2] == 'PRIVMSG' and cmd[4] == ':auth':
data_copy = cmd.copy()
data_copy[6] = '**********'
self.__Base.logs.debug(f">> {data_copy}")
self.__Logs.debug(f">> {data_copy}")
else:
self.__Base.logs.debug(f">> {cmd}")
self.__Logs.debug(f">> {cmd}")
else:
self.__Base.logs.debug(f">> {cmd}")
self.__Logs.debug(f">> {cmd}")
get_uid_or_nickname = str(cmd[0].replace(':',''))
user_trigger = self.__Irc.User.get_nickname(get_uid_or_nickname)
@@ -929,7 +940,7 @@ class Unrealircd6:
arg = convert_to_string.split()
arg.remove(f':{self.__Config.SERVICE_PREFIX}')
if not arg[0].lower() in self.__Irc.module_commands_list:
self.__Base.logs.debug(f"This command {arg[0]} is not available")
self.__Logs.debug(f"This command {arg[0]} is not available")
self.send_notice(
nick_from=self.__Config.SERVICE_NICKNAME,
nick_to=user_trigger,
@@ -968,7 +979,7 @@ class Unrealircd6:
return False
if not arg[0].lower() in self.__Irc.module_commands_list:
self.__Base.logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available")
self.__Logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available")
return False
cmd_to_send = convert_to_string.replace(':','')
@@ -982,11 +993,11 @@ class Unrealircd6:
return None
except KeyError as ke:
self.__Base.logs.error(f"Key Error: {ke}")
self.__Logs.error(f"Key Error: {ke}")
except AttributeError as ae:
self.__Base.logs.error(f"Attribute Error: {ae}")
self.__Logs.error(f"Attribute Error: {ae}")
except Exception as err:
self.__Base.logs.error(f"General Error: {err} - {srv_msg}")
self.__Logs.error(f"General Error: {err} - {srv_msg}")
def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server
@@ -1001,7 +1012,7 @@ class Unrealircd6:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_version(self, serverMsg: list[str]) -> None:
"""Sending Server Version to the server
@@ -1013,7 +1024,7 @@ class Unrealircd6:
# Réponse a un CTCP VERSION
try:
nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1]))
nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1]))
dnickname = self.__Config.SERVICE_NICKNAME
arg = serverMsg[4].replace(':', '')
@@ -1025,7 +1036,7 @@ class Unrealircd6:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_time(self, serverMsg: list[str]) -> None:
"""Sending TIME answer to a requestor
@@ -1037,7 +1048,7 @@ class Unrealircd6:
# Réponse a un CTCP VERSION
try:
nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1]))
nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1]))
dnickname = self.__Config.SERVICE_NICKNAME
arg = serverMsg[4].replace(':', '')
current_datetime = self.__Utils.get_sdatetime()
@@ -1050,7 +1061,7 @@ class Unrealircd6:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_ping(self, serverMsg: list[str]) -> None:
"""Sending a PING answer to requestor
@@ -1062,7 +1073,7 @@ class Unrealircd6:
# Réponse a un CTCP VERSION
try:
nickname = self.__Irc.User.get_nickname(self.__Base.clean_uid(serverMsg[1]))
nickname = self.__Irc.User.get_nickname(self.__Utils.clean_uid(serverMsg[1]))
dnickname = self.__Config.SERVICE_NICKNAME
arg = serverMsg[4].replace(':', '')
@@ -1071,7 +1082,7 @@ class Unrealircd6:
if arg == '\x01PING':
recieved_unixtime = int(serverMsg[5].replace('\x01',''))
current_unixtime = self.__Base.get_unixtime()
current_unixtime = self.__Utils.get_unixtime()
ping_response = current_unixtime - recieved_unixtime
# self.__Irc.send2socket(f':{dnickname} NOTICE {nickname} :\x01PING {ping_response} secs\x01')
@@ -1083,7 +1094,7 @@ class Unrealircd6:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_version_msg(self, serverMsg: list[str]) -> None:
"""Handle version coming from the server
@@ -1097,7 +1108,7 @@ class Unrealircd6:
if '@' in list(serverMsg_copy[0])[0]:
serverMsg_copy.pop(0)
getUser = self.__Irc.User.get_User(self.__Irc.User.clean_uid(serverMsg_copy[0]))
getUser = self.__Irc.User.get_User(self.__Utils.clean_uid(serverMsg_copy[0]))
if getUser is None:
return None
@@ -1115,4 +1126,4 @@ class Unrealircd6:
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}")

View File

@@ -1,14 +1,16 @@
from typing import Optional
from typing import TYPE_CHECKING, Optional
from core.definition import MReputation
from core.base import Base
if TYPE_CHECKING:
from core.loader import Loader
class Reputation:
UID_REPUTATION_DB: list[MReputation] = []
def __init__(self, base: Base):
def __init__(self, loader: 'Loader'):
self.Logs = base.logs
self.Logs = loader.Logs
self.MReputation: MReputation = MReputation
def insert(self, new_reputation_user: MReputation) -> bool:

View File

@@ -3,17 +3,17 @@ from typing import Any, Optional, TYPE_CHECKING
from datetime import datetime
if TYPE_CHECKING:
from core.base import Base
from core.loader import Loader
from core.definition import MUser
class User:
UID_DB: list['MUser'] = []
def __init__(self, base: 'Base'):
def __init__(self, loader: 'Loader'):
self.Logs = base.logs
self.Base = base
self.Logs = loader.Logs
self.Base = loader.Base
def insert(self, new_user: 'MUser') -> bool:
"""Insert a new User object

View File

@@ -54,7 +54,7 @@ class Irc:
self.Base = self.Loader.Base
# Logger
self.Logs = self.Loader.Base.logs
self.Logs = self.Loader.Logs
# Get Settings.
self.Settings = self.Base.Settings
@@ -216,7 +216,7 @@ class Irc:
protocol=self.Config.SERVEUR_PROTOCOL,
ircInstance=self.ircObject
).Protocol
self.Protocol.link() # Etablir le link en fonction du protocol choisi
self.Protocol.send_link() # Etablir le link en fonction du protocol choisi
self.signal = True # Une variable pour initier la boucle infinie
self.__join_saved_channels() # Join existing channels
self.load_existing_modules() # Charger les modules existant dans la base de données
@@ -240,7 +240,7 @@ class Irc:
self.init_service_user()
self.__create_socket()
self.Protocol.link()
self.Protocol.send_link()
self.__join_saved_channels()
self.load_existing_modules()
self.Config.DEFENDER_RESTART = 0
@@ -302,7 +302,7 @@ class Irc:
if result_query:
for chan_name in result_query:
chan = chan_name[0]
self.Protocol.sjoin(channel=chan)
self.Protocol.send_sjoin(channel=chan)
def send_response(self, responses:list[bytes]) -> None:
try:
@@ -359,13 +359,10 @@ class Irc:
p = self.Protocol
admin_obj = self.Admin.get_admin(nickname)
dnickname = self.Config.SERVICE_NICKNAME
color_bold = self.Config.COLORS.bold
color_nogc = self.Config.COLORS.nogc
color_blue = self.Config.COLORS.blue
color_black = self.Config.COLORS.black
color_underline = self.Config.COLORS.underline
current_level = 0
count = 0
if admin_obj is not None:
current_level = admin_obj.level
@@ -377,39 +374,11 @@ class Irc:
for cmd in self.Commands.get_commands_by_level(current_level):
if module is None or cmd.module_name.lower() == module.lower():
p.send_notice(
nick_from=dnickname,
nick_to=nickname,
msg=f" {color_black}{cmd.command_level:<8}{color_nogc}| {cmd.command_name:<25}| {cmd.module_name:<15}| {cmd.description:<35}"
)
return
for level, modules in self.module_commands.items():
if level > current_level:
break
if count > 0:
p.send_notice(nick_from=dnickname, nick_to=nickname, msg=" ")
p.send_notice(
nick_from=dnickname,
nick_to=nickname,
msg=f"{color_blue}{color_bold}Level {level}:{color_nogc}"
)
for module_name, commands in modules.items():
if module is None or module.lower() == module_name.lower():
p.send_notice(
nick_from=dnickname,
nick_to=nickname,
msg=f"{color_black} {color_underline}Module: {module_name}{color_nogc}"
msg=f" {color_black}{cmd.command_level:<8}{color_nogc}| {cmd.command_name:<25}| {cmd.module_name:<15}| {cmd.description:<35}"
)
for command, description in commands.items():
p.send_notice(nick_from=dnickname, nick_to=nickname, msg=f" {command:<20}: {description}")
count += 1
p.send_notice(nick_from=dnickname,nick_to=nickname,msg=f" ***************** FIN DES COMMANDES *****************")
return None
def generate_help_menu_bakcup(self, nickname: str) -> None:
@@ -888,16 +857,13 @@ class Irc:
case 'PING':
self.Protocol.on_server_ping(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
return None
case 'SJOIN':
self.Protocol.on_sjoin(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'EOS':
self.Protocol.on_eos(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'UID':
try:
@@ -906,48 +872,37 @@ class Irc:
for classe_name, classe_object in self.loaded_classes.items():
classe_object.cmd(original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
except Exception as err:
self.Logs.error(f'General Error: {err}')
case 'QUIT':
self.Protocol.on_quit(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'PROTOCTL':
self.Protocol.on_protoctl(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'SVS2MODE':
# >> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
self.Protocol.on_svs2mode(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'SQUIT':
self.Protocol.on_squit(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'PART':
self.Protocol.on_part(serverMsg=parsed_protocol)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'VERSION':
self.Protocol.on_version_msg(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'UMODE2':
# [':adator_', 'UMODE2', '-i']
self.Protocol.on_umode2(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'NICK':
self.Protocol.on_nick(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'REPUTATION':
self.Protocol.on_reputation(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'SLOG': # TODO
self.Logs.debug(f"** handle {parsed_protocol}")
@@ -957,7 +912,6 @@ class Irc:
case 'PRIVMSG':
self.Protocol.on_privmsg(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}")
case 'PONG': # TODO
self.Logs.debug(f"** handle {parsed_protocol}")
@@ -985,10 +939,9 @@ class Irc:
classe_object.cmd(original_response)
except IndexError as ie:
self.Logs.error(f"{ie} / {original_response} / length {str(len(original_response))}")
self.Logs.error(f"IndexError: {ie}")
except Exception as err:
self.Logs.error(f"General Error: {err}")
self.Logs.error(f"General Error: {traceback.format_exc()}")
self.Logs.error(f"General Error: {err}", exc_info=True)
def hcmds(self, user: str, channel: Union[str, None], cmd: list, fullcmd: list = []) -> None:
"""Create
@@ -1461,9 +1414,6 @@ class Irc:
module_name = str(cmd[1]) if len(cmd) == 2 else None
self.generate_help_menu(nickname=fromuser, module=module_name)
for com in self.Commands.get_ordered_commands():
print(com)
case 'load':
try:
# Load a module ex: .load mod_defender
@@ -1513,7 +1463,7 @@ class Irc:
nick_to=fromuser,
msg=f"Arrêt du service {dnickname}"
)
self.Protocol.squit(server_id=self.Config.SERVEUR_ID, server_link=self.Config.SERVEUR_LINK, reason=final_reason)
self.Protocol.send_squit(server_id=self.Config.SERVEUR_ID, server_link=self.Config.SERVEUR_LINK, reason=final_reason)
self.Logs.info(f'Arrêt du server {dnickname}')
self.Config.DEFENDER_RESTART = 0
self.signal = False
@@ -1540,7 +1490,7 @@ class Irc:
self.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
self.Base.delete_logger(self.Config.LOGGING_NAME)
self.Protocol.squit(server_id=self.Config.SERVEUR_ID, server_link=self.Config.SERVEUR_LINK, reason=final_reason)
self.Protocol.send_squit(server_id=self.Config.SERVEUR_ID, server_link=self.Config.SERVEUR_LINK, reason=final_reason)
self.Logs.info(f'Redémarrage du server {dnickname}')
self.loaded_classes.clear()
self.Config.DEFENDER_RESTART = 1 # Set restart status to 1 saying that the service will restart
@@ -1592,14 +1542,16 @@ class Irc:
restart_flag = True
if service_nickname != self.Config.SERVICE_NICKNAME:
self.Protocol.set_nick(self.Config.SERVICE_NICKNAME)
self.Protocol.send_set_nick(self.Config.SERVICE_NICKNAME)
if restart_flag:
self.Config.SERVEUR_ID = serveur_id
self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME, msg='You need to restart defender !', channel=self.Config.SERVICE_CHANLOG)
self.Base.delete_logger(self.Config.LOGGING_NAME)
self.Base = self.Loader.BaseModule.Base(self.Config, self.Settings)
self.Loader.ServiceLogging.remove_logger()
self.Loader.ServiceLogging = self.Loader.LoggingModule.ServiceLogging()
self.Loader.Logs = self.Loader.ServiceLogging.get_logger()
self.Base = self.Loader.BaseModule.Base(self.Loader)
importlib.reload(mod_unreal6)
importlib.reload(mod_protocol)

View File

@@ -1,4 +1,6 @@
from logging import Logger
from core.classes import user, admin, client, channel, reputation, settings, commands
import core.logs as logs
import core.definition as df
import core.utils as utils
import core.base as base_module
@@ -9,29 +11,35 @@ class Loader:
def __init__(self):
# Load Main Modules
self.Definition: df = df
self.Definition: df = df
self.ConfModule: conf_module = conf_module
self.ConfModule: conf_module = conf_module
self.BaseModule: base_module = base_module
self.BaseModule: base_module = base_module
self.Utils: utils = utils
self.Utils: utils = utils
self.LoggingModule: logs = logs
# Load Classes
self.ServiceLogging: logs.ServiceLogging = logs.ServiceLogging()
self.Logs: Logger = self.ServiceLogging.get_logger()
self.Settings: settings.Settings = settings.Settings()
self.Config: df.MConfig = self.ConfModule.Configuration().ConfigObject
self.Base: base_module.Base = self.BaseModule.Base(self)
self.Base: base_module.Base = self.BaseModule.Base(self)
self.User: user.User = user.User(self.Base)
self.User: user.User = user.User(self)
self.Client: client.Client = client.Client(self.Base)
self.Client: client.Client = client.Client(self)
self.Admin: admin.Admin = admin.Admin(self.Base)
self.Admin: admin.Admin = admin.Admin(self)
self.Channel: channel.Channel = channel.Channel(self.Base)
self.Channel: channel.Channel = channel.Channel(self)
self.Reputation: reputation.Reputation = reputation.Reputation(self.Base)
self.Reputation: reputation.Reputation = reputation.Reputation(self)
self.Commands: commands.Command = commands.Command(self.Base)
self.Commands: commands.Command = commands.Command(self)

108
core/logs.py Normal file
View File

@@ -0,0 +1,108 @@
import logging
from os import path, makedirs, sep
class ServiceLogging:
def __init__(self, loggin_name: str = "defender"):
"""Create the Logging object
"""
self.OS_SEP = sep
self.LOGGING_NAME = loggin_name
self.DEBUG_LEVEL, self.DEBUG_FILE_LEVEL, self.DEBUG_STDOUT_LEVEL = (10, 10, 10)
self.SERVER_PREFIX = None
self.LOGGING_CONSOLE = True
self.LOG_FILTERS: list[str] = ['PING', f":{self.SERVER_PREFIX}auth", "['PASS'"]
self.file_handler = None
self.stdout_handler = None
self.logs: logging.Logger = self.start_log_system()
def get_logger(self) -> logging.Logger:
logs_obj: logging.Logger = self.logs
return logs_obj
def remove_logger(self) -> None:
# Récupérer le logger
logger = logging.getLogger(self.LOGGING_NAME)
# Retirer tous les gestionnaires du logger et les fermer
for handler in logger.handlers[:]: # Utiliser une copie de la liste
# print(handler)
logger.removeHandler(handler)
handler.close()
# Supprimer le logger du dictionnaire global
logging.Logger.manager.loggerDict.pop(self.LOGGING_NAME, None)
return None
def start_log_system(self) -> logging.Logger:
os_sep = self.OS_SEP
logging_name = self.LOGGING_NAME
debug_level = self.DEBUG_LEVEL
debug_file_level = self.DEBUG_FILE_LEVEL
debug_stdout_level = self.DEBUG_STDOUT_LEVEL
# Create folder if not available
logs_directory = f'logs{os_sep}'
if not path.exists(f'{logs_directory}'):
makedirs(logs_directory)
# Init logs object
logs = logging.getLogger(logging_name)
logs.setLevel(debug_level)
# Add Handlers
self.file_handler = logging.FileHandler(f'logs{os_sep}{logging_name}.log',encoding='UTF-8')
self.file_handler.setLevel(debug_file_level)
self.stdout_handler = logging.StreamHandler()
self.stdout_handler.setLevel(debug_stdout_level)
# Define log format
formatter = logging.Formatter(
fmt='%(asctime)s - %(levelname)s - %(message)s (%(filename)s:%(funcName)s:%(lineno)d)',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Apply log format
self.file_handler.setFormatter(formatter)
self.stdout_handler.setFormatter(formatter)
# Add handler to logs
logs.addHandler(self.file_handler)
logs.addHandler(self.stdout_handler)
# Apply the filter
logs.addFilter(self.replace_filter)
logs.info(f'#################### STARTING {self.LOGGING_NAME} ####################')
return logs
def set_stdout_handler_level(self, level: int) -> None:
self.stdout_handler.setLevel(level)
def set_file_handler_level(self, level: int) -> None:
self.file_handler.setLevel(level)
def replace_filter(self, record: logging.LogRecord) -> bool:
response = True
filter: list[str] = ['PING', f":{self.SERVER_PREFIX}auth", "['PASS'"]
# record.msg = record.getMessage().replace("PING", "[REDACTED]")
# if self.LOGGING_CONSOLE:
# print(record.getMessage())
for f in filter:
if f in record.getMessage():
response = False
return response # Retourne True pour permettre l'affichage du message

View File

@@ -2,8 +2,9 @@
Main utils library.
'''
from pathlib import Path
from re import sub
from typing import Literal, Optional, Any
from datetime import datetime
from datetime import datetime, timedelta, timezone
from time import time
from random import choice
from hashlib import md5, sha3_512
@@ -29,6 +30,9 @@ def get_unixtime() -> int:
Returns:
int: Current time in seconds since the Epoch (int)
"""
cet_offset = timezone(timedelta(hours=2))
now_cet = datetime.now(cet_offset)
unixtime_cet = int(now_cet.timestamp())
return int(time())
def get_sdatetime() -> str:
@@ -89,4 +93,21 @@ def get_all_modules() -> list[str]:
list[str]: List of module names.
"""
base_path = Path('mods')
return [file.name.replace('.py', '') for file in base_path.rglob('mod_*.py')]
return [file.name.replace('.py', '') for file in base_path.rglob('mod_*.py')]
def clean_uid(uid: str) -> Optional[str]:
"""Clean UID by removing @ / % / + / ~ / * / :
Args:
uid (str): The UID to clean
Returns:
str: Clean UID without any sign
"""
if uid is None:
return None
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = sub(pattern, '', uid)
return parsed_UID

View File

@@ -28,7 +28,7 @@ class Clone:
self.Base = irc_instance.Base
# Add logs object to the module (Mandatory)
self.Logs = irc_instance.Base.logs
self.Logs = irc_instance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = irc_instance.User

View File

@@ -156,7 +156,7 @@ def create_new_clone(uplink: 'Clone', faker_instance: 'Faker', group: str = 'Def
uid = generate_uid_for_clone(faker, uplink.Config.SERVEUR_ID)
checkUid = uplink.Clone.uid_exists(uid=uid)
clone = uplink.Definition.MClone(
clone = uplink.Schemas.MClone(
connected=False,
nickname=nickname,
username=username,
@@ -176,7 +176,7 @@ def create_new_clone(uplink: 'Clone', faker_instance: 'Faker', group: str = 'Def
def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]):
uid_sender = uplink.User.clean_uid(srvmsg[1])
uid_sender = uplink.Irc.Utils.clean_uid(srvmsg[1])
senderObj = uplink.User.get_User(uid_sender)
if senderObj.hostname in uplink.Config.CLONE_LOG_HOST_EXEMPT:

View File

@@ -34,8 +34,11 @@ class Command:
# Add Base object to the module (Mandatory)
self.Base = ircInstance.Base
# Add main Utils to the module
self.MainUtils = ircInstance.Utils
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Base.logs
self.Logs = ircInstance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User
@@ -296,7 +299,7 @@ class Command:
except Exception as err:
self.Logs.error(f"General Error: {err}")
def hcmds(self, uidornickname: str, channel_name: Optional[str], cmd: list, fullcmd: list = []) -> None:
def hcmds(self, uidornickname: str, channel_name: Optional[str], cmd: list, fullcmd: list = []):
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
@@ -307,7 +310,8 @@ class Command:
fromchannel = channel_name
match command:
case "automode":
case 'automode':
# automode set nickname [+/-mode] #channel
# automode set adator +o #channel
try:
@@ -346,7 +350,7 @@ class Command:
if db_result is not None:
if sign == '+':
db_data = {"updated_on": self.Base.get_datetime(), "nickname": nickname, "channel": chan, "mode": mode}
db_data = {"updated_on": self.MainUtils.get_datetime(), "nickname": nickname, "channel": chan, "mode": mode}
db_result = self.Base.db_execute_query(query="UPDATE command_automode SET mode = :mode, updated_on = :updated_on WHERE nickname = :nickname and channel = :channel",
params=db_data)
if db_result.rowcount > 0:
@@ -432,7 +436,7 @@ class Command:
for uid in uids_split:
for i in range(0, len(uid)):
mode += set_mode
users += f'{self.User.get_nickname(self.Base.clean_uid(uid[i]))} '
users += f'{self.User.get_nickname(self.MainUtils.clean_uid(uid[i]))} '
if i == len(uid) - 1:
self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +{mode} {users}")
mode = ''
@@ -454,7 +458,7 @@ class Command:
for uid in uids_split:
for i in range(0, len(uid)):
mode += set_mode
users += f'{self.User.get_nickname(self.Base.clean_uid(uid[i]))} '
users += f'{self.User.get_nickname(self.MainUtils.clean_uid(uid[i]))} '
if i == len(uid) - 1:
self.Protocol.send2socket(f":{service_id} MODE {fromchannel} +{mode} {users}")
mode = ''
@@ -1146,7 +1150,7 @@ class Command:
# .svsnick nickname newnickname
nickname = str(cmd[1])
newnickname = str(cmd[2])
unixtime = self.Base.get_unixtime()
unixtime = self.MainUtils.get_unixtime()
if self.User.get_nickname(nickname) is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" This nickname do not exist")
@@ -1192,7 +1196,7 @@ class Command:
nickname = str(cmd[1])
hostname = str(cmd[2])
set_at_timestamp = self.Base.get_unixtime()
set_at_timestamp = self.MainUtils.get_unixtime()
expire_time = (60 * 60 * 24) + set_at_timestamp
gline_reason = ' '.join(cmd[3:])
@@ -1201,7 +1205,7 @@ class Command:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" /msg {dnickname} {command.upper()} nickname host reason")
return None
self.Protocol.gline(nickname=nickname, hostname=hostname, set_by=dnickname, expire_timestamp=expire_time, set_at_timestamp=set_at_timestamp, reason=gline_reason)
self.Protocol.send_gline(nickname=nickname, hostname=hostname, set_by=dnickname, expire_timestamp=expire_time, set_at_timestamp=set_at_timestamp, reason=gline_reason)
except KeyError as ke:
self.Logs.error(ke)
@@ -1222,7 +1226,7 @@ class Command:
hostname = str(cmd[2])
# self.Protocol.send2socket(f":{self.Config.SERVEUR_ID} TKL - G {nickname} {hostname} {dnickname}")
self.Protocol.ungline(nickname=nickname, hostname=hostname)
self.Protocol.send_ungline(nickname=nickname, hostname=hostname)
except KeyError as ke:
self.Logs.error(ke)
@@ -1240,7 +1244,7 @@ class Command:
nickname = str(cmd[1])
hostname = str(cmd[2])
set_at_timestamp = self.Base.get_unixtime()
set_at_timestamp = self.MainUtils.get_unixtime()
expire_time = (60 * 60 * 24) + set_at_timestamp
gline_reason = ' '.join(cmd[3:])
@@ -1249,7 +1253,7 @@ class Command:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" /msg {dnickname} {command.upper()} nickname host reason")
return None
self.Protocol.kline(nickname=nickname, hostname=hostname, set_by=dnickname, expire_timestamp=expire_time, set_at_timestamp=set_at_timestamp, reason=gline_reason)
self.Protocol.send_kline(nickname=nickname, hostname=hostname, set_by=dnickname, expire_timestamp=expire_time, set_at_timestamp=set_at_timestamp, reason=gline_reason)
except KeyError as ke:
self.Logs.error(ke)
@@ -1269,7 +1273,7 @@ class Command:
nickname = str(cmd[1])
hostname = str(cmd[2])
self.Protocol.unkline(nickname=nickname, hostname=hostname)
self.Protocol.send_unkline(nickname=nickname, hostname=hostname)
except KeyError as ke:
self.Logs.error(ke)
@@ -1288,7 +1292,7 @@ class Command:
nickname = str(cmd[1])
hostname = str(cmd[2])
set_at_timestamp = self.Base.get_unixtime()
set_at_timestamp = self.MainUtils.get_unixtime()
expire_time = (60 * 60 * 24) + set_at_timestamp
shun_reason = ' '.join(cmd[3:])

View File

@@ -30,7 +30,7 @@ class Defender:
self.Base = irc_instance.Base
# Add logs object to the module (Mandatory)
self.Logs = irc_instance.Base.logs
self.Logs = irc_instance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = irc_instance.User
@@ -118,7 +118,7 @@ class Defender:
self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, ))
if self.ModConfig.reputation == 1:
self.Protocol.sjoin(self.Config.SALON_JAIL)
self.Protocol.send_sjoin(self.Config.SALON_JAIL)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.SALON_JAIL} +o {self.Config.SERVICE_NICKNAME}")
return None
@@ -176,7 +176,7 @@ class Defender:
self.Schemas.DB_FREEIPAPI_USERS = freeipapi
if cloudfilt:
self.Schemas.DB_CLOUD_FILT_USERS = cloudfilt
self.Schemas.DB_CLOUDFILT_USERS = cloudfilt
if psutils:
self.Schemas.DB_PSUTIL_USERS = psutils
@@ -239,7 +239,7 @@ class Defender:
for channel in channels:
chan = channel[0]
self.Protocol.sjoin(chan)
self.Protocol.send_sjoin(chan)
if chan == jail_chan:
self.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}")
self.Protocol.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}")
@@ -931,15 +931,15 @@ class Defender:
case 'info':
try:
if len(cmd) < 2:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Syntax. /msg {dnickname} INFO [nickname]")
return None
nickoruid = cmd[1]
UserObject = self.User.get_User(nickoruid)
if UserObject is not None:
channels: list = []
for chan in self.Channel.UID_CHANNEL_DB:
for uid_in_chan in chan.uids:
if self.Base.clean_uid(uid_in_chan) == UserObject.uid:
channels.append(chan.name)
channels: list = [chan.name for chan in self.Channel.UID_CHANNEL_DB for uid_in_chan in chan.uids if self.Loader.Utils.clean_uid(uid_in_chan) == UserObject.uid]
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' UID : {UserObject.uid}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' NICKNAME : {UserObject.nickname}')
@@ -956,7 +956,7 @@ class Defender:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CHANNELS : {channels}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CONNECTION TIME : {UserObject.connexion_datetime}')
else:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {nickoruid} doesn't exist")
except KeyError as ke:
self.Logs.warning(f"Key error info user : {ke}")

View File

@@ -81,7 +81,7 @@ def handle_on_sjoin(uplink: 'Defender', srvmsg: list[str]):
confmodel = uplink.ModConfig
parsed_chan = srvmsg[4] if irc.Channel.is_valid_channel(srvmsg[4]) else None
parsed_UID = irc.User.clean_uid(srvmsg[5])
parsed_UID = uplink.Loader.Utils.clean_uid(srvmsg[5])
if parsed_chan is None or parsed_UID is None:
return
@@ -145,7 +145,7 @@ def handle_on_nick(uplink: 'Defender', srvmsg: list[str]):
srvmsg (list[str]): The Server MSG
confmodel (ModConfModel): The Module Configuration
"""
uid = uplink.User.clean_uid(str(srvmsg[1]))
uid = uplink.Loader.Utils.clean_uid(str(srvmsg[1]))
p = uplink.Protocol
confmodel = uplink.ModConfig
@@ -180,7 +180,7 @@ def handle_on_quit(uplink: 'Defender', srvmsg: list[str]):
confmodel = uplink.ModConfig
ban_all_chan = uplink.Base.int_if_possible(confmodel.reputation_ban_all_chan)
final_UID = uplink.User.clean_uid(str(srvmsg[1]))
final_UID = uplink.Loader.Utils.clean_uid(str(srvmsg[1]))
jail_salon = uplink.Config.SALON_JAIL
service_id = uplink.Config.SERVICE_ID
get_user_reputation = uplink.Reputation.get_Reputation(final_UID)
@@ -238,7 +238,7 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
irc.Reputation.insert(
irc.Loader.Definition.MReputation(
**_User.to_dict(),
secret_code=irc.Base.get_random(8)
secret_code=irc.Utils.generate_random_string(8)
)
)
if irc.Reputation.is_exist(_User.uid):
@@ -278,7 +278,7 @@ def action_on_flood(uplink: 'Defender', srvmsg: list[str]):
get_detected_uid = User.uid
get_detected_nickname = User.nickname
unixtime = irc.Base.get_unixtime()
unixtime = irc.Utils.get_unixtime()
get_diff_secondes = 0
def get_flood_user(uid: str) -> Optional[FloodUser]:

View File

@@ -33,7 +33,7 @@ class Jsonrpc():
self.Base = ircInstance.Base
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Base.logs
self.Logs = ircInstance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User

View File

@@ -34,7 +34,7 @@ class Test():
self.Base = ircInstance.Base
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Base.logs
self.Logs = ircInstance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User

View File

@@ -10,7 +10,9 @@
"""
import re
import mods.votekick.schemas as schemas
import mods.votekick.utils as utils
from mods.votekick.votekick_manager import VotekickManager
import mods.votekick.threads as thds
from typing import TYPE_CHECKING, Any, Optional
if TYPE_CHECKING:
@@ -40,7 +42,7 @@ class Votekick:
self.Base = uplink.Base
# Add logs object to the module
self.Logs = uplink.Base.logs
self.Logs = uplink.Logs
# Add User object to the module
self.User = uplink.User
@@ -51,9 +53,15 @@ class Votekick:
# Add Utils.
self.Utils = uplink.Utils
# Add Utils module
self.ModUtils = utils
# Add Schemas module
self.Schemas = schemas
# Add Threads module
self.Threads = thds
# Add VoteKick Manager
self.VoteKickManager = VotekickManager(self)
@@ -77,7 +85,7 @@ class Votekick:
# Add admin object to retrieve admin users
self.Admin = self.Irc.Admin
self.__create_tables()
self.join_saved_channels()
self.ModUtils.join_saved_channels(self)
return None
@@ -126,139 +134,28 @@ class Votekick:
except Exception as err:
self.Logs.error(f'General Error: {err}')
def init_vote_system(self, channel: str) -> bool:
response = False
for chan in self.VoteKickManager.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
chan.target_user = ''
chan.voter_users = []
chan.vote_against = 0
chan.vote_for = 0
response = True
return response
def insert_vote_channel(self, channel_obj: schemas.VoteChannelModel) -> bool:
result = False
found = False
for chan in self.VoteKickManager.VOTE_CHANNEL_DB:
if chan.channel_name == channel_obj.channel_name:
found = True
if not found:
self.VoteKickManager.VOTE_CHANNEL_DB.append(channel_obj)
self.Logs.debug(f"The channel has been added {channel_obj}")
# self.db_add_vote_channel(ChannelObject.channel_name)
return result
def db_add_vote_channel(self, channel: str) -> bool:
"""Cette fonction ajoute les salons ou seront autoriser les votes
Args:
channel (str): le salon à enregistrer.
"""
current_datetime = self.Utils.get_sdatetime()
mes_donnees = {'channel': channel}
response = self.Base.db_execute_query("SELECT id FROM votekick_channel WHERE channel = :channel", mes_donnees)
is_channel_exist = response.fetchone()
if is_channel_exist is None:
mes_donnees = {'datetime': current_datetime, 'channel': channel}
insert = self.Base.db_execute_query(f"INSERT INTO votekick_channel (datetime, channel) VALUES (:datetime, :channel)", mes_donnees)
if insert.rowcount > 0:
return True
else:
return False
else:
return False
def db_delete_vote_channel(self, channel: str) -> bool:
"""Cette fonction supprime les salons de join de Defender
Args:
channel (str): le salon à enregistrer.
"""
mes_donnes = {'channel': channel}
response = self.Base.db_execute_query("DELETE FROM votekick_channel WHERE channel = :channel", mes_donnes)
affected_row = response.rowcount
if affected_row > 0:
return True
else:
return False
def join_saved_channels(self) -> None:
param = {'module_name': self.module_name}
result = self.Base.db_execute_query(f"SELECT id, channel_name FROM {self.Config.TABLE_CHANNEL} WHERE module_name = :module_name", param)
channels = result.fetchall()
for channel in channels:
id_, chan = channel
self.insert_vote_channel(self.Schemas.VoteChannelModel(channel_name=chan, target_user='', voter_users=[], vote_for=0, vote_against=0))
self.Protocol.sjoin(channel=chan)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {chan} +o {self.Config.SERVICE_NICKNAME}")
return None
def is_vote_ongoing(self, channel: str) -> bool:
response = False
for vote in self.VoteKickManager.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
if vote.target_user:
response = True
return response
def timer_vote_verdict(self, channel: str) -> None:
dnickname = self.Config.SERVICE_NICKNAME
if not self.is_vote_ongoing(channel):
return None
for chan in self.VoteKickManager.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it'll be kicked from the channel",
channel=channel
)
self.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
self.Channel.delete_user_from_channel(channel, self.User.get_uid(target_user))
elif chan.vote_for <= chan.vote_against:
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll remain in the channel",
channel=channel
)
# Init the system
if self.init_vote_system(channel):
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg="System vote re initiated",
channel=channel
)
return None
def cmd(self, data: list) -> None:
cmd = list(data).copy()
if not data or len(data) < 2:
return None
cmd = data.copy() if isinstance(data, list) else list(data).copy()
index, command = self.Irc.Protocol.get_ircd_protocol_poisition(cmd)
if index == -1:
return None
try:
return None
match command:
case 'PRIVMSG':
return None
case 'QUIT':
return None
case _:
return None
except KeyError as ke:
self.Logs.error(f"Key Error: {ke}")
@@ -307,24 +204,17 @@ class Votekick:
if sentchannel is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f" The correct command is {self.Config.SERVICE_PREFIX}{command} {option} #CHANNEL")
self.insert_vote_channel(
self.Schemas.VoteChannelModel(
channel_name=sentchannel,
target_user='',
voter_users=[],
vote_for=0,
vote_against=0
)
)
if self.VoteKickManager.activate_new_channel(sentchannel):
self.Channel.db_query_channel('add', self.module_name, sentchannel)
self.Protocol.send_join_chan(uidornickname=dnickname, channel=sentchannel)
self.Protocol.send2socket(f":{dnickname} SAMODE {sentchannel} +o {dnickname}")
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="You can now use !submit <nickname> to decide if he will stay or not on this channel ",
channel=sentchannel
)
self.Channel.db_query_channel('add', self.module_name, sentchannel)
return None
self.Protocol.send_join_chan(uidornickname=dnickname, channel=sentchannel)
self.Protocol.send2socket(f":{dnickname} SAMODE {sentchannel} +o {dnickname}")
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="You can now use !submit <nickname> to decide if he will stay or not on this channel ",
channel=sentchannel
)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option} #channel')
@@ -344,12 +234,10 @@ class Votekick:
self.Protocol.send2socket(f":{dnickname} SAMODE {sentchannel} -o {dnickname}")
self.Protocol.send_part_chan(uidornickname=dnickname, channel=sentchannel)
for chan in self.VoteKickManager.VOTE_CHANNEL_DB:
if chan.channel_name == sentchannel:
self.VoteKickManager.VOTE_CHANNEL_DB.remove(chan)
self.Channel.db_query_channel('del', self.module_name, chan.channel_name)
if self.VoteKickManager.drop_vote_channel_model(sentchannel):
self.Channel.db_query_channel('del', self.module_name, sentchannel)
return None
self.Logs.debug(f"The Channel {sentchannel} has been deactivated from the vote system")
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f" /msg {dnickname} {command} {option} #channel")
@@ -359,20 +247,11 @@ class Votekick:
try:
# vote +
channel = fromchannel
for chan in self.VoteKickManager.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="You already submitted a vote",
channel=channel
)
else:
chan.vote_for += 1
chan.voter_users.append(fromuser)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="Vote recorded, thank you",
channel=channel
)
if self.VoteKickManager.action_vote(channel, fromuser, '+'):
self.Protocol.send_priv_msg(nick_from=dnickname, msg="Vote recorded, thank you",channel=channel)
else:
self.Protocol.send_priv_msg(nick_from=dnickname, msg="You already submitted a vote", channel=channel)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
@@ -382,20 +261,11 @@ class Votekick:
try:
# vote -
channel = fromchannel
for chan in self.VoteKickManager.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="You already submitted a vote",
channel=channel
)
else:
chan.vote_against += 1
chan.voter_users.append(fromuser)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="Vote recorded, thank you",
channel=channel
)
if self.VoteKickManager.action_vote(channel, fromuser, '-'):
self.Protocol.send_priv_msg(nick_from=dnickname, msg="Vote recorded, thank you",channel=channel)
else:
self.Protocol.send_priv_msg(nick_from=dnickname, msg="You already submitted a vote", channel=channel)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
@@ -414,11 +284,11 @@ class Votekick:
for vote in self.VoteKickManager.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
self.init_vote_system(channel)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="Vote system re-initiated",
channel=channel
)
if self.VoteKickManager.init_vote_system(channel):
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="Vote system re-initiated",
channel=channel
)
except Exception as err:
self.Logs.error(f'{err}')
@@ -452,16 +322,15 @@ class Votekick:
ongoing_user = None
# check if there is an ongoing vote
if self.is_vote_ongoing(channel):
for vote in self.VoteKickManager.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
ongoing_user = self.User.get_nickname(vote.target_user)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"There is an ongoing vote on {ongoing_user}",
channel=channel
)
return None
if self.VoteKickManager.is_vote_ongoing(channel):
votec = self.VoteKickManager.get_vote_channel_model(channel)
if votec:
ongoing_user = self.User.get_nickname(votec.target_user)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"There is an ongoing vote on {ongoing_user}",
channel=channel
)
return None
# check if the user exist
if user_submitted is None:
@@ -471,7 +340,7 @@ class Votekick:
)
return None
uid_cleaned = self.Base.clean_uid(uid_submitted)
uid_cleaned = self.Loader.Utils.clean_uid(uid_submitted)
channel_obj = self.Channel.get_channel(channel)
if channel_obj is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' This channel [{channel}] do not exist in the Channel Object')
@@ -479,7 +348,7 @@ class Votekick:
clean_uids_in_channel: list = []
for uid in channel_obj.uids:
clean_uids_in_channel.append(self.Base.clean_uid(uid))
clean_uids_in_channel.append(self.Loader.Utils.clean_uid(uid))
if not uid_cleaned in clean_uids_in_channel:
self.Protocol.send_priv_msg(nick_from=dnickname,
@@ -507,7 +376,7 @@ class Votekick:
channel=channel
)
self.Base.create_timer(60, self.timer_vote_verdict, (channel, ))
self.Base.create_timer(60, self.Threads.timer_vote_verdict, (self, channel))
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="This vote will end after 60 secondes",
channel=channel
@@ -524,30 +393,31 @@ class Votekick:
if self.Admin.get_admin(fromuser) is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f'Your are not allowed to execute this command')
return None
for chan in self.VoteKickManager.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll be kicked from the channel",
votec = self.VoteKickManager.get_vote_channel_model(channel)
if votec:
target_user = self.User.get_nickname(votec.target_user)
if votec.vote_for >= votec.vote_against:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {votec.vote_against} votes against and {votec.vote_for} votes for. For this reason, it\'ll be kicked from the channel",
channel=channel
)
self.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
elif chan.vote_for <= chan.vote_against:
self.Protocol.send_priv_msg(
self.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
else:
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll remain in the channel",
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {votec.vote_against} votes against and {votec.vote_for} votes for. For this reason, it\'ll remain in the channel",
channel=channel
)
# Init the system
if self.init_vote_system(channel):
self.Protocol.send_priv_msg(
if self.VoteKickManager.init_vote_system(channel):
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg="System vote re initiated",
channel=channel
)
return None
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')

40
mods/votekick/threads.py Normal file
View File

@@ -0,0 +1,40 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mods.votekick.mod_votekick import Votekick
def timer_vote_verdict(uplink: 'Votekick', channel: str) -> None:
dnickname = uplink.Config.SERVICE_NICKNAME
if not uplink.VoteKickManager.is_vote_ongoing(channel):
return None
votec = uplink.VoteKickManager.get_vote_channel_model(channel)
if votec:
target_user = uplink.User.get_nickname(votec.target_user)
if votec.vote_for >= votec.vote_against and votec.vote_for != 0:
uplink.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"User {uplink.Config.COLORS.bold}{target_user}{uplink.Config.COLORS.nogc} has {votec.vote_against} votes against and {votec.vote_for} votes for. For this reason, it\'ll be kicked from the channel",
channel=channel
)
uplink.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
else:
uplink.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"User {uplink.Config.COLORS.bold}{target_user}{uplink.Config.COLORS.nogc} has {votec.vote_against} votes against and {votec.vote_for} votes for. For this reason, it\'ll remain in the channel",
channel=channel
)
if uplink.VoteKickManager.init_vote_system(channel):
uplink.Protocol.send_priv_msg(
nick_from=dnickname,
msg="System vote re initiated",
channel=channel
)
return None
return None

74
mods/votekick/utils.py Normal file
View File

@@ -0,0 +1,74 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mods.votekick.mod_votekick import Votekick
def add_vote_channel_to_database(uplink: 'Votekick', channel: str) -> bool:
"""Adds a new channel to the votekick database if it doesn't already exist.
This function checks if the specified channel is already registered in the
`votekick_channel` table. If not, it inserts a new entry with the current timestamp.
Args:
uplink (Votekick): The main votekick system instance that provides access to utilities and database operations.
channel (str): The name of the channel to be added to the database.
Returns:
bool: True if the channel was successfully inserted into the database.
False if the channel already exists or the insertion failed.
"""
current_datetime = uplink.Utils.get_sdatetime()
mes_donnees = {'channel': channel}
response = uplink.Base.db_execute_query("SELECT id FROM votekick_channel WHERE channel = :channel", mes_donnees)
is_channel_exist = response.fetchone()
if is_channel_exist is None:
mes_donnees = {'datetime': current_datetime, 'channel': channel}
insert = uplink.Base.db_execute_query(f"INSERT INTO votekick_channel (datetime, channel) VALUES (:datetime, :channel)", mes_donnees)
if insert.rowcount > 0:
return True
else:
return False
else:
return False
def delete_vote_channel_from_database(uplink: 'Votekick', channel: str) -> bool:
"""Deletes a channel entry from the votekick database.
This function removes the specified channel from the `votekick_channel` table
if it exists. It returns True if the deletion was successful.
Args:
uplink (Votekick): The main votekick system instance used to execute the database operation.
channel (str): The name of the channel to be removed from the database.
Returns:
bool: True if the channel was successfully deleted, False if no rows were affected.
"""
mes_donnes = {'channel': channel}
response = uplink.Base.db_execute_query("DELETE FROM votekick_channel WHERE channel = :channel", mes_donnes)
affected_row = response.rowcount
if affected_row > 0:
return True
else:
return False
def join_saved_channels(uplink: 'Votekick') -> None:
param = {'module_name': uplink.module_name}
result = uplink.Base.db_execute_query(f"SELECT id, channel_name FROM {uplink.Config.TABLE_CHANNEL} WHERE module_name = :module_name", param)
channels = result.fetchall()
for channel in channels:
id_, chan = channel
uplink.VoteKickManager.activate_new_channel(chan)
uplink.Protocol.send_sjoin(channel=chan)
uplink.Protocol.send2socket(f":{uplink.Config.SERVICE_NICKNAME} SAMODE {chan} +o {uplink.Config.SERVICE_NICKNAME}")
return None

View File

@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Literal, Optional
from mods.votekick.schemas import VoteChannelModel
if TYPE_CHECKING:
@@ -38,7 +38,32 @@ class VotekickManager:
return True
return False
def init_vote_system(self, channel_name: str) -> bool:
"""Initializes or resets the votekick system for a given channel.
This method clears the current target, voter list, and vote counts
in preparation for a new votekick session.
Args:
channel_name (str): The name of the channel for which the votekick system should be initialized.
Returns:
bool: True if the votekick system was successfully initialized, False if the channel is not found.
"""
votec = self.get_vote_channel_model(channel_name)
if votec is None:
self.Logs.debug(f"[VOTEKICK MANAGER] The channel ({channel_name}) is not active!")
return False
votec.target_user = ''
votec.voter_users = []
votec.vote_for = 0
votec.vote_against = 0
self.Logs.debug(f"[VOTEKICK MANAGER] The channel ({channel_name}) has been successfully initialized!")
return True
def get_vote_channel_model(self, channel_name: str) -> Optional[VoteChannelModel]:
"""Get Vote Channel Object model
@@ -96,3 +121,43 @@ class VotekickManager:
self.Logs.debug(f'[VOTEKICK MANAGER] {channel_name} is activated but there is no ongoing vote!')
return False
def action_vote(self, channel_name: str, nickname: str, action: Literal['+', '-']) -> bool:
"""
Registers a vote (for or against) in an active votekick session on a channel.
Args:
channel_name (str): The name of the channel where the votekick session is active.
nickname (str): The nickname of the user casting the vote.
action (Literal['+', '-']): The vote action. Use '+' to vote for kicking, '-' to vote against.
Returns:
bool: True if the vote was successfully registered, False otherwise.
This can fail if:
- The action is invalid (not '+' or '-')
- The user has already voted
- The channel has no active votekick session
"""
if action not in ['+', '-']:
self.Logs.debug(f"[VOTEKICK MANAGER] The action must be + or - while you have provided ({action})")
return False
votec = self.get_vote_channel_model(channel_name)
if votec:
client_obj = self.uplink.User.get_User(votec.target_user)
client_to_punish = votec.target_user if client_obj is None else client_obj.nickname
if nickname in votec.voter_users:
self.Logs.debug(f"[VOTEKICK MANAGER] This nickname ({nickname}) has already voted for ({client_to_punish})")
return False
else:
if action == '+':
votec.vote_for += 1
elif action == '-':
votec.vote_against += 1
votec.voter_users.append(nickname)
self.Logs.debug(f"[VOTEKICK MANAGER] The ({nickname}) has voted to ban ({client_to_punish})")
return True
else:
self.Logs.debug(f"[VOTEKICK MANAGER] This channel {channel_name} is not active!")
return False