from dataclasses import dataclass from typing import Any from core.classes.interfaces.imodule import IModule import mods.defender.schemas as schemas import mods.defender.utils as utils import mods.defender.threads as thds from core.utils import tr class Defender(IModule): @dataclass class ModConfModel(schemas.ModConfModel): ... MOD_HEADER: dict[str, str] = { 'name':'Defender', 'version':'1.0.0', 'description':'Defender main module that uses the reputation security.', 'author':'Defender Team', 'core_version':'Defender-6' } def create_tables(self) -> None: """Methode qui va créer la base de donnée si elle n'existe pas. Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module Args: database_name (str): Nom de la base de données ( pas d'espace dans le nom ) Returns: None: Aucun retour n'es attendu """ # table_autoop = '''CREATE TABLE IF NOT EXISTS defender_autoop ( # id INTEGER PRIMARY KEY AUTOINCREMENT, # datetime TEXT, # nickname TEXT, # channel TEXT # ) # ''' # self.Base.db_execute_query(table_autoop) # self.Base.db_execute_query(table_config) # self.Base.db_execute_query(table_trusted) return None async def load(self): # Add module schemas self.Schemas = schemas # Add utils functions self.Utils = utils # Variable qui va contenir les options de configuration du module Defender self.ModConfig: schemas.ModConfModel = self.ModConfModel() # Create module commands (Mandatory) self.Irc.build_command(0, self.module_name, 'code', 'Display the code or key for access') self.Irc.build_command(1, self.module_name, 'info', 'Provide information about the channel or server') self.Irc.build_command(1, self.module_name, 'autolimit', 'Automatically set channel user limits') self.Irc.build_command(3, self.module_name, 'reputation', 'Check or manage user reputation') self.Irc.build_command(3, self.module_name, 'proxy_scan', 'Scan users for proxy connections') self.Irc.build_command(3, self.module_name, 'flood', 'Handle flood detection and mitigation') self.Irc.build_command(3, self.module_name, 'status', 'Check the status of the server or bot') self.Irc.build_command(3, self.module_name, 'show_reputation', 'Display reputation information') self.Irc.build_command(3, self.module_name, 'sentinel', 'Monitor and guard the channel or server') self.timeout = self.Config.API_TIMEOUT # Listes qui vont contenir les ip a scanner avec les différentes API self.Schemas.DB_ABUSEIPDB_USERS = self.Schemas.DB_FREEIPAPI_USERS = self.Schemas.DB_CLOUDFILT_USERS = [] self.Schemas.DB_PSUTIL_USERS = self.Schemas.DB_LOCALSCAN_USERS = [] # Variables qui indique que les threads sont en cours d'éxecutions self.abuseipdb_isRunning = self.freeipapi_isRunning = self.cloudfilt_isRunning = True self.psutil_isRunning = self.localscan_isRunning = self.reputationTimer_isRunning = True self.autolimit_isRunning = True # Variable qui va contenir les users self.flood_system = {} # Contient les premieres informations de connexion self.reputation_first_connexion = {'ip': '', 'score': -1} # Laisser vide si aucune clé self.abuseipdb_key = '13c34603fee4d2941a2c443cc5c77fd750757ca2a2c1b304bd0f418aff80c24be12651d1a3cfe674' self.cloudfilt_key = 'r1gEtjtfgRQjtNBDMxsg' # Démarrer les threads pour démarrer les api self.Base.create_thread(func=thds.thread_freeipapi_scan, func_args=(self, )) self.Base.create_thread(func=thds.thread_cloudfilt_scan, func_args=(self, )) self.Base.create_thread(func=thds.thread_abuseipdb_scan, func_args=(self, )) self.Base.create_thread(func=thds.thread_local_scan, func_args=(self, )) self.Base.create_thread(func=thds.thread_psutil_scan, func_args=(self, )) self.Base.create_thread(func=thds.thread_apply_reputation_sanctions, func_args=(self, )) if self.ModConfig.autolimit == 1: self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, )) if self.ModConfig.reputation == 1: await self.Protocol.send_sjoin(self.Config.SALON_JAIL) await self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.SALON_JAIL} +o {self.Config.SERVICE_NICKNAME}") def __onload(self): abuseipdb = self.Settings.get_cache('ABUSEIPDB') freeipapi = self.Settings.get_cache('FREEIPAPI') cloudfilt = self.Settings.get_cache('CLOUDFILT') psutils = self.Settings.get_cache('PSUTIL') localscan = self.Settings.get_cache('LOCALSCAN') if abuseipdb: self.Schemas.DB_ABUSEIPDB_USERS = abuseipdb if freeipapi: self.Schemas.DB_FREEIPAPI_USERS = freeipapi if cloudfilt: self.Schemas.DB_CLOUDFILT_USERS = cloudfilt if psutils: self.Schemas.DB_PSUTIL_USERS = psutils if localscan: self.Schemas.DB_LOCALSCAN_USERS = localscan def unload(self) -> None: """Cette methode sera executée a chaque désactivation ou rechargement de module """ self.Schemas.DB_ABUSEIPDB_USERS = [] self.Schemas.DB_FREEIPAPI_USERS = [] self.Schemas.DB_CLOUDFILT_USERS = [] self.Schemas.DB_PSUTIL_USERS = [] self.Schemas.DB_LOCALSCAN_USERS = [] self.abuseipdb_isRunning:bool = False self.freeipapi_isRunning:bool = False self.cloudfilt_isRunning:bool = False self.psutil_isRunning:bool = False self.localscan_isRunning:bool = False self.reputationTimer_isRunning:bool = False self.autolimit_isRunning: bool = False self.Irc.Commands.drop_command_by_module(self.module_name) return None def insert_db_trusted(self, uid: str, nickname:str) -> None: u = self.User.get_user(uid) if u is None: return None uid = u.uid nickname = u.nickname query = "SELECT id FROM def_trusted WHERE user = ?" exec_query = self.Base.db_execute_query(query, {"user": nickname}) response = exec_query.fetchone() if response is not None: q_insert = "INSERT INTO def_trusted (datetime, user, host, vhost) VALUES (?, ?, ?, ?)" mes_donnees = {'datetime': self.Loader.Utils.get_datetime(), 'user': nickname, 'host': '*', 'vhost': '*'} exec_query = self.Base.db_execute_query(q_insert, mes_donnees) pass async def join_saved_channels(self) -> None: """_summary_ """ try: result = self.Base.db_execute_query(f"SELECT distinct channel_name FROM {self.Config.TABLE_CHANNEL}") channels = result.fetchall() jail_chan = self.Config.SALON_JAIL jail_chan_mode = self.Config.SALON_JAIL_MODES service_id = self.Config.SERVICE_ID dumodes = self.Config.SERVICE_UMODES dnickname = self.Config.SERVICE_NICKNAME for channel in channels: chan = channel[0] await self.Protocol.send_sjoin(chan) if chan == jail_chan: await self.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}") await self.Protocol.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}") return None except Exception as err: self.Logs.error(f"General Error: {err}") def cmd(self, data: list[str]) -> None: if not data or len(data) < 2: return None cmd = data.copy() if isinstance(data, list) else list(data).copy() try: index, command = self.Irc.Protocol.get_ircd_protocol_poisition(cmd) if index == -1: return None match command: case 'REPUTATION': self.Utils.handle_on_reputation(self, cmd) return None case 'MODE': self.Utils.handle_on_mode(self, cmd) return None case 'PRIVMSG': self.Utils.handle_on_privmsg(self, cmd) return None case 'UID': self.Utils.handle_on_uid(self, cmd) return None case 'SJOIN': self.Utils.handle_on_sjoin(self, cmd) return None case 'SLOG': self.Utils.handle_on_slog(self, cmd) return None case 'NICK': self.Utils.handle_on_nick(self, cmd) return None case 'QUIT': self.Utils.handle_on_quit(self, cmd) return None case _: return None except KeyError as ke: self.Logs.error(f"{ke} / {cmd} / length {str(len(cmd))}") except IndexError as ie: self.Logs.error(f"{ie} / {cmd} / length {str(len(cmd))}") except Exception as err: self.Logs.error(f"General Error: {err}", exc_info=True) async def hcmds(self, user: str, channel: Any, cmd: list, fullcmd: list = []) -> None: u = self.User.get_user(user) if u is None: return None command = str(cmd[0]).lower() fromuser = u.nickname channel = fromchannel = channel if self.Channel.is_valid_channel(channel) else None dnickname = self.Config.SERVICE_NICKNAME # Defender nickname dchanlog = self.Config.SERVICE_CHANLOG # Defender chan log dumodes = self.Config.SERVICE_UMODES # Les modes de Defender service_id = self.Config.SERVICE_ID # Defender serveur id jail_chan = self.Config.SALON_JAIL # Salon pot de miel jail_chan_mode = self.Config.SALON_JAIL_MODES # Mode du salon "pot de miel" match command: case 'show_reputation': if not self.Reputation.UID_REPUTATION_DB: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="No one is suspected") for suspect in self.Reputation.UID_REPUTATION_DB: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Uid: {suspect.uid} | Nickname: {suspect.nickname} | Reputation: {suspect.score_connexion} | Secret code: {suspect.secret_code} | Connected on: {suspect.connexion_datetime}") case 'code': try: release_code = cmd[1] jailed_nickname = u.nickname jailed_UID = u.uid get_reputation = self.Reputation.get_reputation(jailed_UID) if get_reputation is None: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" No code is requested ...") return False jailed_IP = get_reputation.remote_ip jailed_salon = self.Config.SALON_JAIL reputation_seuil = self.ModConfig.reputation_seuil welcome_salon = self.Config.SALON_LIBERER self.Logs.debug(f"IP de {jailed_nickname} : {jailed_IP}") link = self.Config.SERVEUR_LINK color_green = self.Config.COLORS.green color_black = self.Config.COLORS.black if release_code == get_reputation.secret_code: await self.Protocol.send_priv_msg(nick_from=dnickname, msg="Bon mot de passe. Allez du vent !", channel=jailed_salon) if self.ModConfig.reputation_ban_all_chan == 1: for chan in self.Channel.UID_CHANNEL_DB: if chan.name != jailed_salon: await self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b {jailed_nickname}!*@*") self.Reputation.delete(jailed_UID) self.Logs.debug(f'{jailed_UID} - {jailed_nickname} removed from REPUTATION_DB') await self.Protocol.send_sapart(nick_to_sapart=jailed_nickname, channel_name=jailed_salon) await self.Protocol.send_sajoin(nick_to_sajoin=jailed_nickname, channel_name=welcome_salon) await self.Protocol.send2socket(f":{link} REPUTATION {jailed_IP} {self.ModConfig.reputation_score_after_release}") u.score_connexion = reputation_seuil + 1 await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{color_green} MOT DE PASS CORRECT {color_black}] : You have now the right to enjoy the network !", nick_to=jailed_nickname) else: await self.Protocol.send_priv_msg( nick_from=dnickname, msg="Mauvais password", channel=jailed_salon ) await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[{color_green} MAUVAIS PASSWORD {color_black}] You have typed a wrong code. for recall your password is: {self.Config.SERVICE_PREFIX}code {get_reputation.secret_code}", nick_to=jailed_nickname ) except IndexError as ie: self.Logs.error(f'Index Error: {ie}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} code [code]") except KeyError as ke: self.Logs.error(f'_hcmd code: KeyError {ke}') case 'autolimit': try: # autolimit on # autolimit set [amount] [interval] if len(cmd) < 2: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]") return None arg = str(cmd[1]).lower() match arg: case 'on': if self.ModConfig.autolimit == 0: self.update_configuration('autolimit', 1) self.autolimit_isRunning = True self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, )) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Activated", channel=self.Config.SERVICE_CHANLOG) else: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already activated", channel=self.Config.SERVICE_CHANLOG) case 'off': if self.ModConfig.autolimit == 1: self.update_configuration('autolimit', 0) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Deactivated", channel=self.Config.SERVICE_CHANLOG) else: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already Deactivated", channel=self.Config.SERVICE_CHANLOG) case 'set': amount = int(cmd[2]) interval = int(cmd[3]) self.update_configuration('autolimit_amount', amount) self.update_configuration('autolimit_interval', interval) await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Amount set to ({amount}) | Interval set to ({interval})", channel=self.Config.SERVICE_CHANLOG ) case _: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]") except Exception as err: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]") self.Logs.error(f"Value Error -> {err}") case 'reputation': # .reputation [on/off] --> activate or deactivate reputation system # .reputation set banallchan [on/off] --> activate or deactivate ban in all channel # .reputation set limit [xxxx] --> change the reputation threshold # .reputation release [nick] # .reputation [arg1] [arg2] [arg3] try: len_cmd = len(cmd) if len_cmd < 2: raise IndexError("Showing help!") activation = str(cmd[1]).lower() # Nous sommes dans l'activation ON / OFF if len_cmd == 2: key = 'reputation' if activation == 'on': if self.ModConfig.reputation == 1: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Already activated", channel=dchanlog) return False # self.update_db_configuration('reputation', 1) self.update_configuration(key, 1) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog) await self.Protocol.send_join_chan(uidornickname=dnickname, channel=jail_chan) await self.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}") await self.Protocol.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}") if self.ModConfig.reputation_sg == 1: for chan in self.Channel.UID_CHANNEL_DB: if chan.name != jail_chan: await self.Protocol.send2socket(f":{service_id} MODE {chan.name} +b ~security-group:unknown-users") await self.Protocol.send2socket(f":{service_id} MODE {chan.name} +eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users") self.Channel.db_query_channel('add', self.module_name, jail_chan) if activation == 'off': if self.ModConfig.reputation == 0: await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Already deactivated", channel=dchanlog ) return False self.update_configuration(key, 0) await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}REPUTATION{self.Config.COLORS.black} ] : Deactivated by {fromuser}", channel=dchanlog ) await self.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} -{dumodes} {dnickname}") await self.Protocol.send2socket(f":{service_id} MODE {jail_chan} -sS") await self.Protocol.send2socket(f":{service_id} PART {jail_chan}") for chan in self.Channel.UID_CHANNEL_DB: if chan.name != jail_chan: await self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b ~security-group:unknown-users") await self.Protocol.send2socket(f":{service_id} MODE {chan.name} -eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users") self.Channel.db_query_channel('del', self.module_name, jail_chan) if len_cmd == 3: get_options = str(cmd[1]).lower() match get_options: case 'release': # .reputation release [nick] p = await self.Protocol link = self.Config.SERVEUR_LINK jailed_salon = self.Config.SALON_JAIL welcome_salon = self.Config.SALON_LIBERER client_obj = self.User.get_user(str(cmd[2])) if client_obj is None: p.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This nickname ({str(cmd[2])}) is not connected to the network!") return None client_to_release = self.Reputation.get_reputation(client_obj.uid) if client_to_release is None: p.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This nickname ({str(cmd[2])}) doesn't exist in the reputation databalse!") return None if self.Reputation.delete(client_to_release.uid): p.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION RELEASE{self.Config.COLORS.black} ] : {client_to_release.nickname} has been released", channel=dchanlog) p.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This nickname has been released from reputation system") p.send_notice(nick_from=dnickname, nick_to=client_to_release.nickname, msg=f"You have been released from the reputation system by ({fromuser})") p.send_sapart(nick_to_sapart=client_to_release.nickname, channel_name=jailed_salon) p.send_sajoin(nick_to_sajoin=client_to_release.nickname, channel_name=welcome_salon) p.send2socket(f":{link} REPUTATION {client_to_release.remote_ip} {self.ModConfig.reputation_score_after_release}") return None else: p.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}REPUTATION RELEASE ERROR{self.Config.COLORS.black} ] : " f"{client_to_release.nickname} has not been released! as he is not in the reputation database", channel=dchanlog ) if len_cmd > 4: get_set = str(cmd[1]).lower() if get_set != 'set': raise IndexError('Showing help') get_options = str(cmd[2]).lower() match get_options: case 'banallchan': key = 'reputation_ban_all_chan' get_value = str(cmd[3]).lower() if get_value == 'on': if self.ModConfig.reputation_ban_all_chan == 1: await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Already activated", channel=dchanlog ) return False # self.update_db_configuration(key, 1) self.update_configuration(key, 1) await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog ) elif get_value == 'off': if self.ModConfig.reputation_ban_all_chan == 0: await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Already deactivated", channel=dchanlog ) return False # self.update_db_configuration(key, 0) self.update_configuration(key, 0) await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Deactivated by {fromuser}", channel=dchanlog ) case 'limit': reputation_seuil = int(cmd[3]) key = 'reputation_seuil' # self.update_db_configuration(key, reputation_seuil) self.update_configuration(key, reputation_seuil) await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION SEUIL{self.Config.COLORS.black} ] : Limit set to {str(reputation_seuil)} by {fromuser}", channel=dchanlog ) await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation set to {reputation_seuil}") case 'timer': reputation_timer = int(cmd[3]) key = 'reputation_timer' self.update_configuration(key, reputation_timer) await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION TIMER{self.Config.COLORS.black} ] : Timer set to {str(reputation_timer)} minute(s) by {fromuser}", channel=dchanlog ) await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation set to {reputation_timer}") case 'score_after_release': reputation_score_after_release = int(cmd[3]) key = 'reputation_score_after_release' self.update_configuration(key, reputation_score_after_release) await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION SCORE AFTER RELEASE{self.Config.COLORS.black} ] : Reputation score after release set to {str(reputation_score_after_release)} by {fromuser}", channel=dchanlog ) await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation score after release set to {reputation_score_after_release}") case 'security_group': reputation_sg = int(cmd[3]) key = 'reputation_sg' self.update_configuration(key, reputation_sg) await self.Protocol.send_priv_msg( nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION SECURITY-GROUP{self.Config.COLORS.black} ] : Reputation Security-group set to {str(reputation_sg)} by {fromuser}", channel=dchanlog ) await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation score after release set to {reputation_sg}") case _: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation [ON/OFF]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation release [nickname]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set banallchan [ON/OFF]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set limit [1234]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set score_after_release [1234]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set timer [1234]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set action [kill|None]") except IndexError as ie: self.Logs.warning(f'{ie}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation [ON/OFF]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation release [nickname]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set banallchan [ON/OFF]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set limit [1234]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set score_after_release [1234]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set timer [1234]") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set action [kill|None]") except ValueError as ve: self.Logs.warning(f'{ve}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" La valeur devrait etre un entier >= 0") case 'proxy_scan': # .proxy_scan set local_scan on/off --> Va activer le scan des ports # .proxy_scan set psutil_scan on/off --> Active les informations de connexion a la machine locale # .proxy_scan set abuseipdb_scan on/off --> Active le scan via l'api abuseipdb len_cmd = len(cmd) color_green = self.Config.COLORS.green color_red = self.Config.COLORS.red color_black = self.Config.COLORS.black if len_cmd == 4: set_key = str(cmd[1]).lower() if set_key != 'set': await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]') option = str(cmd[2]).lower() # => local_scan, psutil_scan, abuseipdb_scan action = str(cmd[3]).lower() # => on / off match option: case 'local_scan': if action == 'on': if self.ModConfig.local_scan == 1: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog) return None self.update_configuration(option, 1) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog) elif action == 'off': if self.ModConfig.local_scan == 0: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog) return None self.update_configuration(option, 0) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog) case 'psutil_scan': if action == 'on': if self.ModConfig.psutil_scan == 1: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog) return None self.update_configuration(option, 1) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog) elif action == 'off': if self.ModConfig.psutil_scan == 0: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog) return None self.update_configuration(option, 0) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog) case 'abuseipdb_scan': if action == 'on': if self.ModConfig.abuseipdb_scan == 1: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog) return None self.update_configuration(option, 1) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog) elif action == 'off': if self.ModConfig.abuseipdb_scan == 0: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog) return None self.update_configuration(option, 0) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog) case 'freeipapi_scan': if action == 'on': if self.ModConfig.freeipapi_scan == 1: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog) return None self.update_configuration(option, 1) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog) elif action == 'off': if self.ModConfig.freeipapi_scan == 0: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog) return None self.update_configuration(option, 0) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog) case 'cloudfilt_scan': if action == 'on': if self.ModConfig.cloudfilt_scan == 1: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog) return None self.update_configuration(option, 1) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog) elif action == 'off': if self.ModConfig.cloudfilt_scan == 0: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog) return None self.update_configuration(option, 0) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog) case _: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]') else: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]') case 'flood': # .flood on/off # .flood set flood_message 5 # .flood set flood_time 1 # .flood set flood_timer 20 try: len_cmd = len(cmd) if len_cmd == 2: activation = str(cmd[1]).lower() key = 'flood' if activation == 'on': if self.ModConfig.flood == 1: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Already activated", channel=dchanlog) return False self.update_configuration(key, 1) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog) if activation == 'off': if self.ModConfig.flood == 0: await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}FLOOD{self.Config.COLORS.black} ] : Already Deactivated", channel=dchanlog) return False self.update_configuration(key, 0) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Deactivated by {fromuser}", channel=dchanlog) if len_cmd == 4: set_key = str(cmd[2]).lower() if str(cmd[1]).lower() == 'set': match set_key: case 'flood_message': key = 'flood_message' set_value = int(cmd[3]) self.update_configuration(key, set_value) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood message set to {set_value} by {fromuser}", channel=dchanlog) case 'flood_time': key = 'flood_time' set_value = int(cmd[3]) self.update_configuration(key, set_value) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood time set to {set_value} by {fromuser}", channel=dchanlog) case 'flood_timer': key = 'flood_timer' set_value = int(cmd[3]) self.update_configuration(key, set_value) await self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood timer set to {set_value} by {fromuser}", channel=dchanlog) case _: pass except ValueError as ve: self.Logs.error(f"{self.__class__.__name__} Value Error : {ve}") case 'status': color_green = self.Config.COLORS.green color_red = self.Config.COLORS.red color_black = self.Config.COLORS.black nogc = self.Config.COLORS.nogc try: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.reputation == 1 else color_red}Reputation{nogc}] ==> {self.ModConfig.reputation}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_seuil ==> {self.ModConfig.reputation_seuil}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_after_release ==> {self.ModConfig.reputation_score_after_release}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_ban_all_chan ==> {self.ModConfig.reputation_ban_all_chan}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_timer ==> {self.ModConfig.reputation_timer}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=' [Proxy_scan]') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.local_scan == 1 else color_red}local_scan{nogc} ==> {self.ModConfig.local_scan}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.psutil_scan == 1 else color_red}psutil_scan{nogc} ==> {self.ModConfig.psutil_scan}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.abuseipdb_scan == 1 else color_red}abuseipdb_scan{nogc} ==> {self.ModConfig.abuseipdb_scan}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.freeipapi_scan == 1 else color_red}freeipapi_scan{nogc} ==> {self.ModConfig.freeipapi_scan}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.cloudfilt_scan == 1 else color_red}cloudfilt_scan{nogc} ==> {self.ModConfig.cloudfilt_scan}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.autolimit == 1 else color_red}Autolimit{nogc}] ==> {self.ModConfig.autolimit}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.autolimit == 1 else color_red}Autolimit Amount{nogc} ==> {self.ModConfig.autolimit_amount}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.autolimit == 1 else color_red}Autolimit Interval{nogc} ==> {self.ModConfig.autolimit_interval}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.flood == 1 else color_red}Flood{nogc}] ==> {self.ModConfig.flood}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=' flood_action ==> Coming soon') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_message ==> {self.ModConfig.flood_message}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_time ==> {self.ModConfig.flood_time}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_timer ==> {self.ModConfig.flood_timer}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.flood == 1 else color_red}Sentinel{nogc}] ==> {self.ModConfig.sentinel}') except KeyError as ke: self.Logs.error(f"Key Error : {ke}") case 'info': try: if len(cmd) < 2: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Syntax. /msg {dnickname} INFO [nickname]") return None nickoruid = cmd[1] UserObject = self.User.get_user(nickoruid) if UserObject is not None: channels: list = [chan.name for chan in self.Channel.UID_CHANNEL_DB for uid_in_chan in chan.uids if self.Loader.Utils.clean_uid(uid_in_chan) == UserObject.uid] await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' UID : {UserObject.uid}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' NICKNAME : {UserObject.nickname}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' USERNAME : {UserObject.username}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' REALNAME : {UserObject.realname}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' HOSTNAME : {UserObject.hostname}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' VHOST : {UserObject.vhost}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' IP : {UserObject.remote_ip}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Country : {UserObject.geoip}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' WebIrc : {UserObject.isWebirc}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' WebWebsocket : {UserObject.isWebsocket}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' REPUTATION : {UserObject.score_connexion}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' MODES : {UserObject.umodes}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CHANNELS : {", ".join(channels)}') await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CONNECTION TIME : {UserObject.connexion_datetime}') else: await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {nickoruid} doesn't exist") except KeyError as ke: self.Logs.warning(f"Key error info user : {ke}") case 'sentinel': # .sentinel on activation = str(cmd[1]).lower() channel_to_dont_quit = [self.Config.SALON_JAIL, self.Config.SERVICE_CHANLOG] if activation == 'on': result = self.Base.db_execute_query(f"SELECT distinct channel_name FROM {self.Config.TABLE_CHANNEL}") channels = result.fetchall() channel_in_db = [channel[0] for channel in channels] channel_to_dont_quit.extend(channel_in_db) self.update_configuration('sentinel', 1) for chan in self.Channel.UID_CHANNEL_DB: if chan.name not in channel_to_dont_quit: await self.Protocol.send_join_chan(uidornickname=dnickname, channel=chan.name) await self.Protocol.send_priv_msg(dnickname, f"Sentinel mode activated on {channel}", channel=chan.name) return None if activation == 'off': result = self.Base.db_execute_query(f"SELECT distinct channel_name FROM {self.Config.TABLE_CHANNEL}") channels = result.fetchall() channel_in_db = [channel[0] for channel in channels] channel_to_dont_quit.extend(channel_in_db) self.update_configuration('sentinel', 0) for chan in self.Channel.UID_CHANNEL_DB: if chan.name not in channel_to_dont_quit: await self.Protocol.send_part_chan(uidornickname=dnickname, channel=chan.name) await self.Protocol.send_priv_msg(dnickname, f"Sentinel mode deactivated on {channel}", channel=chan.name) self.join_saved_channels() return None