From b81f502b955f8619cbbc73e91be037c04bd4ed17 Mon Sep 17 00:00:00 2001 From: adator <85586985+adator85@users.noreply.github.com> Date: Fri, 15 Nov 2024 22:14:11 +0100 Subject: [PATCH] V6.0.2 --- core/base.py | 4 +- core/classes/protocols/unreal6.py | 110 +++++++++++-- core/classes/settings.py | 3 +- core/classes/user.py | 2 +- core/irc.py | 252 ++++++++++++++---------------- mods/mod_clone.py | 2 +- mods/mod_command.py | 131 ++++++++++------ mods/mod_defender.py | 10 +- mods/mod_jsonrpc.py | 2 +- mods/mod_test.py | 2 +- mods/mod_votekick.py | 2 +- 11 files changed, 310 insertions(+), 210 deletions(-) diff --git a/core/base.py b/core/base.py index 6eb2edd..a159eb6 100644 --- a/core/base.py +++ b/core/base.py @@ -222,8 +222,8 @@ class Base: filter: list[str] = ['PING', f":{self.Config.SERVICE_PREFIX}auth"] # record.msg = record.getMessage().replace("PING", "[REDACTED]") - # if self.Settings.CONSOLE: - # print(record.getMessage()) + if self.Settings.CONSOLE: + print(record.getMessage()) for f in filter: if f in record.getMessage(): diff --git a/core/classes/protocols/unreal6.py b/core/classes/protocols/unreal6.py index 81a1bfa..dffd537 100644 --- a/core/classes/protocols/unreal6.py +++ b/core/classes/protocols/unreal6.py @@ -1,9 +1,11 @@ from re import match, findall from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Union from ssl import SSLEOFError, SSLError from dataclasses import dataclass +from websockets import serve + if TYPE_CHECKING: from core.irc import Irc @@ -17,6 +19,12 @@ class Unrealircd6: self.__Base = ircInstance.Base self.__Settings = ircInstance.Base.Settings + self.known_protocol = ['SJOIN', 'UID', 'MD', 'QUIT', 'SQUIT', + 'EOS', 'PRIVMSG', 'MODE', 'UMODE2', + 'VERSION', 'REPUTATION', 'SVS2MODE', + 'SLOG', 'NICK', 'PART', 'PONG' + ] + self.__Base.logs.info(f"** Loading protocol [{__name__}]") def send2socket(self, message: str, print_log: bool = True) -> None: @@ -104,6 +112,40 @@ class Unrealircd6: except Exception as err: self.__Base.logs.error(f"General Error: {err}") + def parse_server_msg(self, server_msg: list[str]) -> Union[str, None]: + """Parse the server message and return the command + + Args: + server_msg (list[str]): The Original server message >> + + Returns: + Union[str, None]: Return the command protocol name + """ + protocol_exception = ['PING', 'SERVER', 'PROTOCTL'] + increment = 0 + server_msg_copy = server_msg.copy() + first_index = 0 + second_index = 0 + for index, element in enumerate(server_msg_copy): + # Handle the protocol exceptions ex. ping, server .... + if element in protocol_exception and index == 0: + return element + + if element.startswith(':'): + increment += 1 + first_index = index + 1 if increment == 1 else first_index + second_index = index if increment == 2 else second_index + + second_index = len(server_msg_copy) if second_index == 0 else second_index + + parsed_msg = server_msg_copy[first_index:second_index] + + for cmd in parsed_msg: + if cmd in self.known_protocol: + return cmd + + return None + def link(self): """Créer le link et envoyer les informations nécessaires pour la connexion au serveur. @@ -408,6 +450,32 @@ class Unrealircd6: self.__Irc.Channel.delete_user_from_channel(channel, userObj.uid) return None + def on_svs2mode(self, serverMsg: list[str]) -> None: + """Handle svs2mode coming from a server + + Args: + serverMsg (list[str]): Original server message + """ + try: + # >> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r'] + + uid_user_to_edit = serverMsg[2] + umode = serverMsg[3] + + userObj = self.__Irc.User.get_User(uid_user_to_edit) + + if userObj is None: + return None + + if self.__Irc.User.update_mode(userObj.uid, umode): + return None + + return None + except IndexError as ie: + self.__Base.logs.error(f"{__name__} - Index Error: {ie}") + except Exception as err: + self.__Base.logs.error(f"{__name__} - General Error: {err}") + def on_umode2(self, serverMsg: list[str]) -> None: """Handle umode2 coming from a server @@ -490,12 +558,32 @@ class Unrealircd6: serverMsg (list[str]): Original server message """ # ['PROTOCTL', 'CHANMODES=beI,fkL,lFH,cdimnprstzCDGKMNOPQRSTVZ', 'USERMODES=diopqrstwxzBDGHIRSTWZ', 'BOOTED=1728815798', 'PREFIX=(qaohv)~&@%+', 'SID=001', 'MLOCK', 'TS=1730662755', 'EXTSWHOIS'] - if len(serverMsg) > 5: - if '=' in serverMsg[5]: - serveur_hosting_id = str(serverMsg[5]).split('=') - self.__Config.HSID = serveur_hosting_id[1] - if 'USERMODES=' in serverMsg[2]: - self.__Settings.USER_MODES = list(serverMsg[2].split('=')[1]) + user_modes: str = None + prefix: str = None + host_server_id: str = None + + for msg in serverMsg: + pattern = None + if msg.startswith('PREFIX='): + pattern = r'^PREFIX=\((.*)\).*$' + find_match = match(pattern, msg) + prefix = find_match.group(1) if find_match else None + if find_match: + prefix = find_match.group(1) + + elif msg.startswith('USERMODES='): + pattern = r'^USERMODES=(.*)$' + find_match = match(pattern, msg) + user_modes = find_match.group(1) if find_match else None + elif msg.startswith('SID='): + host_server_id = msg.split('=')[1] + + if user_modes is None or prefix is None or host_server_id is None: + return None + + self.__Config.HSID = host_server_id + self.__Settings.PROTOCTL_USER_MODES = list(user_modes) + self.__Settings.PROTOCTL_PREFIX = list(prefix) return None @@ -630,7 +718,7 @@ class Unrealircd6: else: geoip = None - score_connexion = 0 + score_connexion = self.__Irc.first_score self.__Irc.User.insert( self.__Irc.Loader.Definition.MUser( @@ -662,7 +750,7 @@ class Unrealircd6: serverMsg (list[str]): List of str coming from the server """ try: - + # pong = str(serverMsg[1]).replace(':','') self.send2socket(f"PONG :{pong}", print_log=False) @@ -754,7 +842,7 @@ class Unrealircd6: def on_version_msg(self, serverMsg: list[str]) -> None: """Handle version coming from the server - + \n ex. /version Defender Args: serverMsg (list[str]): Original message from the server """ @@ -776,7 +864,7 @@ class Unrealircd6: response_005 = ' | '.join(modules) self.send2socket(f':{self.__Config.SERVICE_HOST} 005 {getUser.nickname} {response_005} are supported by this server') - response_005 = ''.join(self.__Settings.USER_MODES) + response_005 = ''.join(self.__Settings.PROTOCTL_USER_MODES) self.send2socket(f":{self.__Config.SERVICE_HOST} 005 {getUser.nickname} {response_005} are supported by this server") return None diff --git a/core/classes/settings.py b/core/classes/settings.py index d34010b..67aed51 100644 --- a/core/classes/settings.py +++ b/core/classes/settings.py @@ -11,4 +11,5 @@ class Settings: CONSOLE: bool = False - USER_MODES: list[str] = [] + PROTOCTL_USER_MODES: list[str] = [] + PROTOCTL_PREFIX: list[str] = [] diff --git a/core/classes/user.py b/core/classes/user.py index 035a9cb..ba7ce10 100644 --- a/core/classes/user.py +++ b/core/classes/user.py @@ -92,7 +92,7 @@ class User: return False liste_umodes = list(umodes) - final_umodes_liste = [x for x in self.Base.Settings.USER_MODES if x in liste_umodes] + final_umodes_liste = [x for x in self.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes] final_umodes = ''.join(final_umodes_liste) userObj.umodes = f"+{final_umodes}" diff --git a/core/irc.py b/core/irc.py index 6464db1..a94086c 100644 --- a/core/irc.py +++ b/core/irc.py @@ -1,3 +1,5 @@ +from ast import parse +from http import server import sys import socket import threading @@ -73,10 +75,13 @@ class Irc: self.autolimit_started: bool = False """This variable is to make sure the thread is not running""" - self.first_score: int = 100 + # define first reputation score to 0 + self.first_score: int = 0 + # Define the dict that will contain all loaded modules self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés + # Define the IrcSocket object self.IrcSocket:Union[socket.socket, SSLSocket] = None # Liste des commandes internes du bot @@ -731,109 +736,28 @@ class Irc: """ try: original_response: list[str] = data.copy() - interm_response: list[str] = data.copy() """This the original without first value""" - interm_response.pop(0) if len(original_response) == 0 or len(original_response) == 1: self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}') return False - if len(original_response) == 7: - if original_response[2] == 'PRIVMSG' and original_response[4] == ':auth': - data_copy = original_response.copy() - data_copy[6] = '**********' - self.Logs.debug(f">> {data_copy}") - else: - self.Logs.debug(f">> {original_response}") - else: - self.Logs.debug(f">> {original_response}") + parsed_protocol = self.Protocol.parse_server_msg(original_response.copy()) - match original_response[0]: + match parsed_protocol: case 'PING': - # Sending PONG response to the serveur - self.Protocol.on_server_ping(original_response) + self.Protocol.on_server_ping(serverMsg=original_response) + # print(f"** handle {parsed_protocol}") return None - case 'PROTOCTL': - #['PROTOCTL', 'CHANMODES=beI,fkL,lFH,cdimnprstzCDGKMNOPQRSTVZ', 'USERMODES=diopqrstwxzBDGHIRSTWZ', 'BOOTED=1702138935', - # 'PREFIX=(qaohv)~&@%+', 'SID=001', 'MLOCK', 'TS=1703793941', 'EXTSWHOIS'] - - # GET SERVER ID HOST - self.Protocol.on_protoctl(serverMsg=original_response) - - return None - - case _: - pass - - if len(original_response) < 2: - return False - - match original_response[1]: - - case 'PING': - # Sending PONG response to the serveur - self.Protocol.on_server_ping(original_response) - return None - - case 'SLOG': - # self.Base.scan_ports(cmd[7]) - # if self.Config.ABUSEIPDB == 1: - # self.Base.create_thread(self.abuseipdb_scan, (cmd[7], )) - pass - - case 'VERSION': - self.Protocol.on_version_msg(original_response) - - case 'UMODE2': - # [':adator_', 'UMODE2', '-i'] - self.Protocol.on_umode2(serverMsg=original_response) - - case 'SQUIT': - - self.Protocol.on_squit(serverMsg=original_response) - - case 'REPUTATION': - # :001 REPUTATION 127.0.0.1 118 - try: - self.first_connexion_ip = original_response[2] - - self.first_score = 0 - if str(original_response[3]).find('*') != -1: - # If * available, it means that an ircop changed the repurtation score - # means also that the user exist will try to update all users with same IP - self.first_score = int(str(original_response[3]).replace('*','')) - for user in self.User.UID_DB: - if user.remote_ip == self.first_connexion_ip: - user.score_connexion = self.first_score - else: - self.first_score = int(original_response[3]) - - # Possibilité de déclancher les bans a ce niveau. - except IndexError as ie: - self.Logs.error(f'{ie}') - except ValueError as ve: - self.first_score = 0 - self.Logs.error(f'Impossible to convert first_score: {ve}') - - case '320': - #:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users - pass - - case '318': - #:irc.deb.biz.st 318 PyDefender IRCParis93 :End of /WHOIS list. - pass - - case 'MD': - # [':001', 'MD', 'client', '001CG0TG7', 'webirc', ':2'] - pass - - case 'EOS': + case 'SJOIN': + self.Protocol.on_sjoin(serverMsg=original_response) + # print(f"** handle {parsed_protocol}") + case 'EOS': # TODO hsid = str(original_response[0]).replace(':','') if hsid == self.Config.HSID: if self.Config.DEFENDER_INIT == 1: @@ -886,45 +810,9 @@ class Irc: classe_object.cmd(original_response) # Stop here When EOS + # print(f"** handle {parsed_protocol}") return None - case _: - pass - - if len(original_response) < 3: - return False - - match original_response[2]: - - case 'VERSION': - self.Protocol.on_version_msg(original_response) - - case 'QUIT': - - self.Protocol.on_quit(serverMsg=original_response) - - case 'PONG': - # ['@msgid=aTNJhp17kcPboF5diQqkUL;time=2023-12-28T20:35:58.411Z', ':irc.deb.biz.st', 'PONG', 'irc.deb.biz.st', ':Dev-PyDefender'] - self.Base.execute_periodic_action() - - case 'NICK': - - self.Protocol.on_nick(original_response) - - case 'MODE': - #['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6+Z4494xWUg;time=2024-08-09T12:45:36.651Z', - # ':001', 'MODE', '#a', '+nt', '1723207536'] - # [':adator_', 'UMODE2', '-i'] - pass - - case 'SJOIN': - - self.Protocol.on_sjoin(serverMsg=original_response) - - case 'PART': - - self.Protocol.on_part(serverMsg=original_response) - case 'UID': try: self.Protocol.on_uid(serverMsg=original_response) @@ -932,10 +820,76 @@ class Irc: for classe_name, classe_object in self.loaded_classes.items(): classe_object.cmd(original_response) + # print(f"** handle {parsed_protocol}") + except Exception as err: self.Logs.error(f'General Error: {err}') - case 'PRIVMSG': + case 'QUIT': + self.Protocol.on_quit(serverMsg=original_response) + # print(f"** handle {parsed_protocol}") + + case 'PROTOCTL': + self.Protocol.on_protoctl(serverMsg=original_response) + # print(f"** handle {parsed_protocol}") + + case 'SVS2MODE': + # >> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r'] + self.Protocol.on_svs2mode(serverMsg=original_response) + # print(f"** handle {parsed_protocol}") + + case 'SQUIT': + self.Protocol.on_squit(serverMsg=original_response) + # print(f"** handle {parsed_protocol}") + + case 'PART': + self.Protocol.on_part(serverMsg=parsed_protocol) + # print(f"** handle {parsed_protocol}") + + case 'VERSION': + self.Protocol.on_version_msg(serverMsg=original_response) + # print(f"** handle {parsed_protocol}") + + case 'UMODE2': + # [':adator_', 'UMODE2', '-i'] + self.Protocol.on_umode2(serverMsg=original_response) + # print(f"** handle {parsed_protocol}") + + case 'NICK': + self.Protocol.on_nick(serverMsg=original_response) + # print(f"** handle {parsed_protocol}") + + case 'REPUTATION': # TODO + # :001 REPUTATION 127.0.0.1 118 + try: + self.first_connexion_ip = original_response[2] + self.first_score = 0 + + if str(original_response[3]).find('*') != -1: + # If * available, it means that an ircop changed the repurtation score + # means also that the user exist will try to update all users with same IP + self.first_score = int(str(original_response[3]).replace('*','')) + for user in self.User.UID_DB: + if user.remote_ip == self.first_connexion_ip: + user.score_connexion = self.first_score + else: + self.first_score = int(original_response[3]) + + # print(f"** handle {parsed_protocol}") + # Possibilité de déclancher les bans a ce niveau. + except IndexError as ie: + self.Logs.error(f'{ie}') + except ValueError as ve: + self.first_score = 0 + self.Logs.error(f'Impossible to convert first_score: {ve}') + + case 'SLOG': # TODO + print(f"** handle {parsed_protocol}") + + case 'MD': # TODO + print(f"** handle {parsed_protocol}") + + case 'PRIVMSG': # TODO try: # Supprimer la premiere valeur cmd = interm_response.copy() @@ -975,7 +929,7 @@ class Irc: self.Base.log_cmd(user_trigger, cmd_to_send) fromchannel = str(cmd[2]).lower() if self.Channel.Is_Channel(cmd[2]) else None - self._hcmds(user_trigger, fromchannel, arg, cmd) + self.hcmds(user_trigger, fromchannel, arg, cmd) if cmd[2] == self.Config.SERVICE_ID: pattern = fr'^:.*?:(.*)$' @@ -1013,13 +967,41 @@ class Irc: if len(arg) >= 2: fromchannel = str(arg[1]).lower() if self.Channel.Is_Channel(arg[1]) else None - self._hcmds(user_trigger, fromchannel, arg, cmd) + self.hcmds(user_trigger, fromchannel, arg, cmd) + # print(f"** handle {parsed_protocol}") except IndexError as io: self.Logs.error(f'{io}') - case _: - pass + case 'PONG': # TODO + print(f"** handle {parsed_protocol}") + + case 'MODE': # TODO + #['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6+Z4494xWUg;time=2024-08-09T12:45:36.651Z', + # ':001', 'MODE', '#a', '+nt', '1723207536'] + # [':adator_', 'UMODE2', '-i'] + print(f"** handle {parsed_protocol}") + + case '320': # TODO + #:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users + print(f"** handle {parsed_protocol}") + + case '318': # TODO + #:irc.deb.biz.st 318 PyDefender IRCParis93 :End of /WHOIS list. + print(f"** handle {parsed_protocol}") + + case None: + print(f"** TO BE HANDLE {original_response}") + + if len(original_response) == 7: + if original_response[2] == 'PRIVMSG' and original_response[4] == ':auth': + data_copy = original_response.copy() + data_copy[6] = '**********' + self.Logs.debug(f">> {data_copy}") + else: + self.Logs.debug(f">> {original_response}") + else: + self.Logs.debug(f">> {original_response}") if original_response[2] != 'UID': # Envoyer la commande aux classes dynamiquement chargées @@ -1032,8 +1014,8 @@ class Irc: self.Logs.error(f"General Error: {err}") self.Logs.error(f"General Error: {traceback.format_exc()}") - def _hcmds(self, user: str, channel: Union[str, None], cmd: list, fullcmd: list = []) -> None: - """_summary_ + def hcmds(self, user: str, channel: Union[str, None], cmd: list, fullcmd: list = []) -> None: + """Create Args: user (str): The user who sent the query @@ -1064,7 +1046,7 @@ class Irc: # Envoyer la commande aux classes dynamiquement chargées if command != 'notallowed': for classe_name, classe_object in self.loaded_classes.items(): - classe_object._hcmds(user, channel, cmd, fullcmd) + classe_object.hcmds(user, channel, cmd, fullcmd) match command: diff --git a/mods/mod_clone.py b/mods/mod_clone.py index aa826f0..01f21a4 100644 --- a/mods/mod_clone.py +++ b/mods/mod_clone.py @@ -306,7 +306,7 @@ class Clone(): except Exception as err: self.Logs.error(f'General Error: {err}') - def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: + def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: try: command = str(cmd[0]).lower() diff --git a/mods/mod_command.py b/mods/mod_command.py index 919c98e..867399a 100644 --- a/mods/mod_command.py +++ b/mods/mod_command.py @@ -1,10 +1,10 @@ from typing import Union, TYPE_CHECKING from dataclasses import dataclass -from core.classes import user - if TYPE_CHECKING: from core.irc import Irc + from core.definition import MUser + from sqlalchemy import CursorResult, Row, Sequence class Command: @@ -246,7 +246,7 @@ class Command: return None user_uid = self.User.clean_uid(cmd[5]) - userObj = self.User.get_User(user_uid) + userObj: MUser = self.User.get_User(user_uid) channel_name = cmd[4] if self.Channel.Is_Channel(cmd[4]) else None if userObj is None: @@ -255,7 +255,7 @@ class Command: if 'r' not in userObj.umodes: return None - db_data = {"nickname": userObj.nickname, "channel": channel_name} + db_data: dict[str, str] = {"nickname": userObj.nickname, "channel": channel_name} db_query = self.Base.db_execute_query("SELECT id, mode FROM command_automode WHERE nickname = :nickname AND channel = :channel", db_data) db_result = db_query.fetchone() if db_result is not None: @@ -264,11 +264,10 @@ class Command: except KeyError as ke: self.Logs.error(f"Key Error: {err}") - except Exception as err: self.Logs.error(f"General Error: {err}") - def _hcmds(self, uidornickname: str, channel_name: Union[str, None], cmd: list, fullcmd: list = []) -> None: + def hcmds(self, uidornickname: str, channel_name: Union[str, None], cmd: list, fullcmd: list = []) -> None: command = str(cmd[0]).lower() dnickname = self.Config.SERVICE_NICKNAME @@ -280,65 +279,95 @@ class Command: match command: case "automode": - # automode nickname +/-mode #channel - # automode adator +o #channel + # automode set nickname [+/-mode] #channel + # automode set adator +o #channel try: + option: str = str(cmd[1]).lower() + match option: + case 'set': + if len(cmd) < 5: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}") + return None - allowed_modes = ['q','a','o','h','v'] - userObj = self.User.get_User(str(cmd[1])) + allowed_modes: list[str] = self.Base.Settings.PROTOCTL_PREFIX # ['q','a','o','h','v'] + # userObj: MUser = self.User.get_User(str(cmd[2])) + nickname = str(cmd[2]) + mode = str(cmd[3]) + chan: str = str(cmd[4]).lower() if self.Channel.Is_Channel(cmd[4]) else None + sign = mode[0] if mode.startswith( ('+', '-')) else None + clean_mode = mode[1:] if len(mode) > 0 else None - if len(cmd) != 4: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}") - return None + if sign is None: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You must provide the flag mode + or -") + return None - mode = str(cmd[2]) - flag_found: bool = False - if mode.startswith( ('+', '-') ): - flag_found = True + if clean_mode not in allowed_modes: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}") + return None - if not flag_found: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You must provide the flag mode + or -") - return None + if chan is None: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}") + return None - mode = str(cmd[2]) - chan = cmd[3] if self.Channel.Is_Channel(cmd[3]) else None + db_data: dict[str, str] = {"nickname": nickname, "channel": chan} + db_query = self.Base.db_execute_query(query="SELECT id FROM command_automode WHERE nickname = :nickname and channel = :channel", params=db_data) + db_result = db_query.fetchone() - if mode.lstrip('+-') not in allowed_modes: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}") - return None + if db_result is not None: + if sign == '+': + db_data = {"updated_on": self.Base.get_datetime(), "nickname": nickname, "channel": chan, "mode": mode} + db_result = self.Base.db_execute_query(query="UPDATE command_automode SET mode = :mode, updated_on = :updated_on WHERE nickname = :nickname and channel = :channel", + params=db_data) + if db_result.rowcount > 0: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} edited for {nickname} in {chan}") + elif sign == '-': + db_data = {"nickname": nickname, "channel": chan, "mode": f"+{clean_mode}"} + db_result = self.Base.db_execute_query(query="DELETE FROM command_automode WHERE nickname = :nickname and channel = :channel and mode = :mode", + params=db_data) + if db_result.rowcount > 0: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} deleted for {nickname} in {chan}") + else: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"The mode [{mode}] has not been found for {nickname} in channel {chan}") - if chan is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}") - return None + return None - db_data: dict[str, str] = {"nickname": userObj.nickname, "channel": chan} - db_query = self.Base.db_execute_query(query="SELECT id FROM command_automode WHERE nickname = :nickname and channel = :channel", params=db_data) - db_result = db_query.fetchone() + # Instert a new automode + if sign == '+': + db_data = {"created_on": self.Base.get_datetime(), "updated_on": self.Base.get_datetime(), "nickname": nickname, "channel": chan, "mode": mode} + db_query = self.Base.db_execute_query( + query="INSERT INTO command_automode (created_on, updated_on, nickname, channel, mode) VALUES (:created_on, :updated_on, :nickname, :channel, :mode)", + params=db_data + ) - if db_result is not None: - db_data = {"updated_on": self.Base.get_datetime(), "nickname": userObj.nickname, "channel": chan, "mode": mode} - db_result = self.Base.db_execute_query(query="UPDATE command_automode SET mode = :mode, updated_on = :updated_on WHERE nickname = :nickname and channel = :channel", - params=db_data) - if db_result.rowcount > 0: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} edited for {userObj.nickname} in {chan}") + if db_query.rowcount > 0: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} applied to {nickname} in {chan}") + self.Protocol.send2socket(f":{service_id} MODE {chan} {mode} {nickname}") + else: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AUTOMODE {mode} cannot be added to {nickname} in {chan} because it doesn't exist") - return None + case 'list': + db_query: CursorResult = self.Base.db_execute_query("SELECT nickname, channel, mode FROM command_automode") + db_results: Sequence[Row] = db_query.fetchall() - # Instert a new automode - db_data = {"created_on": self.Base.get_datetime(), "updated_on": self.Base.get_datetime(), "nickname": userObj.nickname, "channel": chan, "mode": mode} - db_query = self.Base.db_execute_query( - query="INSERT INTO command_automode (created_on, updated_on, nickname, channel, mode) VALUES (:created_on, :updated_on, :nickname, :channel, :mode)", - params=db_data - ) + if not db_results: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, + msg="There is no automode to display.") - if db_query.rowcount > 0: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} applied to {userObj.nickname} in {chan}") - self.Protocol.send2socket(f":{service_id} MODE {chan} {mode} {userObj.nickname}") + for db_result in db_results: + db_nickname, db_channel, db_mode = db_result + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, + msg=f"Nickname: {db_nickname} | Channel: {db_channel} | Mode: {db_mode}") - except KeyError: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}") + case _: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} SET [nickname] [+/-mode] [#channel]") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} LIST") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"[AUTOMODES AVAILABLE] are {' / '.join(allowed_modes)}") + + except IndexError: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} SET [nickname] [+/-mode] [#channel]") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} LIST") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"[AUTOMODES AVAILABLE] are {' / '.join(self.Base.Settings.PROTOCTL_PREFIX)}") except Exception as err: self.Logs.error(f"General Error: {err}") diff --git a/mods/mod_defender.py b/mods/mod_defender.py index a0bd33d..3305695 100644 --- a/mods/mod_defender.py +++ b/mods/mod_defender.py @@ -18,7 +18,7 @@ import core.definition as df # 4 . Créer vos tables, en utilisant toujours le nom des votre classe en minuscule ==> defender_votre-table # 3. Methode suivantes: # cmd(self, data:list) -# _hcmds(self, user:str, cmd: list) +# hcmds(self, user:str, cmd: list) # unload(self) if TYPE_CHECKING: @@ -990,7 +990,7 @@ class Defender(): match cmd[1]: case 'REPUTATION': - # :001 REPUTATION 91.168.141.239 118 + # :001 REPUTATION 8.8.8.8 118 try: self.reputation_first_connexion['ip'] = cmd[2] self.reputation_first_connexion['score'] = cmd[3] @@ -1183,7 +1183,7 @@ class Defender(): except Exception as err: self.Logs.error(f"General Error: {err}") - def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: + def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: command = str(cmd[0]).lower() fromuser = user @@ -1213,12 +1213,12 @@ class Defender(): case 'show_reputation': if not self.Reputation.UID_REPUTATION_DB: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" No one is suspected") + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="No one is suspected") for suspect in self.Reputation.UID_REPUTATION_DB: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, - msg=f" Uid: {suspect.uid} | Nickname: {suspect.nickname} | Reputation: {suspect.score_connexion} | Secret code: {suspect.secret_code} | Connected on: {suspect.connected_datetime}") + msg=f" Uid: {suspect.uid} | Nickname: {suspect.nickname} | Reputation: {suspect.score_connexion} | Secret code: {suspect.secret_code} | Connected on: {suspect.connexion_datetime}") case 'code': try: diff --git a/mods/mod_jsonrpc.py b/mods/mod_jsonrpc.py index 1a1d660..8b7d053 100644 --- a/mods/mod_jsonrpc.py +++ b/mods/mod_jsonrpc.py @@ -208,7 +208,7 @@ class Jsonrpc(): return None - def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: + def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: command = str(cmd[0]).lower() dnickname = self.Config.SERVICE_NICKNAME diff --git a/mods/mod_test.py b/mods/mod_test.py index 3b99a73..f18d038 100644 --- a/mods/mod_test.py +++ b/mods/mod_test.py @@ -147,7 +147,7 @@ class Test(): except Exception as err: self.Logs.error(f"General Error: {err}") - def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: + def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: command = str(cmd[0]).lower() dnickname = self.Config.SERVICE_NICKNAME diff --git a/mods/mod_votekick.py b/mods/mod_votekick.py index d16a63d..2e95b11 100644 --- a/mods/mod_votekick.py +++ b/mods/mod_votekick.py @@ -274,7 +274,7 @@ class Votekick(): except Exception as err: self.Logs.error(f"General Error: {err}") - def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: + def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: # cmd is the command starting from the user command # full cmd is sending the entire server response