from datetime import datetime from typing import Union import re, socket, psutil, requests, json from core.irc import Irc # Le module crée devra réspecter quelques conditions # 1. Le nom de la classe devra toujours s'appeler comme le module. Exemple => nom de class Defender | nom du module mod_defender # 2. la methode __init__ devra toujours avoir les parametres suivant (self, irc:object) # 1 . Créer la variable Irc dans le module # 2 . Récuperer la configuration dans une variable # 3 . Définir et enregistrer les nouvelles commandes # 4 . Créer vos tables, en utilisant toujours le nom des votre classe en minuscule ==> defender_votre-table # 3. une methode _hcmds(self, user:str, cmd: list) devra toujours etre crée. class Defender(): def __init__(self, ircInstance:Irc) -> None: self.Irc = ircInstance # Ajouter l'object mod_irc a la classe ( Obligatoire ) self.Config = ircInstance.Config # Ajouter la configuration a la classe ( Obligatoire ) self.Base = ircInstance.Base # Ajouter l'objet Base au module ( Obligatoire ) self.timeout = self.Config.API_TIMEOUT # API Timeout self.Irc.debug(f'Module {self.__class__.__name__} loaded ...') # Créer les nouvelles commandes du module self.commands_level = { 0: ['code'], 1: ['join','part', 'info'], 2: ['q', 'dq', 'o', 'do', 'h', 'dh', 'v', 'dv', 'b', 'ub','k', 'kb'], 3: ['reputation','proxy_scan', 'flood', 'status', 'timer','show_reputation', 'show_users'] } self.__set_commands(self.commands_level) # Enrigstrer les nouvelles commandes dans le code self.__create_tables() # Créer les tables necessaire a votre module (ce n'es pas obligatoire) self.init_defender() # Créer une methode init ( ce n'es pas obligatoire ) def __set_commands(self, commands:dict) -> None: """### Rajoute les commandes du module au programme principal Args: commands (list): Liste des commandes du module Returns: None: Aucun retour attendu """ for level, com in commands.items(): for c in commands[level]: if not c in self.Irc.commands: self.Irc.commands_level[level].append(c) self.Irc.commands.append(c) return None def __create_tables(self) -> None: """Methode qui va créer la base de donnée si elle n'existe pas. Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module Args: database_name (str): Nom de la base de données ( pas d'espace dans le nom ) Returns: None: Aucun retour n'es attendu """ table_channel = '''CREATE TABLE IF NOT EXISTS def_channels ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, channel TEXT ) ''' table_config = '''CREATE TABLE IF NOT EXISTS def_config ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, parameter TEXT, value TEXT ) ''' table_trusted = '''CREATE TABLE IF NOT EXISTS def_trusted ( id INTEGER PRIMARY KEY AUTOINCREMENT, datetime TEXT, user TEXT, host TEXT, vhost TEXT ) ''' self.Base.db_execute_query(table_channel) self.Base.db_execute_query(table_config) self.Base.db_execute_query(table_trusted) return None def init_defender(self) -> bool: self.db_reputation = {} # Definir la variable qui contiendra la liste des user concerné par la réputation self.flood_system = {} # Variable qui va contenir les users self.reputation_first_connexion = {'ip': '', 'score': -1} # Contient les premieres informations de connexion self.abuseipdb_key = '13c34603fee4d2941a2c443cc5c77fd750757ca2a2c1b304bd0f418aff80c24be12651d1a3cfe674' # Laisser vide si aucune clé self.cloudfilt_key = 'r1gEtjtfgRQjtNBDMxsg' # Laisser vide si aucune clé # Rejoindre les salons self.join_saved_channels() # Variable qui va contenir les options de configuration du module Defender self.defConfig = { 'reputation': 0, 'reputation_timer': 0, 'reputation_seuil': 600, 'reputation_ban_all_chan': 0, 'local_scan': 0, 'psutil_scan': 0, 'abuseipdb_scan': 0, 'freeipapi_scan': 0, 'cloudfilt_scan': 0, 'flood': 0, 'flood_message': 5, 'flood_time': 1, 'flood_timer': 20 } # Syncrhoniser la variable defConfig avec la configuration de la base de données. self.sync_db_configuration() return True def sync_db_configuration(self) -> None: query = "SELECT parameter, value FROM def_config" response = self.Base.db_execute_query(query) result = response.fetchall() # Si le resultat ne contient aucune valeur if not result: # Base de données vide Inserer la premiere configuration for param, value in self.defConfig.items(): mes_donnees = {'datetime': self.Base.get_datetime(), 'parameter': param, 'value': value} insert = self.Base.db_execute_query('INSERT INTO def_config (datetime, parameter, value) VALUES (:datetime, :parameter, :value)', mes_donnees) insert_rows = insert.rowcount if insert_rows > 0: self.Irc.debug(f'Row affected into def_config : {insert_rows}') # Inserer une nouvelle configuration for param, value in self.defConfig.items(): mes_donnees = {'parameter': param} search_param_query = "SELECT parameter, value FROM def_config WHERE parameter = :parameter" result = self.Base.db_execute_query(search_param_query, mes_donnees) isParamExist = result.fetchone() if isParamExist is None: mes_donnees = {'datetime': self.Base.get_datetime(), 'parameter': param, 'value': value} insert = self.Base.db_execute_query('INSERT INTO def_config (datetime, parameter, value) VALUES (:datetime, :parameter, :value)', mes_donnees) insert_rows = insert.rowcount if insert_rows > 0: self.Irc.debug(f'DB_Def_config - new param included : {insert_rows}') # Supprimer un parameter si il n'existe plus dans la variable global query = "SELECT parameter FROM def_config" response = self.Base.db_execute_query(query) dbresult = response.fetchall() for dbparam in dbresult: if not dbparam[0] in self.defConfig: mes_donnees = {'parameter': dbparam[0]} delete = self.Base.db_execute_query('DELETE FROM def_config WHERE parameter = :parameter', mes_donnees) row_affected = delete.rowcount if row_affected > 0: self.Irc.debug(f'DB_Def_config - param [{dbparam[0]}] has been deleted') # Synchroniser la base de données avec la variable global query = "SELECT parameter, value FROM def_config" response = self.Base.db_execute_query(query) result = response.fetchall() for param, value in result: self.defConfig[param] = self.Base.int_if_possible(value) self.Irc.debug(self.defConfig) return None def update_db_configuration(self, param:str, value:str) -> None: if not param in self.defConfig: self.Irc.debug(f"Le parametre {param} n'existe pas dans la variable global") return None mes_donnees = {'parameter': param} search_param_query = "SELECT parameter, value FROM def_config WHERE parameter = :parameter" result = self.Base.db_execute_query(search_param_query, mes_donnees) isParamExist = result.fetchone() if not isParamExist is None: mes_donnees = {'datetime': self.Base.get_datetime(), 'parameter': param, 'value': value} update = self.Base.db_execute_query('UPDATE def_config SET datetime = :datetime, value = :value WHERE parameter = :parameter', mes_donnees) updated_rows = update.rowcount if updated_rows > 0: self.defConfig[param] = self.Base.int_if_possible(value) self.Irc.debug(f'DB_Def_config - new param updated : {param} {value}') self.Irc.debug(self.defConfig) def add_defender_channel(self, channel:str) -> bool: """Cette fonction ajoute les salons de join de Defender Args: channel (str): le salon à enregistrer. """ mes_donnees = {'channel': channel} response = self.Base.db_execute_query("SELECT id FROM def_channels WHERE channel = :channel", mes_donnees) isChannelExist = response.fetchone() if isChannelExist is None: mes_donnees = {'datetime': self.Base.get_datetime(), 'channel': channel} insert = self.Base.db_execute_query(f"INSERT INTO def_channels (datetime, channel) VALUES (:datetime, :channel)", mes_donnees) return True else: return False def delete_defender_channel(self, channel:str) -> bool: """Cette fonction supprime les salons de join de Defender Args: channel (str): le salon à enregistrer. """ mes_donnes = {'channel': channel} response = self.Base.db_execute_query("DELETE FROM def_channels WHERE channel = :channel", mes_donnes) affected_row = response.rowcount if affected_row > 0: return True else: return False def insert_db_reputation(self, uid:str, ip:str, nickname:str, username:str, hostname:str, umodes:str, vhost:str, score:int, isWebirc:bool) -> None: currentDateTime = self.Base.get_datetime() secret_code = self.Base.get_random(8) # Vérifier si le uid existe déja if uid in self.db_reputation: return None self.db_reputation[uid] = { 'nickname': nickname, 'username': username, 'hostname': hostname, 'umodes': umodes, 'vhost': vhost, 'ip': ip, 'score': score, 'isWebirc': isWebirc, 'secret_code': secret_code, 'connected_datetime': currentDateTime, 'updated_datetime': currentDateTime } return None def update_db_reputation(self, uidornickname:str, newnickname:str) -> None: uid = self.Irc.get_uid(uidornickname) currentDateTime = self.Base.get_datetime() secret_code = self.Base.get_random(8) if not uid in self.Irc.db_uid: self.Irc.debug(f'Etrange UID {uid}') return None if uid in self.db_reputation: self.db_reputation[uid]['nickname'] = newnickname self.db_reputation[uid]['updated_datetime'] = currentDateTime self.db_reputation[uid]['secret_code'] = secret_code else: self.Irc.debug(f"L'UID {uid} n'existe pas dans REPUTATION_DB") return None def delete_db_reputation(self, uid:str) -> None: """Cette fonction va supprimer le UID du dictionnaire self.db_reputation Args: uid (str): le uid ou le nickname du user """ # Si le UID existe dans le dictionnaire alors le supprimer if uid in self.db_reputation: # Si le nickname existe dans le dictionnaire alors le supprimer del self.db_reputation[uid] self.Irc.debug(f"Le UID {uid} a été supprimé du REPUTATION_DB") def insert_db_trusted(self, uid: str, nickname:str) -> None: uid = self.Irc.get_uid(uid) nickname = self.Irc.get_nickname(nickname) query = "SELECT id FROM def_trusted WHERE user = ?" exec_query = self.Base.db_execute_query(query, {"user": nickname}) response = exec_query.fetchone() if not response is None: q_insert = "INSERT INTO def_trusted (datetime, user, host, vhost) VALUES (?, ?, ?, ?)" mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'host': '*', 'vhost': '*'} exec_query = self.Base.db_execute_query(q_insert, mes_donnees) pass def join_saved_channels(self) -> None: result = self.Base.db_execute_query("SELECT id, channel FROM def_channels") channels = result.fetchall() jail_chan = self.Config.SALON_JAIL jail_chan_mode = self.Config.SALON_JAIL_MODES service_id = self.Config.SERVICE_ID dumodes = self.Config.SERVICE_UMODES dnickname = self.Config.SERVICE_NICKNAME unixtime = self.Base.get_unixtime() for channel in channels: id, chan = channel self.Irc.send2socket(f":{self.Config.SERVEUR_ID} SJOIN {unixtime} {chan} + :{self.Config.SERVICE_ID}") if chan == jail_chan: self.Irc.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}") self.Irc.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}") return None def get_user_uptime_in_minutes(self, uidornickname:str) -> float: """Retourne depuis quand l'utilisateur est connecté (en secondes ). Args: uid (str): le uid ou le nickname de l'utilisateur Returns: int: Temps de connexion de l'utilisateur en secondes """ get_uid = self.Irc.get_uid(uidornickname) if not get_uid in self.Irc.db_uid: return 0 # Convertir la date enregistrée dans UID_DB en un objet {datetime} connected_time_string = self.Irc.db_uid[get_uid]['datetime'] if type(connected_time_string) == datetime: connected_time = connected_time_string else: connected_time = datetime.strptime(connected_time_string, "%Y-%m-%d %H:%M:%S.%f") # Quelle heure est-il ? current_datetime = datetime.now() uptime = current_datetime - connected_time convert_to_minutes = uptime.seconds / 60 uptime_minutes = round(number=convert_to_minutes, ndigits=0) return uptime_minutes def system_reputation(self, uid:str)-> None: # Reputation security # - Activation ou désactivation du système --> OK # - Le user sera en mesure de changer la limite de la réputation --> OK # - Defender devra envoyer l'utilisateur sur un salon défini dans la configuration, {jail_chan} # - Defender devra bloquer cet utilisateur sur le salon qui sera en mode (+m) # - Defender devra envoyer un message du type "Merci de taper cette comande /msg {nomdudefender} {un code généré aléatoirement} # - Defender devra reconnaître le code # - Defender devra libérer l'utilisateur et l'envoyer vers un salon défini dans la configuration {welcome_chan} # - Defender devra intégrer une liste d'IDs (pseudo/host) exemptés de 'Reputation security' malgré un score de rép. faible et un pseudo non enregistré. try: if not uid in self.db_reputation: return False code = self.db_reputation[uid]['secret_code'] salon_logs = self.Config.SERVICE_CHANLOG salon_jail = self.Config.SALON_JAIL jailed_nickname = self.db_reputation[uid]['nickname'] jailed_score = self.db_reputation[uid]['score'] color_red = self.Config.CONFIG_COLOR['rouge'] color_black = self.Config.CONFIG_COLOR['noire'] color_bold = self.Config.CONFIG_COLOR['gras'] service_id = self.Config.SERVICE_ID service_prefix = self.Config.SERVICE_PREFIX reputation_ban_all_chan = self.Base.int_if_possible(self.defConfig['reputation_ban_all_chan']) if not self.db_reputation[uid]['isWebirc']: # Si le user ne vient pas de webIrc self.Irc.send2socket(f":{service_id} SAJOIN {jailed_nickname} {salon_jail}") self.Irc.send2socket(f":{service_id} PRIVMSG {salon_logs} :[{color_red} REPUTATION {color_black}] : Connexion de {jailed_nickname} ({jailed_score}) ==> {salon_jail}") self.Irc.send2socket(f":{service_id} NOTICE {jailed_nickname} :[{color_red} {jailed_nickname} {color_black}] : Merci de tapez la commande suivante {color_bold}{service_prefix}code {code}{color_bold}") if reputation_ban_all_chan == 1: for chan in self.Irc.db_chan: if chan != salon_jail: self.Irc.send2socket(f":{service_id} MODE {chan} +b {jailed_nickname}!*@*") self.Irc.send2socket(f":{service_id} KICK {chan} {jailed_nickname}") self.Irc.debug(f"system_reputation : {jailed_nickname} à été capturé par le système de réputation") # self.Irc.create_ping_timer(int(self.defConfig['reputation_timer']) * 60, 'Defender', 'system_reputation_timer') self.Base.create_timer(int(self.defConfig['reputation_timer']) * 60, self.system_reputation_timer) else: self.Irc.debug(f"system_reputation : {jailed_nickname} à été supprimé du système de réputation car connecté via WebIrc ou il est dans la 'Trusted list'") self.delete_db_reputation(uid) except IndexError as e: self.Irc.debug(f"system_reputation : {str(e)}") def system_reputation_timer(self) -> None: try: reputation_flag = int(self.defConfig['reputation']) reputation_timer = int(self.defConfig['reputation_timer']) reputation_seuil = self.defConfig['reputation_seuil'] service_id = self.Config.SERVICE_ID dchanlog = self.Config.SERVICE_CHANLOG color_red = self.Config.CONFIG_COLOR['rouge'] color_black = self.Config.CONFIG_COLOR['noire'] salon_jail = self.Config.SALON_JAIL if reputation_flag == 0: return None elif reputation_timer == 0: return None # self.Irc.debug(self.db_reputation) uid_to_clean = [] for uid in self.db_reputation: if not self.db_reputation[uid]['isWebirc']: # Si il ne vient pas de WebIRC self.Irc.debug(f"Nickname: {self.db_reputation[uid]['nickname']} | uptime: {self.get_user_uptime_in_minutes(uid)} | reputation time: {reputation_timer}") if self.get_user_uptime_in_minutes(uid) >= reputation_timer and int(self.db_reputation[uid]['score']) <= int(reputation_seuil): self.Irc.debug('-----'*20) self.Irc.send2socket(f":{service_id} PRIVMSG {dchanlog} :[{color_red} REPUTATION {color_black}] : Action sur {self.db_reputation[uid]['nickname']} aprés {str(reputation_timer)} minutes d'inactivité") # if not system_reputation_timer_action(cglobal['reputation_timer_action'], uid, self.db_reputation[uid]['nickname']): # return False self.Irc.send2socket(f":{service_id} KILL {self.db_reputation[uid]['nickname']} After {str(reputation_timer)} minutes of inactivity you should reconnect and type the password code ") self.Irc.debug(f"Action sur {self.db_reputation[uid]['nickname']} aprés {str(reputation_timer)} minutes d'inactivité") uid_to_clean.append(uid) for uid in uid_to_clean: # Suppression des éléments dans {UID_DB} et {REPUTATION_DB} for chan in self.Irc.db_chan: if chan != salon_jail: self.Irc.send2socket(f":{service_id} MODE {chan} -b {self.db_reputation[uid]['nickname']}!*@*") # Lorsqu'un utilisateur quitte, il doit être supprimé de {UID_DB}. self.Irc.delete_db_uid(uid) self.delete_db_reputation(uid) except AssertionError as ae: self.Irc.debug(f'Assertion Error -> {ae}') def _execute_flood_action(self, action:str, channel:str) -> None: """DO NOT EXECUTE THIS FUNCTION WITHOUT THREADING Args: action (str): _description_ timer (int): _description_ nickname (str): _description_ channel (str): _description_ Returns: _type_: _description_ """ service_id = self.Config.SERVICE_ID match action: case 'mode-m': # Action -m sur le salon self.Irc.send2socket(f":{service_id} MODE {channel} -m") case _: pass return None def flood(self, detected_user:str, channel:str) -> None: if self.defConfig['flood'] == 0: return None if not '#' in channel: return None flood_time = self.defConfig['flood_time'] flood_message = self.defConfig['flood_message'] flood_timer = self.defConfig['flood_timer'] service_id = self.Config.SERVICE_ID dnickname = self.Config.SERVICE_NICKNAME color_red = self.Config.CONFIG_COLOR['rouge'] color_bold = self.Config.CONFIG_COLOR['gras'] get_detected_uid = self.Irc.get_uid(detected_user) get_detected_nickname = self.Irc.get_nickname(detected_user) unixtime = self.Base.get_unixtime() get_diff_secondes = 0 if not get_detected_uid in self.flood_system: self.flood_system[get_detected_uid] = { 'nbr_msg': 0, 'first_msg_time': unixtime } self.flood_system[get_detected_uid]['nbr_msg'] += 1 get_diff_secondes = unixtime - self.flood_system[get_detected_uid]['first_msg_time'] if get_diff_secondes > flood_time: self.flood_system[get_detected_uid]['first_msg_time'] = unixtime self.flood_system[get_detected_uid]['nbr_msg'] = 0 get_diff_secondes = unixtime - self.flood_system[get_detected_uid]['first_msg_time'] elif self.flood_system[get_detected_uid]['nbr_msg'] > flood_message: self.Irc.debug('system de flood detecté') self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} : {color_red} {color_bold} Flood detected. Apply the +m mode (Ô_o)') self.Irc.send2socket(f":{service_id} MODE {channel} +m") self.Irc.debug(f'FLOOD Détecté sur {get_detected_nickname} mode +m appliqué sur le salon {channel}') self.flood_system[get_detected_uid]['nbr_msg'] = 0 self.flood_system[get_detected_uid]['first_msg_time'] = unixtime self.Base.create_timer(flood_timer, self._execute_flood_action, ('mode-m', channel)) def run_db_action_timer(self, wait_for: float = 0) -> None: query = "SELECT parameter FROM def_config" res = self.Base.db_execute_query(query) service_id = self.Config.SERVICE_ID dchanlog = self.Config.SERVICE_CHANLOG for param in res.fetchall(): if param[0] == 'reputation': self.Irc.send2socket(f":{service_id} PRIVMSG {dchanlog} : ===> {param[0]}") else: self.Irc.send2socket(f":{service_id} PRIVMSG {dchanlog} : {param[0]}") # print(f":{service_id} PRIVMSG {dchanlog} : {param[0]}") # self.Base.garbage_collector_timer() return None def scan_ports(self, remote_ip: str) -> None: if remote_ip in self.Config.WHITELISTED_IP: return None for port in self.Config.PORTS_TO_SCAN: newSocket = '' newSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) newSocket.settimeout(0.5) try: connection = (remote_ip, self.Base.int_if_possible(port)) newSocket.connect(connection) self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :[ {self.Config.CONFIG_COLOR['rouge']}PROXY_SCAN{self.Config.CONFIG_COLOR['noire']} ] : Port [{str(port)}] ouvert sur l'adresse ip [{remote_ip}]") # print(f"=======> Le port {str(port)} est ouvert !!") self.Base.running_sockets.append(newSocket) # print(newSocket) newSocket.shutdown(socket.SHUT_RDWR) newSocket.close() except (socket.timeout, ConnectionRefusedError): self.Irc.debug(f"Le port {str(port)} est fermé") except AttributeError as ae: self.Irc.debug(f"AttributeError : {ae}") except socket.gaierror as err: self.Irc.debug(f"Address Info Error: {err}") finally: # newSocket.shutdown(socket.SHUT_RDWR) newSocket.close() self.Irc.debug('=======> Fermeture de la socket') pass def get_ports_connexion(self, remote_ip: str) -> list[int]: if remote_ip in self.Config.WHITELISTED_IP: return None connections = psutil.net_connections(kind='inet') matching_ports = [conn.raddr.port for conn in connections if conn.raddr and conn.raddr.ip == remote_ip] self.Irc.debug(f"Connexion of {remote_ip} using ports : {str(matching_ports)}") return matching_ports def abuseipdb_scan(self, remote_ip:str) -> Union[dict[str, any], None]: """Analyse l'ip avec AbuseIpDB Cette methode devra etre lancer toujours via un thread ou un timer. Args: remote_ip (_type_): l'ip a analyser Returns: dict[str, any] | None: les informations du provider keys : 'score', 'country', 'isTor', 'totalReports' """ if remote_ip in self.Config.WHITELISTED_IP: return None if self.defConfig['abuseipdb_scan'] == 0: return None if self.abuseipdb_key == '': return None url = 'https://api.abuseipdb.com/api/v2/check' querystring = { 'ipAddress': remote_ip, 'maxAgeInDays': '90' } headers = { 'Accept': 'application/json', 'Key': self.abuseipdb_key } response = requests.request(method='GET', url=url, headers=headers, params=querystring, timeout=self.timeout) # Formatted output decodedResponse = json.loads(response.text) try: if not 'data' in decodedResponse: return None result = { 'score': decodedResponse['data']['abuseConfidenceScore'], 'country': decodedResponse['data']['countryCode'], 'isTor': decodedResponse['data']['isTor'], 'totalReports': decodedResponse['data']['totalReports'] } service_id = self.Config.SERVICE_ID service_chanlog = self.Config.SERVICE_CHANLOG color_red = self.Config.CONFIG_COLOR['rouge'] color_black = self.Config.CONFIG_COLOR['noire'] self.Irc.send2socket(f":{service_id} PRIVMSG {service_chanlog} :[ {color_red}ABUSEIPDB_SCAN{color_black} ] : Connexion de {remote_ip} ==> Score: {str(result['score'])} | Country : {result['country']} | Tor : {str(result['isTor'])} | Total Reports : {str(result['totalReports'])}") response.close() return result except KeyError as ke: self.Irc.debug(f"AbuseIpDb KeyError : {ke}") def freeipapi_scan(self, remote_ip:str) -> Union[dict[str, any], None]: """Analyse l'ip avec Freeipapi Cette methode devra etre lancer toujours via un thread ou un timer. Args: remote_ip (_type_): l'ip a analyser Returns: dict[str, any] | None: les informations du provider keys : 'countryCode', 'isProxy' """ if remote_ip in self.Config.WHITELISTED_IP: return None if self.defConfig['freeipapi_scan'] == 0: return None service_id = self.Config.SERVICE_ID service_chanlog = self.Config.SERVICE_CHANLOG color_red = self.Config.CONFIG_COLOR['rouge'] color_black = self.Config.CONFIG_COLOR['noire'] url = f'https://freeipapi.com/api/json/{remote_ip}' headers = { 'Accept': 'application/json', } response = requests.request(method='GET', url=url, headers=headers, timeout=self.timeout) # Formatted output decodedResponse = json.loads(response.text) try: status_code = response.status_code if status_code == 429: self.Irc.debug(f'Too Many Requests - The rate limit for the API has been exceeded.') return None elif status_code != 200: print("salut salut") return None result = { 'countryCode': decodedResponse['countryCode'] if 'countryCode' in decodedResponse else None, 'isProxy': decodedResponse['isProxy'] if 'isProxy' in decodedResponse else None } self.Irc.send2socket(f":{service_id} PRIVMSG {service_chanlog} :[ {color_red}FREEIPAPI_SCAN{color_black} ] : Connexion de {remote_ip} ==> Proxy: {str(result['isProxy'])} | Country : {result['countryCode']}") response.close() return result except KeyError as ke: self.Irc.debug(f"FREEIPAPI_SCAN KeyError : {ke}") def cloudfilt_scan(self, remote_ip:str) -> Union[dict[str, any], None]: """Analyse l'ip avec cloudfilt Cette methode devra etre lancer toujours via un thread ou un timer. Args: remote_ip (_type_): l'ip a analyser Returns: dict[str, any] | None: les informations du provider keys : 'countryCode', 'isProxy' """ if remote_ip in self.Config.WHITELISTED_IP: return None if self.defConfig['cloudfilt_scan'] == 0: return None service_id = self.Config.SERVICE_ID service_chanlog = self.Config.SERVICE_CHANLOG color_red = self.Config.CONFIG_COLOR['rouge'] color_black = self.Config.CONFIG_COLOR['noire'] url = f"https://developers18334.cloudfilt.com/" data = { 'ip': remote_ip, 'key': 'r1gEtjtfgRQjtNBDMxsg' } response = requests.post(url=url, data=data) # Formatted output decodedResponse = json.loads(response.text) try: status_code = response.status_code if status_code != 200: self.Irc.debug(f'Error connecting to cloudfilt API | Code: {str(status_code)}') return None result = { 'countryiso': decodedResponse['countryiso'] if 'countryiso' in decodedResponse else None, 'listed': decodedResponse['listed'] if 'listed' in decodedResponse else None, 'listed_by': decodedResponse['listed_by'] if 'listed_by' in decodedResponse else None, 'host': decodedResponse['host'] if 'host' in decodedResponse else None } self.Irc.send2socket(f":{service_id} PRIVMSG {service_chanlog} :[ {color_red}CLOUDFILT_SCAN{color_black} ] : Connexion de {remote_ip} ==> host: {str(result['host'])} | country: {str(result['countryiso'])} | listed: {str(result['listed'])} | listed by : {result['listed_by']}") response.close() return result except KeyError as ke: self.Irc.debug(f"CLOUDFILT_SCAN KeyError : {ke}") return None def cmd(self, data:list) -> None: service_id = self.Config.SERVICE_ID # Defender serveur id cmd = list(data).copy() if len(cmd) < 2: return None match cmd[1]: case 'REPUTATION': # :001 REPUTATION 91.168.141.239 118 try: self.reputation_first_connexion['ip'] = cmd[2] self.reputation_first_connexion['score'] = cmd[3] # self.Base.scan_ports(cmd[2]) if self.defConfig['local_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.scan_ports, (cmd[2], )) if self.defConfig['psutil_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.get_ports_connexion, (cmd[2], )) if self.defConfig['abuseipdb_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.abuseipdb_scan, (cmd[2], )) if self.defConfig['freeipapi_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.freeipapi_scan, (cmd[2], )) if self.defConfig['cloudfilt_scan'] == 1 and not cmd[2] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.cloudfilt_scan, (cmd[2], )) # Possibilité de déclancher les bans a ce niveau. except IndexError: self.Irc.debug(f'cmd reputation: index error') match cmd[2]: case 'PRIVMSG': cmd.pop(0) user_trigger = str(cmd[0]).replace(':','') channel = cmd[2] find_nickname = self.Irc.get_nickname(user_trigger) self.flood(find_nickname, channel) case 'UID': if self.Irc.INIT == 1: return None if 'webirc' in cmd[0]: isWebirc = True else: isWebirc = False # Supprimer la premiere valeur et finir le code normalement cmd.pop(0) uid = str(cmd[7]) nickname = str(cmd[2]) username = str(cmd[5]) hostname = str(cmd[6]) umodes = str(cmd[9]) vhost = str(cmd[10]) reputation_flag = self.Base.int_if_possible(self.defConfig['reputation']) reputation_seuil = self.Base.int_if_possible(self.defConfig['reputation_seuil']) if self.Irc.INIT == 0: # A chaque nouvelle connexion chargé les données dans reputation client_ip = '' client_score = 0 if 'ip' in self.reputation_first_connexion: client_ip = self.reputation_first_connexion['ip'] if 'score' in self.reputation_first_connexion: client_score = self.reputation_first_connexion['score'] # Si réputation activé lancer un whois sur le nickname connecté # Si le user n'es pas un service ni un IrcOP alors whois if not re.match(fr'^.*[S|o?].*$', umodes): if reputation_flag == 1 and int(client_score) <= int(reputation_seuil): # if not db_isTrusted_user(user_id): self.insert_db_reputation(uid, client_ip, nickname, username, hostname, umodes, vhost, client_score, isWebirc) # self.Irc.send2socket(f":{service_id} WHOIS {nickname}") if uid in self.db_reputation: if reputation_flag == 1 and int(client_score) <= int(reputation_seuil): self.system_reputation(uid) self.Irc.debug('Démarrer le systeme de reputation') case 'SJOIN': # ['@msgid=F9B7JeHL5pj9nN57cJ5pEr;time=2023-12-28T20:47:24.305Z', ':001', 'SJOIN', '1702138958', '#welcome', ':0015L1AHL'] try: cmd.pop(0) parsed_chan = cmd[3] self.Irc.insert_db_chan(parsed_chan) if self.defConfig['reputation'] == 1: parsed_UID = cmd[4] pattern = fr'^:[@|%|\+|~|\*]*' parsed_UID = re.sub(pattern, '', parsed_UID) if parsed_UID in self.db_reputation: # print(f"====> {str(self.db_reputation)}") isWebirc = self.db_reputation[parsed_UID]['isWebirc'] if self.defConfig['reputation_ban_all_chan'] == 1 and not isWebirc: if parsed_chan != self.Config.SALON_JAIL: self.Irc.send2socket(f":{service_id} MODE {parsed_chan} +b {self.db_reputation[parsed_UID]['nickname']}!*@*") self.Irc.send2socket(f":{service_id} KICK {parsed_chan} {self.db_reputation[parsed_UID]['nickname']}") self.Irc.debug(f'SJOIN parsed_uid : {parsed_UID}') except KeyError as ke: self.Irc.debug(f"key error SJOIN : {ke}") case 'SLOG': # self.Base.scan_ports(cmd[7]) cmd.pop(0) if self.defConfig['local_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.scan_ports, (cmd[7], )) if self.defConfig['psutil_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.get_ports_connexion, (cmd[7], )) if self.defConfig['abuseipdb_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.abuseipdb_scan, (cmd[7], )) if self.defConfig['freeipapi_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.freeipapi_scan, (cmd[7], )) if self.defConfig['cloudfilt_scan'] == 1 and not cmd[7] in self.Config.WHITELISTED_IP: self.Base.create_thread(self.cloudfilt_scan, (cmd[7], )) case 'NICK': # :0010BS24L NICK [NEWNICK] 1697917711 # Changement de nickname try: cmd.pop(0) uid = str(cmd[0]).replace(':','') oldnick = self.db_reputation[uid]['nickname'] newnickname = cmd[2] jail_salon = self.Config.SALON_JAIL service_id = self.Config.SERVICE_ID self.update_db_reputation(uid, newnickname) if uid in self.db_reputation: for chan in self.Irc.db_chan: if chan != jail_salon: self.Irc.send2socket(f":{service_id} MODE {chan} -b {oldnick}!*@*") self.Irc.send2socket(f":{service_id} MODE {chan} +b {newnickname}!*@*") except KeyError as ke: self.Irc.debug(f'cmd - NICK - KeyError: {ke}') case 'QUIT': # :001N1WD7L QUIT :Quit: free_znc_1 cmd.pop(0) user_id = str(cmd[0]).replace(':','') final_UID = user_id jail_salon = self.Config.SALON_JAIL service_id = self.Config.SERVICE_ID if final_UID in self.db_reputation: final_nickname = self.db_reputation[user_id]['nickname'] for chan in self.Irc.db_chan: if chan != jail_salon: self.Irc.send2socket(f":{service_id} MODE {chan} -b {final_nickname}!*@*") self.delete_db_reputation(final_UID) def _hcmds(self, user:str, cmd: list) -> None: command = str(cmd[0]).lower() fromuser = user dnickname = self.Config.SERVICE_NICKNAME # Defender nickname dchanlog = self.Config.SERVICE_CHANLOG # Defender chan log dumodes = self.Config.SERVICE_UMODES # Les modes de Defender service_id = self.Config.SERVICE_ID # Defender serveur id jail_chan = self.Config.SALON_JAIL # Salon pot de miel jail_chan_mode = self.Config.SALON_JAIL_MODES # Mode du salon "pot de miel" match command: case 'timer': try: timer_sent = self.Base.int_if_possible(cmd[1]) timer_sent = int(timer_sent) # self.Irc.create_ping_timer(timer_sent, 'Defender', 'run_db_action_timer') self.Base.create_timer(timer_sent, self.run_db_action_timer) # self.Base.create_timer(timer_sent, self.Base.garbage_collector_sockets) except TypeError as te: self.Irc.debug(f"Type Error -> {te}") except ValueError as ve: self.Irc.debug(f"Value Error -> {ve}") case 'show_reputation': if not self.db_reputation: self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} : No one is suspected') for uid, nickname in self.db_reputation.items(): self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} : Uid: {uid} | Nickname: {self.db_reputation[uid]["nickname"]} | Connected on: {self.db_reputation[uid]["connected_datetime"]} | Updated on: {self.db_reputation[uid]["updated_datetime"]}') case 'code': try: release_code = cmd[1] jailed_nickname = self.Irc.get_nickname(fromuser) jailed_UID = self.Irc.get_uid(fromuser) if not jailed_UID in self.db_reputation: self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : No code is requested ...") return False jailed_IP = self.db_reputation[jailed_UID]['ip'] jailed_salon = self.Config.SALON_JAIL reputation_seuil = self.defConfig['reputation_seuil'] welcome_salon = self.Config.SALON_LIBERER self.Irc.debug(f"IP de {jailed_nickname} : {jailed_IP}") link = self.Config.SERVEUR_LINK color_green = self.Config.CONFIG_COLOR['verte'] color_black = self.Config.CONFIG_COLOR['noire'] if jailed_UID in self.db_reputation: if release_code == self.db_reputation[jailed_UID]['secret_code']: self.Irc.send2socket(f':{dnickname} PRIVMSG {jailed_salon} : Bon mot de passe. Allez du vent !') if self.defConfig['reputation_ban_all_chan'] == 1: for chan in self.Irc.db_chan: if chan != jailed_salon: self.Irc.send2socket(f":{service_id} MODE {chan} -b {jailed_nickname}!*@*") del self.db_reputation[jailed_UID] self.Irc.debug(f'{jailed_UID} - {jailed_nickname} removed from REPUTATION_DB') self.Irc.send2socket(f":{service_id} SAPART {jailed_nickname} {jailed_salon}") self.Irc.send2socket(f":{service_id} SAJOIN {jailed_nickname} {welcome_salon}") self.Irc.send2socket(f":{link} REPUTATION {jailed_IP} {int(reputation_seuil) + 1}") self.Irc.send2socket(f":{service_id} PRIVMSG {jailed_nickname} :[{color_green} MOT DE PASS CORRECT {color_black}] : You have now the right to enjoy the network !") else: self.Irc.send2socket(f':{dnickname} PRIVMSG {jailed_salon} : Mauvais password') self.Irc.send2socket(f":{service_id} PRIVMSG {jailed_nickname} :[{color_green} MAUVAIS PASSWORD {color_black}]") else: self.Irc.send2socket(f":{dnickname} PRIVMSG {jailed_salon} : Ce n'est pas à toi de taper le mot de passe !") except IndexError: self.Irc.debug('_hcmd code: out of index') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} code [code]') except KeyError as ke: self.Irc.debug(f'_hcmd code: KeyError {ke}') # self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} code [code]') pass case 'reputation': # .reputation [on/off] --> activate or deactivate reputation system # .reputation set banallchan [on/off] --> activate or deactivate ban in all channel # .reputation set limit [xxxx] --> change the reputation threshold # .reputation [arg1] [arg2] [arg3] try: len_cmd = len(cmd) activation = str(cmd[1]).lower() # Nous sommes dans l'activation ON / OFF if len_cmd == 2: key = 'reputation' if activation == 'on': if self.defConfig[key] == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}REPUTATION{self.Config.CONFIG_COLOR['noire']} ] : Already activated") return False self.update_db_configuration('reputation', 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}REPUTATION{self.Config.CONFIG_COLOR['noire']} ] : Activated by {fromuser}") self.Irc.send2socket(f":{service_id} JOIN {jail_chan}") self.Irc.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}") self.Irc.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}") self.add_defender_channel(jail_chan) if activation == 'off': if self.defConfig[key] == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}REPUTATION{self.Config.CONFIG_COLOR['noire']} ] : Already deactivated") return False self.update_db_configuration('reputation', 0) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}REPUTATION{self.Config.CONFIG_COLOR['noire']} ] : Deactivated by {fromuser}") self.Irc.send2socket(f":{service_id} SAMODE {jail_chan} -{dumodes} {dnickname}") self.Irc.send2socket(f":{service_id} MODE {jail_chan} -sS") self.Irc.send2socket(f":{service_id} PART {jail_chan}") self.delete_defender_channel(jail_chan) if len_cmd == 4: get_set = str(cmd[1]).lower() if get_set != 'set': return False get_options = str(cmd[2]).lower() match get_options: case 'banallchan': key = 'reputation_ban_all_chan' get_value = str(cmd[3]).lower() if get_value == 'on': if self.defConfig[key] == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}BAN ON ALL CHANS{self.Config.CONFIG_COLOR['noire']} ] : Already activated") return False self.update_db_configuration(key, 1) self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["verte"]}BAN ON ALL CHANS{self.Config.CONFIG_COLOR["noire"]} ] : Activated by {fromuser}') elif get_value == 'off': print(get_value) if self.defConfig[key] == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}BAN ON ALL CHANS{self.Config.CONFIG_COLOR['noire']} ] : Already deactivated") return False self.update_db_configuration(key, 0) self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["verte"]}BAN ON ALL CHANS{self.Config.CONFIG_COLOR["noire"]} ] : Deactivated by {fromuser}') case 'limit': reputation_seuil = int(cmd[3]) key = 'reputation_seuil' self.update_db_configuration(key, reputation_seuil) self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["verte"]}REPUTATION SEUIL{self.Config.CONFIG_COLOR["noire"]} ] : Limit set to {str(reputation_seuil)} by {fromuser}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Reputation set to {reputation_seuil}') case 'timer': reputation_timer = int(cmd[3]) key = 'reputation_timer' self.update_db_configuration(key, reputation_timer) self.Irc.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["verte"]}REPUTATION TIMER{self.Config.CONFIG_COLOR["noire"]} ] : Timer set to {str(reputation_timer)} minute(s) by {fromuser}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Reputation set to {reputation_timer}') case _: pass except IndexError: self.Irc.debug('_hcmd reputation: out of index') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} reputation [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} reputation set banallchan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} reputation set limit [1234]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} reputation set timer [1234]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} reputation set action [kill|None]') except ValueError: self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : La valeur devrait etre un entier >= 0') case 'proxy_scan': # .proxy_scan set local_scan on/off --> Va activer le scan des ports # .proxy_scan set psutil_scan on/off --> Active les informations de connexion a la machine locale # .proxy_scan set abuseipdb_scan on/off --> Active le scan via l'api abuseipdb len_cmd = len(cmd) color_green = self.Config.CONFIG_COLOR['verte'] color_red = self.Config.CONFIG_COLOR['rouge'] color_black = self.Config.CONFIG_COLOR['noire'] if len_cmd == 4: set_key = str(cmd[1]).lower() if set_key != 'set': self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]') option = str(cmd[2]).lower() # => local_scan, psutil_scan, abuseipdb_scan action = str(cmd[3]).lower() # => on / off match option: case 'local_scan': if action == 'on': if self.defConfig[option] == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': if self.defConfig[option] == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None self.update_db_configuration(option, 0) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}") case 'psutil_scan': if action == 'on': if self.defConfig[option] == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': if self.defConfig[option] == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None self.update_db_configuration(option, 0) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}") case 'abuseipdb_scan': if action == 'on': if self.defConfig[option] == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': if self.defConfig[option] == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None self.update_db_configuration(option, 0) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}") case 'freeipapi_scan': if action == 'on': if self.defConfig[option] == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': if self.defConfig[option] == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None self.update_db_configuration(option, 0) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}") case 'cloudfilt_scan': if action == 'on': if self.defConfig[option] == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated") return None self.update_db_configuration(option, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}") elif action == 'off': if self.defConfig[option] == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated") return None self.update_db_configuration(option, 0) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}") case _: self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]') else: self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]') case 'flood': # .flood on/off # .flood set flood_message 5 # .flood set flood_time 1 # .flood set flood_timer 20 try: len_cmd = len(cmd) if len_cmd == 2: activation = str(cmd[1]).lower() key = 'flood' if activation == 'on': if self.defConfig[key] == 1: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Already activated") return False self.update_db_configuration(key, 1) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Activated by {fromuser}") if activation == 'off': if self.defConfig[key] == 0: self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Already Deactivated") return False self.update_db_configuration(key, 0) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Deactivated by {fromuser}") if len_cmd == 4: set_key = str(cmd[2]).lower() if str(cmd[1]).lower() == 'set': match set_key: case 'flood_message': key = 'flood_message' set_value = int(cmd[3]) print(f"{str(set_value)} - {set_key}") self.update_db_configuration(key, set_value) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Flood message set to {set_value} by {fromuser}") case 'flood_time': key = 'flood_time' set_value = int(cmd[3]) self.update_db_configuration(key, set_value) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Flood time set to {set_value} by {fromuser}") case 'flood_timer': key = 'flood_timer' set_value = int(cmd[3]) self.update_db_configuration(key, set_value) self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}FLOOD{self.Config.CONFIG_COLOR['noire']} ] : Flood timer set to {set_value} by {fromuser}") case _: pass except ValueError as ve: self.Irc.debug(f"{self.__class__.__name__} Value Error : {ve}") case 'status': color_green = self.Config.CONFIG_COLOR['verte'] color_red = self.Config.CONFIG_COLOR['rouge'] color_black = self.Config.CONFIG_COLOR['noire'] try: self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : [{color_green if self.defConfig["reputation"] == 1 else color_red}Reputation{color_black}] ==> {self.defConfig["reputation"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : reputation_seuil ==> {self.defConfig["reputation_seuil"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : reputation_ban_all_chan ==> {self.defConfig["reputation_ban_all_chan"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : reputation_timer ==> {self.defConfig["reputation_timer"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : [Proxy_scan]') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["local_scan"] == 1 else color_red}local_scan{color_black} ==> {self.defConfig["local_scan"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["psutil_scan"] == 1 else color_red}psutil_scan{color_black} ==> {self.defConfig["psutil_scan"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["abuseipdb_scan"] == 1 else color_red}abuseipdb_scan{color_black} ==> {self.defConfig["abuseipdb_scan"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["freeipapi_scan"] == 1 else color_red}freeipapi_scan{color_black} ==> {self.defConfig["freeipapi_scan"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {color_green if self.defConfig["cloudfilt_scan"] == 1 else color_red}cloudfilt_scan{color_black} ==> {self.defConfig["cloudfilt_scan"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : [{color_green if self.defConfig["flood"] == 1 else color_red}Flood{color_black}] ==> {self.defConfig["flood"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_action ==> Coming soon') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_message ==> {self.defConfig["flood_message"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_time ==> {self.defConfig["flood_time"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : flood_timer ==> {self.defConfig["flood_timer"]}') except KeyError as ke: self.Irc.debug(f"Key Error : {ke}") case 'join': try: channel = cmd[1] self.Irc.send2socket(f':{service_id} JOIN {channel}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {dnickname} JOINED {channel}') self.add_defender_channel(channel) except IndexError: self.Irc.debug('_hcmd join: out of index') case 'part': try: channel = cmd[1] if channel == dchanlog: self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : {dnickname} CAN'T LEFT {channel} AS IT IS LOG CHANNEL") return False self.Irc.send2socket(f':{service_id} PART {channel}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {dnickname} LEFT {channel}') self.delete_defender_channel(channel) except IndexError: self.Irc.debug('_hcmd part: out of index') case 'op' | 'o': # /mode #channel +o user # .op #channel user # [':adator', 'PRIVMSG', '#services', ':.o', '#services', 'dktmb'] try: print(cmd) channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +o {nickname}") except IndexError as e: self.Irc.debug(f'_hcmd OP: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} op [#SALON] [NICKNAME]') case 'deop' | 'do': # /mode #channel -o user # .deop #channel user try: channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -o {nickname}") except IndexError as e: self.Irc.debug(f'_hcmd DEOP: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} deop [#SALON] [NICKNAME]') case 'owner' | 'q': # /mode #channel +q user # .owner #channel user try: channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +q {nickname}") except IndexError as e: self.Irc.debug(f'_hcmd OWNER: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} owner [#SALON] [NICKNAME]') case 'deowner' | 'dq': # /mode #channel -q user # .deowner #channel user try: channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -q {nickname}") except IndexError as e: self.Irc.debug(f'_hcmd DEOWNER: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} deowner [#SALON] [NICKNAME]') case 'halfop' | 'h': # /mode #channel +h user # .halfop #channel user try: channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +h {nickname}") except IndexError as e: self.Irc.debug(f'_hcmd halfop: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} halfop [#SALON] [NICKNAME]') case 'dehalfop' | 'dh': # /mode #channel -h user # .dehalfop #channel user try: channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -h {nickname}") except IndexError as e: self.Irc.debug(f'_hcmd DEHALFOP: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} dehalfop [#SALON] [NICKNAME]') case 'voice' | 'v': # /mode #channel +v user # .voice #channel user try: channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +v {nickname}") except IndexError as e: self.Irc.debug(f'_hcmd VOICE: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} voice [#SALON] [NICKNAME]') case 'devoice' | 'dv': # /mode #channel -v user # .devoice #channel user try: channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -v {nickname}") except IndexError as e: self.Irc.debug(f'_hcmd DEVOICE: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} devoice [#SALON] [NICKNAME]') case 'ban' | 'b': # .ban #channel nickname try: channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} +b {nickname}!*@*") self.Irc.debug(f'{fromuser} has banned {nickname} from {channel}') except IndexError as e: self.Irc.debug(f'_hcmd BAN: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} ban [#SALON] [NICKNAME]') case 'unban' | 'ub': # .unban #channel nickname try: channel = cmd[1] nickname = cmd[2] self.Irc.send2socket(f":{service_id} MODE {channel} -b {nickname}!*@*") self.Irc.debug(f'{fromuser} has unbanned {nickname} from {channel}') except IndexError as e: self.Irc.debug(f'_hcmd UNBAN: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} unban [#SALON] [NICKNAME]') case 'kick' | 'k': # .kick #channel nickname reason try: channel = cmd[1] nickname = cmd[2] reason = [] for i in range(3, len(cmd)): reason.append(cmd[i]) final_reason = ' '.join(reason) self.Irc.send2socket(f":{service_id} KICK {channel} {nickname} {final_reason}") self.Irc.debug(f'{fromuser} has kicked {nickname} from {channel} : {final_reason}') except IndexError as e: self.Irc.debug(f'_hcmd KICK: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} kick [#SALON] [NICKNAME] [REASON]') case 'kickban' | 'kb': # .kickban #channel nickname reason try: channel = cmd[1] nickname = cmd[2] reason = [] for i in range(3, len(cmd)): reason.append(cmd[i]) final_reason = ' '.join(reason) self.Irc.send2socket(f":{service_id} KICK {channel} {nickname} {final_reason}") self.Irc.send2socket(f":{service_id} MODE {channel} +b {nickname}!*@*") self.Irc.debug(f'{fromuser} has kicked and banned {nickname} from {channel} : {final_reason}') except IndexError as e: self.Irc.debug(f'_hcmd KICKBAN: {str(e)}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} kickban [#SALON] [NICKNAME] [REASON]') case 'info': try: nickoruid = cmd[1] uid_query = None nickname_query = None if not self.Irc.get_nickname(nickoruid) is None: nickname_query = self.Irc.get_nickname(nickoruid) if not self.Irc.get_uid(nickoruid) is None: uid_query = self.Irc.get_uid(nickoruid) if nickname_query is None and uid_query is None: self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist") else: self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : UID : {uid_query}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : NICKNAME : {self.Irc.db_uid[uid_query]["nickname"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : USERNAME : {self.Irc.db_uid[uid_query]["username"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : HOSTNAME : {self.Irc.db_uid[uid_query]["hostname"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : VHOST : {self.Irc.db_uid[uid_query]["vhost"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : MODES : {self.Irc.db_uid[uid_query]["umodes"]}') self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : CONNECTION TIME : {self.Irc.db_uid[uid_query]["datetime"]}') except KeyError as ke: self.Irc.debug(f"Key error info user : {ke}") case 'show_users': for uid, infousers in self.Irc.db_uid.items(): # print(uid + " " + str(infousers)) for info in infousers: if info == 'nickname': self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :UID : {uid} - isWebirc: {infousers['isWebirc']} - {info}: {infousers[info]}")