diff --git a/core/base.py b/core/base.py index 5e15add..e62689c 100644 --- a/core/base.py +++ b/core/base.py @@ -1,6 +1,8 @@ +import importlib import os import re import json +import sys import time import random import socket @@ -8,12 +10,12 @@ import hashlib import logging import threading import ipaddress - import ast +from pathlib import Path +from types import ModuleType import requests - from dataclasses import fields -from typing import Union, Literal, TYPE_CHECKING +from typing import Union, TYPE_CHECKING from base64 import b64decode, b64encode from datetime import datetime, timedelta, timezone from sqlalchemy import create_engine, Engine, Connection, CursorResult @@ -155,15 +157,41 @@ class Base: currentdate = datetime.now().strftime('%d-%m-%Y %H:%M:%S') return currentdate - def get_all_modules(self) -> list: + def get_all_modules(self) -> list[str]: + """Get list of all main modules + using this pattern mod_*.py - all_files = os.listdir('mods/') - all_modules: list = [] - for module in all_files: - if module.endswith('.py') and not module == '__init__.py': - all_modules.append(module.replace('.py', '').lower()) + Returns: + list[str]: List of module names. + """ + base_path = Path('mods') + return [file.name.replace('.py', '') for file in base_path.rglob('mod_*.py')] - return all_modules + def reload_modules_with_dependencies(self, prefix: str = 'mods'): + """ + Reload all modules in sys.modules that start with the given prefix. + Useful for reloading a full package during development. + """ + modules_to_reload = [] + + # Collect target modules + for name, module in sys.modules.items(): + if ( + isinstance(module, ModuleType) + and module is not None + and name.startswith(prefix) + ): + modules_to_reload.append((name, module)) + + # Sort to reload submodules before parent modules + for name, module in sorted(modules_to_reload, key=lambda x: x[0], reverse=True): + try: + if 'mod_' not in name: + importlib.reload(module) + self.logs.debug(f'[LOAD_MODULE] Module {module} success') + + except Exception: + self.logs.error(f'[LOAD_MODULE] Module {module} failed [!]') def create_log(self, log_message: str) -> None: """Enregiste les logs diff --git a/core/classes/admin.py b/core/classes/admin.py index 9cf22b2..b1bd63f 100644 --- a/core/classes/admin.py +++ b/core/classes/admin.py @@ -1,7 +1,6 @@ from typing import Union -import core.definition as df from core.base import Base - +import core.definition as df class Admin: diff --git a/core/classes/channel.py b/core/classes/channel.py index 608b051..77b2001 100644 --- a/core/classes/channel.py +++ b/core/classes/channel.py @@ -1,7 +1,5 @@ from re import findall -from typing import Union, Literal, TYPE_CHECKING -from dataclasses import asdict - +from typing import Any, Optional, Literal, TYPE_CHECKING from core.classes import user if TYPE_CHECKING: @@ -170,25 +168,22 @@ class Channel: except Exception as err: self.Logs.error(f'{err}') - def get_Channel(self, channel_name: str) -> Union['MChannel', None]: - - Channel = None + def get_Channel(self, channel_name: str) -> Optional['MChannel']: for record in self.UID_CHANNEL_DB: if record.name == channel_name: - Channel = record + return record - return Channel + return None - def get_Channel_AsDict(self, chan_name: str) -> Union[dict[str, any], None]: + def get_channel_asdict(self, chan_name: str) -> Optional[dict[str, Any]]: - chanObj = self.get_Channel(chan_name=chan_name) + channel_obj: Optional['MChannel'] = self.get_Channel(chan_name=chan_name) - if not chanObj is None: - chan_as_dict = asdict(chanObj) - return chan_as_dict - else: + if channel_obj is None: return None + + return channel_obj.to_dict() def Is_Channel(self, channelToCheck: str) -> bool: """Check if the string has the # caractere and return True if this is a channel diff --git a/core/classes/client.py b/core/classes/client.py index fd54ff0..93e5b76 100644 --- a/core/classes/client.py +++ b/core/classes/client.py @@ -1,5 +1,5 @@ from re import sub -from typing import Union, TYPE_CHECKING +from typing import Any, Optional, Union, TYPE_CHECKING from dataclasses import asdict if TYPE_CHECKING: @@ -169,7 +169,7 @@ class Client: return userObj.nickname - def get_Client_AsDict(self, uidornickname: str) -> Union[dict[str, any], None]: + def get_client_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]: """Transform User Object to a dictionary Args: @@ -178,12 +178,12 @@ class Client: Returns: Union[dict[str, any], None]: User Object as a dictionary or None """ - userObj = self.get_Client(uidornickname=uidornickname) + client_obj = self.get_Client(uidornickname=uidornickname) - if userObj is None: + if client_obj is None: return None - return asdict(userObj) + return client_obj.to_dict() def is_exist(self, uidornikname: str) -> bool: """Check if the UID or the nickname exist in the USER DB diff --git a/core/classes/clone.py b/core/classes/clone.py index 7315822..846750b 100644 --- a/core/classes/clone.py +++ b/core/classes/clone.py @@ -1,6 +1,5 @@ -from dataclasses import asdict from core.definition import MClone -from typing import Union +from typing import Any, Optional from core.base import Base class Clone: @@ -74,13 +73,11 @@ class Clone: Returns: bool: True if the nickname exist """ - response = False - - for cloneObject in self.UID_CLONE_DB: - if cloneObject.nickname == nickname: - response = True - - return response + clone = self.get_Clone(nickname) + if isinstance(clone, MClone): + return True + + return False def uid_exists(self, uid: str) -> bool: """Check if the nickname exist @@ -91,15 +88,13 @@ class Clone: Returns: bool: True if the nickname exist """ - response = False + clone = self.get_Clone(uid) + if isinstance(clone, MClone): + return True + + return False - for cloneObject in self.UID_CLONE_DB: - if cloneObject.uid == uid: - response = True - - return response - - def get_Clone(self, uidornickname: str) -> Union[MClone, None]: + def get_Clone(self, uidornickname: str) -> Optional[MClone]: """Get MClone object or None Args: @@ -108,17 +103,15 @@ class Clone: Returns: Union[MClone, None]: Return MClone object or None """ - cloneObj = None - for clone in self.UID_CLONE_DB: if clone.uid == uidornickname: - cloneObj = clone + return clone if clone.nickname == uidornickname: - cloneObj = clone + return clone - return cloneObj + return None - def get_uid(self, uidornickname: str) -> Union[str, None]: + def get_uid(self, uidornickname: str) -> Optional[str]: """Get the UID of the clone starting from the UID or the Nickname Args: @@ -127,27 +120,22 @@ class Clone: Returns: str|None: Return the UID """ - uid = None for record in self.UID_CLONE_DB: if record.uid == uidornickname: - uid = record.uid + return record.uid if record.nickname == uidornickname: - uid = record.uid + return record.uid - # if not uid is None: - # self.Logs.debug(f'The UID that you are looking for {uidornickname} has been found {uid}') + return None - return uid + def get_Clone_AsDict(self, uidornickname: str) -> Optional[dict[str, Any]]: - def get_Clone_AsDict(self, uidornickname: str) -> Union[dict[str, any], None]: + clone_obj = self.get_Clone(uidornickname=uidornickname) - cloneObj = self.get_Clone(uidornickname=uidornickname) - - if not cloneObj is None: - cloneObj_as_dict = asdict(cloneObj) - return cloneObj_as_dict - else: + if clone_obj is None: return None + + return clone_obj.to_dict() def kill(self, nickname:str) -> bool: diff --git a/core/classes/user.py b/core/classes/user.py index 17451de..9d2951e 100644 --- a/core/classes/user.py +++ b/core/classes/user.py @@ -1,6 +1,7 @@ from re import sub -from typing import Union, TYPE_CHECKING +from typing import Any, Optional, Union, TYPE_CHECKING from dataclasses import asdict +from datetime import datetime if TYPE_CHECKING: from core.base import Base @@ -169,7 +170,7 @@ class User: return userObj.nickname - def get_User_AsDict(self, uidornickname: str) -> Union[dict[str, any], None]: + def get_user_asdict(self, uidornickname: str) -> Optional[dict[str, Any]]: """Transform User Object to a dictionary Args: @@ -183,7 +184,7 @@ class User: if userObj is None: return None - return asdict(userObj) + return userObj.to_dict() def is_exist(self, uidornikname: str) -> bool: """Check if the UID or the nickname exist in the USER DB @@ -217,4 +218,35 @@ class User: if not parsed_UID: return None - return parsed_UID \ No newline at end of file + return parsed_UID + + def get_user_uptime_in_minutes(self, uidornickname: str) -> float: + """Retourne depuis quand l'utilisateur est connecté (in minutes). + + Args: + uid (str): The uid or le nickname + + Returns: + int: How long in minutes has the user been connected? + """ + + get_user = self.get_User(uidornickname) + if get_user is None: + return 0 + + # Convertir la date enregistrée dans UID_DB en un objet {datetime} + connected_time_string = get_user.connexion_datetime + + if isinstance(connected_time_string, datetime): + connected_time = connected_time_string + else: + connected_time = datetime.strptime(connected_time_string, "%Y-%m-%d %H:%M:%S.%f") + + # What time is it ? + current_datetime = datetime.now() + + uptime = current_datetime - connected_time + convert_to_minutes = uptime.seconds / 60 + uptime_minutes = round(number=convert_to_minutes, ndigits=2) + + return uptime_minutes diff --git a/core/irc.py b/core/irc.py index 35b3a33..7c17e10 100644 --- a/core/irc.py +++ b/core/irc.py @@ -616,10 +616,13 @@ class Irc: module_folder = module_name.split('_')[1].lower() # ==> defender class_name = module_name.split('_')[1].capitalize() # ==> Defender - if 'mods.' + module_name in sys.modules: + if f'mods.{module_folder}.{module_name}' in sys.modules: self.Logs.info('Unload the module ...') self.loaded_classes[class_name].unload() self.Logs.info('Module Already Loaded ... reloading the module ...') + + # Load dependencies + self.Base.reload_modules_with_dependencies(f'mods.{module_folder}') the_module = sys.modules[f'mods.{module_folder}.{module_name}'] importlib.reload(the_module) @@ -683,7 +686,7 @@ class Irc: if self.User.get_User(uid) is None: return None - getUser = self.User.get_User_AsDict(uid) + getUser = self.User.get_user_asdict(uid) level = int(level) diff --git a/core/utils.py b/core/utils.py index 9289f13..10bb365 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,3 +1,6 @@ +import sys +import importlib +from types import ModuleType from typing import Literal, Union from datetime import datetime from time import time diff --git a/mods/defender/mod_defender.py b/mods/defender/mod_defender.py index d989f11..250864f 100644 --- a/mods/defender/mod_defender.py +++ b/mods/defender/mod_defender.py @@ -1,14 +1,8 @@ -import socket -import json -import time -import re -import psutil -import requests -from datetime import datetime -from typing import Union, TYPE_CHECKING - -import core.definition as df +import traceback import mods.defender.schemas as schemas +import mods.defender.utils as utils +import mods.defender.threads as thds +from typing import TYPE_CHECKING if TYPE_CHECKING: from core.irc import Irc @@ -50,6 +44,9 @@ class Defender(): # Add module schemas self.Schemas = schemas + # Add utils functions + self.Utils = utils + # Create module commands (Mandatory) self.Irc.build_command(0, self.module_name, 'code', 'Display the code or key for access') self.Irc.build_command(1, self.module_name, 'info', 'Provide information about the channel or server') @@ -80,11 +77,11 @@ class Defender(): self.timeout = self.Config.API_TIMEOUT # Listes qui vont contenir les ip a scanner avec les différentes API - self.abuseipdb_UserModel: list[df.MUser] = [] - self.freeipapi_UserModel: list[df.MUser] = [] - self.cloudfilt_UserModel: list[df.MUser] = [] - self.psutil_UserModel: list[df.MUser] = [] - self.localscan_UserModel: list[df.MUser] = [] + self.Schemas.DB_ABUSEIPDB_USERS = [] + self.Schemas.DB_FREEIPAPI_USERS = [] + self.Schemas.DB_CLOUDFILT_USERS = [] + self.Schemas.DB_PSUTIL_USERS = [] + self.Schemas.DB_LOCALSCAN_USERS = [] # Variables qui indique que les threads sont en cours d'éxecutions self.abuseipdb_isRunning:bool = True @@ -106,15 +103,16 @@ class Defender(): self.cloudfilt_key = 'r1gEtjtfgRQjtNBDMxsg' # Démarrer les threads pour démarrer les api - self.Base.create_thread(func=self.thread_freeipapi_scan) - self.Base.create_thread(func=self.thread_cloudfilt_scan) - self.Base.create_thread(func=self.thread_abuseipdb_scan) - self.Base.create_thread(func=self.thread_local_scan) - self.Base.create_thread(func=self.thread_psutil_scan) - self.Base.create_thread(func=self.thread_reputation_timer) + self.Base.create_thread(func=thds.thread_freeipapi_scan, func_args=(self, )) + self.Base.create_thread(func=thds.thread_cloudfilt_scan, func_args=(self, )) + self.Base.create_thread(func=thds.thread_abuseipdb_scan, func_args=(self, )) + self.Base.create_thread(func=thds.thread_local_scan, func_args=(self, )) + self.Base.create_thread(func=thds.thread_psutil_scan, func_args=(self, )) + + self.Base.create_thread(func=thds.thread_apply_reputation_sanctions, func_args=(self, )) if self.ModConfig.autolimit == 1: - self.Base.create_thread(func=self.thread_autolimit) + self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, )) if self.ModConfig.reputation == 1: self.Protocol.sjoin(self.Config.SALON_JAIL) @@ -164,11 +162,11 @@ class Defender(): """Cette methode sera executée a chaque désactivation ou rechargement de module """ - self.abuseipdb_UserModel: list[df.MUser] = [] - self.freeipapi_UserModel: list[df.MUser] = [] - self.cloudfilt_UserModel: list[df.MUser] = [] - self.psutil_UserModel: list[df.MUser] = [] - self.localscan_UserModel: list[df.MUser] = [] + self.Schemas.DB_ABUSEIPDB_USERS = [] + self.Schemas.DB_FREEIPAPI_USERS = [] + self.Schemas.DB_CLOUDFILT_USERS = [] + self.Schemas.DB_PSUTIL_USERS = [] + self.Schemas.DB_LOCALSCAN_USERS = [] self.abuseipdb_isRunning:bool = False self.freeipapi_isRunning:bool = False @@ -177,6 +175,7 @@ class Defender(): self.localscan_isRunning:bool = False self.reputationTimer_isRunning:bool = False self.autolimit_isRunning: bool = False + return None def insert_db_trusted(self, uid: str, nickname:str) -> None: @@ -218,226 +217,6 @@ class Defender(): except Exception as err: self.Logs.error(f"General Error: {err}") - def get_user_uptime_in_minutes(self, uidornickname:str) -> float: - """Retourne depuis quand l'utilisateur est connecté (en secondes ). - - Args: - uid (str): le uid ou le nickname de l'utilisateur - - Returns: - int: Temps de connexion de l'utilisateur en secondes - """ - - get_user = self.User.get_User(uidornickname) - if get_user is None: - return 0 - - # Convertir la date enregistrée dans UID_DB en un objet {datetime} - connected_time_string = get_user.connexion_datetime - - if isinstance(connected_time_string, datetime): - connected_time = connected_time_string - else: - connected_time = datetime.strptime(connected_time_string, "%Y-%m-%d %H:%M:%S.%f") - - # Quelle heure est-il ? - current_datetime = datetime.now() - - uptime = current_datetime - connected_time - convert_to_minutes = uptime.seconds / 60 - uptime_minutes = round(number=convert_to_minutes, ndigits=2) - - return uptime_minutes - - def system_reputation(self, uid: str)-> None: - # Reputation security - # - Activation ou désactivation du système --> OK - # - Le user sera en mesure de changer la limite de la réputation --> OK - # - Defender devra envoyer l'utilisateur sur un salon défini dans la configuration, {jail_chan} - # - Defender devra bloquer cet utilisateur sur le salon qui sera en mode (+m) - # - Defender devra envoyer un message du type "Merci de taper cette comande /msg {nomdudefender} {un code généré aléatoirement} - # - Defender devra reconnaître le code - # - Defender devra libérer l'utilisateur et l'envoyer vers un salon défini dans la configuration {welcome_chan} - # - Defender devra intégrer une liste d'IDs (pseudo/host) exemptés de 'Reputation security' malgré un score de rép. faible et un pseudo non enregistré. - try: - - get_reputation = self.Reputation.get_Reputation(uid) - - if get_reputation is None: - self.Logs.error(f'UID {uid} has not been found') - return False - - salon_logs = self.Config.SERVICE_CHANLOG - salon_jail = self.Config.SALON_JAIL - - code = get_reputation.secret_code - jailed_nickname = get_reputation.nickname - jailed_score = get_reputation.score_connexion - - color_red = self.Config.COLORS.red - color_black = self.Config.COLORS.black - color_bold = self.Config.COLORS.bold - nogc = self.Config.COLORS.nogc - service_id = self.Config.SERVICE_ID - service_prefix = self.Config.SERVICE_PREFIX - reputation_ban_all_chan = self.ModConfig.reputation_ban_all_chan - - if not get_reputation.isWebirc: - # Si le user ne vient pas de webIrc - - self.Protocol.send_sajoin(nick_to_sajoin=jailed_nickname, channel_name=salon_jail) - self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME, - msg=f" [{color_red} REPUTATION {nogc}] : Connexion de {jailed_nickname} ({jailed_score}) ==> {salon_jail}", - channel=salon_logs - ) - self.Protocol.send_notice( - nick_from=self.Config.SERVICE_NICKNAME, - nick_to=jailed_nickname, - msg=f"[{color_red} {jailed_nickname} {color_black}] : Merci de tapez la commande suivante {color_bold}{service_prefix}code {code}{color_bold}" - ) - if reputation_ban_all_chan == 1: - for chan in self.Channel.UID_CHANNEL_DB: - if chan.name != salon_jail: - self.Protocol.send2socket(f":{service_id} MODE {chan.name} +b {jailed_nickname}!*@*") - self.Protocol.send2socket(f":{service_id} KICK {chan.name} {jailed_nickname}") - - self.Logs.info(f"system_reputation : {jailed_nickname} à été capturé par le système de réputation") - # self.Irc.create_ping_timer(int(self.ModConfig.reputation_timer) * 60, 'Defender', 'system_reputation_timer') - # self.Base.create_timer(int(self.ModConfig.reputation_timer) * 60, self.system_reputation_timer) - else: - self.Logs.info(f"system_reputation : {jailed_nickname} à été supprimé du système de réputation car connecté via WebIrc ou il est dans la 'Trusted list'") - self.Reputation.delete(uid) - - except IndexError as e: - self.Logs.error(f"system_reputation : {str(e)}") - - def system_reputation_timer(self) -> None: - try: - reputation_flag = self.ModConfig.reputation - reputation_timer = self.ModConfig.reputation_timer - reputation_seuil = self.ModConfig.reputation_seuil - ban_all_chan = self.ModConfig.reputation_ban_all_chan - service_id = self.Config.SERVICE_ID - dchanlog = self.Config.SERVICE_CHANLOG - color_red = self.Config.COLORS.red - nogc = self.Config.COLORS.nogc - salon_jail = self.Config.SALON_JAIL - - if reputation_flag == 0: - return None - elif reputation_timer == 0: - return None - - uid_to_clean = [] - - for user in self.Reputation.UID_REPUTATION_DB: - if not user.isWebirc: # Si il ne vient pas de WebIRC - if self.get_user_uptime_in_minutes(user.uid) >= reputation_timer and int(user.score_connexion) <= int(reputation_seuil): - self.Protocol.send_priv_msg( - nick_from=service_id, - msg=f"[{color_red} REPUTATION {nogc}] : Action sur {user.nickname} aprés {str(reputation_timer)} minutes d'inactivité", - channel=dchanlog - ) - self.Protocol.send2socket(f":{service_id} KILL {user.nickname} After {str(reputation_timer)} minutes of inactivity you should reconnect and type the password code") - self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} REPUTATION {user.remote_ip} 0") - - self.Logs.info(f"Nickname: {user.nickname} KILLED after {str(reputation_timer)} minutes of inactivity") - uid_to_clean.append(user.uid) - - for uid in uid_to_clean: - # Suppression des éléments dans {UID_DB} et {REPUTATION_DB} - for chan in self.Channel.UID_CHANNEL_DB: - if chan.name != salon_jail and ban_all_chan == 1: - get_user_reputation = self.Reputation.get_Reputation(uid) - self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b {get_user_reputation.nickname}!*@*") - - # Lorsqu'un utilisateur quitte, il doit être supprimé de {UID_DB}. - self.Channel.delete_user_from_all_channel(uid) - self.Reputation.delete(uid) - self.User.delete(uid) - - except AssertionError as ae: - self.Logs.error(f'Assertion Error -> {ae}') - - def thread_reputation_timer(self) -> None: - try: - while self.reputationTimer_isRunning: - self.system_reputation_timer() - time.sleep(5) - - return None - except ValueError as ve: - self.Logs.error(f"thread_reputation_timer Error : {ve}") - - def _execute_flood_action(self, action:str, channel:str) -> None: - """DO NOT EXECUTE THIS FUNCTION WITHOUT THREADING - - Args: - action (str): _description_ - channel (str): _description_ - - Returns: - _type_: _description_ - """ - service_id = self.Config.SERVICE_ID - match action: - case 'mode-m': - # Action -m sur le salon - self.Protocol.send2socket(f":{service_id} MODE {channel} -m") - case _: - pass - - return None - - def flood(self, detected_user:str, channel:str) -> None: - - if self.ModConfig.flood == 0: - return None - - if not self.Channel.Is_Channel(channelToCheck=channel): - return None - - flood_time = self.ModConfig.flood_time - flood_message = self.ModConfig.flood_message - flood_timer = self.ModConfig.flood_timer - service_id = self.Config.SERVICE_ID - dnickname = self.Config.SERVICE_NICKNAME - color_red = self.Config.COLORS.red - color_bold = self.Config.COLORS.bold - - get_detected_uid = self.User.get_uid(detected_user) - get_detected_nickname = self.User.get_nickname(detected_user) - - unixtime = self.Base.get_unixtime() - get_diff_secondes = 0 - - if get_detected_uid not in self.flood_system: - self.flood_system[get_detected_uid] = { - 'nbr_msg': 0, - 'first_msg_time': unixtime - } - - self.flood_system[get_detected_uid]['nbr_msg'] += 1 - get_diff_secondes = unixtime - self.flood_system[get_detected_uid]['first_msg_time'] - if get_diff_secondes > flood_time: - self.flood_system[get_detected_uid]['first_msg_time'] = unixtime - self.flood_system[get_detected_uid]['nbr_msg'] = 0 - get_diff_secondes = unixtime - self.flood_system[get_detected_uid]['first_msg_time'] - - elif self.flood_system[get_detected_uid]['nbr_msg'] > flood_message: - self.Irc.Base.logs.info('system de flood detecté') - self.Protocol.send_priv_msg( - nick_from=dnickname, - msg=f"{color_red} {color_bold} Flood detected. Apply the +m mode (Ô_o)", - channel=channel - ) - self.Protocol.send2socket(f":{service_id} MODE {channel} +m") - self.Irc.Base.logs.info(f'FLOOD Détecté sur {get_detected_nickname} mode +m appliqué sur le salon {channel}') - self.flood_system[get_detected_uid]['nbr_msg'] = 0 - self.flood_system[get_detected_uid]['first_msg_time'] = unixtime - - self.Base.create_timer(flood_timer, self._execute_flood_action, ('mode-m', channel)) - def run_db_action_timer(self, wait_for: float = 0) -> None: query = f"SELECT param_key FROM {self.Config.TABLE_CONFIG}" @@ -461,676 +240,49 @@ class Defender(): return None - def scan_ports(self, userModel: df.MUser) -> None: - """local_scan - - Args: - userModel (UserModel): _description_ - """ - User = userModel - remote_ip = User.remote_ip - username = User.username - hostname = User.hostname - nickname = User.nickname - fullname = f'{nickname}!{username}@{hostname}' - - if remote_ip in self.Config.WHITELISTED_IP: - return None - - for port in self.Config.PORTS_TO_SCAN: - try: - newSocket = '' - newSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK) - newSocket.settimeout(0.5) - - connection = (remote_ip, self.Base.int_if_possible(port)) - newSocket.connect(connection) - - self.Protocol.send_priv_msg( - nick_from=self.Config.SERVICE_NICKNAME, - msg=f"[ {self.Config.COLORS.red}PROXY_SCAN{self.Config.COLORS.nogc} ] {fullname} ({remote_ip}) : Port [{str(port)}] ouvert sur l'adresse ip [{remote_ip}]", - channel=self.Config.SERVICE_CHANLOG - ) - # print(f"=======> Le port {str(port)} est ouvert !!") - self.Base.running_sockets.append(newSocket) - # print(newSocket) - newSocket.shutdown(socket.SHUT_RDWR) - newSocket.close() - - except (socket.timeout, ConnectionRefusedError): - self.Logs.info(f"Le port {remote_ip}:{str(port)} est fermé") - except AttributeError as ae: - self.Logs.warning(f"AttributeError ({remote_ip}): {ae}") - except socket.gaierror as err: - self.Logs.warning(f"Address Info Error ({remote_ip}): {err}") - finally: - # newSocket.shutdown(socket.SHUT_RDWR) - newSocket.close() - self.Logs.info('=======> Fermeture de la socket') - - def thread_local_scan(self) -> None: - try: - while self.localscan_isRunning: - - list_to_remove:list = [] - for user in self.localscan_UserModel: - self.scan_ports(user) - list_to_remove.append(user) - time.sleep(1) - - for user_model in list_to_remove: - self.localscan_UserModel.remove(user_model) - - time.sleep(1) - - return None - except ValueError as ve: - self.Logs.warning(f"thread_local_scan Error : {ve}") - - def get_ports_connexion(self, userModel: df.MUser) -> list[int]: - """psutil_scan for Linux (should be run on the same location as the unrealircd server) - - Args: - userModel (UserModel): The User Model Object - - Returns: - list[int]: list of ports - """ - try: - User = userModel - remote_ip = User.remote_ip - username = User.username - hostname = User.hostname - nickname = User.nickname - - if remote_ip in self.Config.WHITELISTED_IP: - return None - - connections = psutil.net_connections(kind='inet') - fullname = f'{nickname}!{username}@{hostname}' - - matching_ports = [conn.raddr.port for conn in connections if conn.raddr and conn.raddr.ip == remote_ip] - self.Logs.info(f"Connexion of {fullname} ({remote_ip}) using ports : {str(matching_ports)}") - - if matching_ports: - self.Protocol.send_priv_msg( - nick_from=self.Config.SERVICE_NICKNAME, - msg=f"[ {self.Config.COLORS.red}PSUTIL_SCAN{self.Config.COLORS.black} ] {fullname} ({remote_ip}) : is using ports {matching_ports}", - channel=self.Config.SERVICE_CHANLOG - ) - - return matching_ports - - except psutil.AccessDenied as ad: - self.Logs.critical(f'psutil_scan: Permission error: {ad}') - - def thread_psutil_scan(self) -> None: - try: - - while self.psutil_isRunning: - - list_to_remove:list = [] - for user in self.psutil_UserModel: - self.get_ports_connexion(user) - list_to_remove.append(user) - time.sleep(1) - - for user_model in list_to_remove: - self.psutil_UserModel.remove(user_model) - - time.sleep(1) - - return None - except ValueError as ve: - self.Logs.warning(f"thread_psutil_scan Error : {ve}") - - def abuseipdb_scan(self, userModel: df.MUser) -> Union[dict[str, any], None]: - """Analyse l'ip avec AbuseIpDB - Cette methode devra etre lancer toujours via un thread ou un timer. - Args: - userModel (UserModel): l'objet User qui contient l'ip - - Returns: - dict[str, any] | None: les informations du provider - keys : 'score', 'country', 'isTor', 'totalReports' - """ - User = userModel - remote_ip = User.remote_ip - username = User.username - hostname = User.hostname - nickname = User.nickname - - if remote_ip in self.Config.WHITELISTED_IP: - return None - if self.ModConfig.abuseipdb_scan == 0: - return None - - if self.abuseipdb_key == '': - return None - - url = 'https://api.abuseipdb.com/api/v2/check' - querystring = { - 'ipAddress': remote_ip, - 'maxAgeInDays': '90' - } - - headers = { - 'Accept': 'application/json', - 'Key': self.abuseipdb_key - } - - try: - response = requests.request(method='GET', url=url, headers=headers, params=querystring, timeout=self.timeout) - - # Formatted output - decodedResponse = json.loads(response.text) - - if 'data' not in decodedResponse: - return None - - result = { - 'score': decodedResponse['data']['abuseConfidenceScore'], - 'country': decodedResponse['data']['countryCode'], - 'isTor': decodedResponse['data']['isTor'], - 'totalReports': decodedResponse['data']['totalReports'] - } - - service_id = self.Config.SERVICE_ID - service_chanlog = self.Config.SERVICE_CHANLOG - color_red = self.Config.COLORS.red - nogc = self.Config.COLORS.nogc - - # pseudo!ident@host - fullname = f'{nickname}!{username}@{hostname}' - - self.Protocol.send_priv_msg( - nick_from=service_id, - msg=f"[ {color_red}ABUSEIPDB_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Score: {str(result['score'])} | Country : {result['country']} | Tor : {str(result['isTor'])} | Total Reports : {str(result['totalReports'])}", - channel=service_chanlog - ) - - if result['isTor']: - self.Protocol.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} This server do not allow Tor connexions {str(result['isTor'])} - Detected by Abuseipdb") - elif result['score'] >= 95: - self.Protocol.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} You were banned from this server because your abuse score is = {str(result['score'])} - Detected by Abuseipdb") - - response.close() - - return result - except KeyError as ke: - self.Logs.error(f"AbuseIpDb KeyError : {ke}") - except requests.ReadTimeout as rt: - self.Logs.error(f"AbuseIpDb Timeout : {rt}") - except requests.ConnectionError as ce: - self.Logs.error(f"AbuseIpDb Connection Error : {ce}") - except Exception as err: - self.Logs.error(f"General Error Abuseipdb : {err}") - - def thread_abuseipdb_scan(self) -> None: - try: - - while self.abuseipdb_isRunning: - - list_to_remove: list = [] - for user in self.abuseipdb_UserModel: - self.abuseipdb_scan(user) - list_to_remove.append(user) - time.sleep(1) - - for user_model in list_to_remove: - self.abuseipdb_UserModel.remove(user_model) - - time.sleep(1) - - return None - except ValueError as ve: - self.Logs.error(f"thread_abuseipdb_scan Error : {ve}") - - def freeipapi_scan(self, userModel: df.MUser) -> Union[dict[str, any], None]: - """Analyse l'ip avec Freeipapi - Cette methode devra etre lancer toujours via un thread ou un timer. - Args: - remote_ip (_type_): l'ip a analyser - - Returns: - dict[str, any] | None: les informations du provider - keys : 'countryCode', 'isProxy' - """ - User = userModel - remote_ip = User.remote_ip - username = User.username - hostname = User.hostname - nickname = User.nickname - - if remote_ip in self.Config.WHITELISTED_IP: - return None - if self.ModConfig.freeipapi_scan == 0: - return None - - service_id = self.Config.SERVICE_ID - service_chanlog = self.Config.SERVICE_CHANLOG - color_red = self.Config.COLORS.red - nogc = self.Config.COLORS.nogc - - url = f'https://freeipapi.com/api/json/{remote_ip}' - - headers = { - 'Accept': 'application/json', - } - - try: - response = requests.request(method='GET', url=url, headers=headers, timeout=self.timeout) - - # Formatted output - decodedResponse = json.loads(response.text) - - status_code = response.status_code - if status_code == 429: - self.Logs.warning('Too Many Requests - The rate limit for the API has been exceeded.') - return None - elif status_code != 200: - self.Logs.warning(f'status code = {str(status_code)}') - return None - - result = { - 'countryCode': decodedResponse['countryCode'] if 'countryCode' in decodedResponse else None, - 'isProxy': decodedResponse['isProxy'] if 'isProxy' in decodedResponse else None - } - - # pseudo!ident@host - fullname = f'{nickname}!{username}@{hostname}' - - self.Protocol.send_priv_msg( - nick_from=service_id, - msg=f"[ {color_red}FREEIPAPI_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Proxy: {str(result['isProxy'])} | Country : {str(result['countryCode'])}", - channel=service_chanlog - ) - - if result['isProxy']: - self.Protocol.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} This server do not allow proxy connexions {str(result['isProxy'])} - detected by freeipapi") - response.close() - - return result - except KeyError as ke: - self.Logs.error(f"FREEIPAPI_SCAN KeyError : {ke}") - except Exception as err: - self.Logs.error(f"General Error Freeipapi : {err}") - - def thread_freeipapi_scan(self) -> None: - try: - - while self.freeipapi_isRunning: - - list_to_remove: list[df.MUser] = [] - for user in self.freeipapi_UserModel: - self.freeipapi_scan(user) - list_to_remove.append(user) - time.sleep(1) - - for user_model in list_to_remove: - self.freeipapi_UserModel.remove(user_model) - - time.sleep(1) - - return None - except ValueError as ve: - self.Logs.error(f"thread_freeipapi_scan Error : {ve}") - - def cloudfilt_scan(self, userModel: df.MUser) -> Union[dict[str, any], None]: - """Analyse l'ip avec cloudfilt - Cette methode devra etre lancer toujours via un thread ou un timer. - Args: - remote_ip (_type_): l'ip a analyser - - Returns: - dict[str, any] | None: les informations du provider - keys : 'countryCode', 'isProxy' - """ - User = userModel - remote_ip = User.remote_ip - username = User.username - hostname = User.hostname - nickname = User.nickname - - if remote_ip in self.Config.WHITELISTED_IP: - return None - if self.ModConfig.cloudfilt_scan == 0: - return None - if self.cloudfilt_key == '': - return None - - service_id = self.Config.SERVICE_ID - service_chanlog = self.Config.SERVICE_CHANLOG - color_red = self.Config.COLORS.red - nogc = self.Config.COLORS.nogc - - url = "https://developers18334.cloudfilt.com/" - - data = { - 'ip': remote_ip, - 'key': self.cloudfilt_key - } - - try: - response = requests.post(url=url, data=data) - # Formatted output - decodedResponse = json.loads(response.text) - status_code = response.status_code - if status_code != 200: - self.Logs.warning(f'Error connecting to cloudfilt API | Code: {str(status_code)}') - return None - - result = { - 'countryiso': decodedResponse['countryiso'] if 'countryiso' in decodedResponse else None, - 'listed': decodedResponse['listed'] if 'listed' in decodedResponse else None, - 'listed_by': decodedResponse['listed_by'] if 'listed_by' in decodedResponse else None, - 'host': decodedResponse['host'] if 'host' in decodedResponse else None - } - - # pseudo!ident@host - fullname = f'{nickname}!{username}@{hostname}' - - self.Protocol.send_priv_msg( - nick_from=service_id, - msg=f"[ {color_red}CLOUDFILT_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Host: {str(result['host'])} | country: {str(result['countryiso'])} | listed: {str(result['listed'])} | listed by : {str(result['listed_by'])}", - channel=service_chanlog - ) - - if result['listed']: - self.Protocol.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} You connexion is listed as dangerous {str(result['listed'])} {str(result['listed_by'])} - detected by cloudfilt") - - response.close() - - return result - except KeyError as ke: - self.Logs.error(f"CLOUDFILT_SCAN KeyError : {ke}") - return None - - def thread_cloudfilt_scan(self) -> None: - try: - - while self.cloudfilt_isRunning: - - list_to_remove:list = [] - for user in self.cloudfilt_UserModel: - self.cloudfilt_scan(user) - list_to_remove.append(user) - time.sleep(1) - - for user_model in list_to_remove: - self.cloudfilt_UserModel.remove(user_model) - - time.sleep(1) - - return None - except ValueError as ve: - self.Logs.error(f"Thread_cloudfilt_scan Error : {ve}") - - def thread_autolimit(self) -> None: - - if self.ModConfig.autolimit == 0: - self.Logs.debug("autolimit deactivated ... canceling the thread") - return None - - while self.Irc.autolimit_started: - time.sleep(0.2) - - self.Irc.autolimit_started = True - init_amount = self.ModConfig.autolimit_amount - INIT = 1 - - # Copy Channels to a list of dict - chanObj_copy: list[dict[str, int]] = [{"name": c.name, "uids_count": len(c.uids)} for c in self.Channel.UID_CHANNEL_DB] - chan_list: list[str] = [c.name for c in self.Channel.UID_CHANNEL_DB] - - while self.autolimit_isRunning: - - if self.ModConfig.autolimit == 0: - self.Logs.debug("autolimit deactivated ... stopping the current thread") - break - - for chan in self.Channel.UID_CHANNEL_DB: - for chan_copy in chanObj_copy: - if chan_copy["name"] == chan.name and len(chan.uids) != chan_copy["uids_count"]: - self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}") - chan_copy["uids_count"] = len(chan.uids) - - if chan.name not in chan_list: - chan_list.append(chan.name) - chanObj_copy.append({"name": chan.name, "uids_count": 0}) - - # Verifier si un salon a été vidé - current_chan_in_list = [d.name for d in self.Channel.UID_CHANNEL_DB] - for c in chan_list: - if c not in current_chan_in_list: - chan_list.remove(c) - - # Si c'est la premiere execution - if INIT == 1: - for chan in self.Channel.UID_CHANNEL_DB: - self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}") - - # Si le nouveau amount est différent de l'initial - if init_amount != self.ModConfig.autolimit_amount: - init_amount = self.ModConfig.autolimit_amount - for chan in self.Channel.UID_CHANNEL_DB: - self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}") - - INIT = 0 - - if self.autolimit_isRunning: - time.sleep(self.ModConfig.autolimit_interval) - - for chan in self.Channel.UID_CHANNEL_DB: - self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} -l") - - self.Irc.autolimit_started = False - - return None - def cmd(self, data: list[str]) -> None: try: if not data or len(data) < 2: return None - service_id = self.Config.SERVICE_ID - p = self.Protocol cmd = data.copy() if isinstance(data, list) else list(data).copy() - index, command = p.get_ircd_protocol_poisition(cmd) + index, command = self.Irc.Protocol.get_ircd_protocol_poisition(cmd) if index == -1: return None match command: case 'REPUTATION': - # :001 REPUTATION 8.8.8.8 118 - try: - self.reputation_first_connexion['ip'] = cmd[2] - self.reputation_first_connexion['score'] = cmd[3] - if str(cmd[3]).find('*') != -1: - # If the reputation changed, we do not need to scan the IP - return None - - if not self.Base.is_valid_ip(cmd[2]): - return None - - # Possibilité de déclancher les bans a ce niveau. - except IndexError as ie: - self.Logs.error(f'cmd reputation: index error: {ie}') + self.Utils.handle_on_reputation(self, cmd) + return None case 'MODE': - # ['...', ':001XSCU0Q', 'MODE', '#jail', '+b', '~security-group:unknown-users'] - # ['@unrealircd.org/...', ':001C0MF01', 'MODE', '#services', '+l', '1'] - - channel = str(cmd[3]) - mode = str(cmd[4]) - group_to_check = str(cmd[5:]) - group_to_unban = '~security-group:unknown-users' - - if self.ModConfig.autolimit == 1: - if mode == '+l' or mode == '-l': - chan = self.Channel.get_Channel(channel) - p.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}") - - if self.Config.SALON_JAIL == channel: - if mode == '+b' and group_to_unban in group_to_check: - p.send2socket(f":{service_id} MODE {self.Config.SALON_JAIL} -b ~security-group:unknown-users") - p.send2socket(f":{service_id} MODE {self.Config.SALON_JAIL} -eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users") - + self.Utils.handle_on_mode(self, cmd) return None case 'PRIVMSG': - # ['@mtag....',':python', 'PRIVMSG', '#defender', ':zefzefzregreg', 'regg', 'aerg'] - user_trigger = str(cmd[1]).replace(':','') - channel = cmd[3] - find_nickname = self.User.get_nickname(user_trigger) - self.flood(find_nickname, channel) + self.Utils.handle_on_privmsg(self, cmd) return None case 'UID': - # If Init then do nothing - if self.Config.DEFENDER_INIT == 1: - return None - - # Supprimer la premiere valeur et finir le code normalement - cmd.pop(0) - - # Get User information - _User = self.User.get_User(str(cmd[7])) - - if _User is None: - self.Logs.critical(f'This UID: [{cmd[7]}] is not available please check why') - return None - - # If user is not service or IrcOp then scan them - if not re.match(r'^.*[S|o?].*$', _User.umodes): - self.abuseipdb_UserModel.append(_User) if self.ModConfig.abuseipdb_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None - self.freeipapi_UserModel.append(_User) if self.ModConfig.freeipapi_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None - self.cloudfilt_UserModel.append(_User) if self.ModConfig.cloudfilt_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None - self.psutil_UserModel.append(_User) if self.ModConfig.psutil_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None - self.localscan_UserModel.append(_User) if self.ModConfig.local_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None - - reputation_flag = self.ModConfig.reputation - reputation_seuil = self.ModConfig.reputation_seuil - - if self.Config.DEFENDER_INIT == 0: - # Si le user n'es pas un service ni un IrcOP - if not re.match(r'^.*[S|o?].*$', _User.umodes): - if reputation_flag == 1 and _User.score_connexion <= reputation_seuil: - # currentDateTime = self.Base.get_datetime() - self.Reputation.insert( - self.Loader.Definition.MReputation( - **_User.to_dict(), - secret_code=self.Base.get_random(8) - ) - ) - if self.Reputation.is_exist(_User.uid): - if reputation_flag == 1 and _User.score_connexion <= reputation_seuil: - self.system_reputation(_User.uid) - self.Logs.info('Démarrer le systeme de reputation') - + self.Utils.handle_on_uid(self, cmd) return None case 'SJOIN': - # ['@msgid=F9B7JeHL5pj9nN57cJ5pEr..', ':001', 'SJOIN', '1702138958', '#welcome', ':0015L1AHL'] - try: - parsed_chan = cmd[4] if self.Channel.Is_Channel(cmd[4]) else None - parsed_UID = self.User.clean_uid(cmd[5]) - - if self.ModConfig.reputation == 1: - get_reputation = self.Reputation.get_Reputation(parsed_UID) - - if parsed_chan != self.Config.SALON_JAIL: - p.send2socket(f":{service_id} MODE {parsed_chan} +b ~security-group:unknown-users") - p.send2socket(f":{service_id} MODE {parsed_chan} +eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users") - - if get_reputation is not None: - isWebirc = get_reputation.isWebirc - - if not isWebirc: - if parsed_chan != self.Config.SALON_JAIL: - p.send_sapart(nick_to_sapart=get_reputation.nickname, channel_name=parsed_chan) - - if self.ModConfig.reputation_ban_all_chan == 1 and not isWebirc: - if parsed_chan != self.Config.SALON_JAIL: - p.send2socket(f":{service_id} MODE {parsed_chan} +b {get_reputation.nickname}!*@*") - p.send2socket(f":{service_id} KICK {parsed_chan} {get_reputation.nickname}") - - self.Logs.debug(f'SJOIN parsed_uid : {parsed_UID}') - - return None - - except KeyError as ke: - self.Logs.error(f"key error SJOIN : {ke}") - - case 'SLOG': - ['@unrealircd...', ':001', 'SLOG', 'info', 'blacklist', 'BLACKLIST_HIT', ':[Blacklist]', 'IP', '162.x.x.x', 'matches', 'blacklist', 'dronebl', '(dnsbl.dronebl.org/reply=6)'] - - if not self.Base.is_valid_ip(cmd[8]): - return None + self.Utils.handle_on_sjoin(self, cmd) return None - # if self.ModConfig.local_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: - # self.localscan_remote_ip.append(cmd[7]) - - # if self.ModConfig.psutil_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: - # self.psutil_remote_ip.append(cmd[7]) - - # if self.ModConfig.abuseipdb_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: - # self.abuseipdb_remote_ip.append(cmd[7]) - - # if self.ModConfig.freeipapi_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: - # self.freeipapi_remote_ip.append(cmd[7]) - - # if self.ModConfig.cloudfilt_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: - # self.cloudfilt_remote_ip.append(cmd[7]) + case 'SLOG': + self.Utils.handle_on_slog(self, cmd) + return None case 'NICK': - # ['@unrealircd.org...', ':001MZQ0RB', 'NICK', 'halow', '1754663712'] - # Changement de nickname - try: - uid = self.User.clean_uid(str(cmd[1])) - get_Reputation = self.Reputation.get_Reputation(uid) - jail_salon = self.Config.SALON_JAIL - service_id = self.Config.SERVICE_ID - - if get_Reputation is None: - self.Logs.debug(f'This UID: {uid} is not listed in the reputation dataclass') - return None - - # Update the new nickname - oldnick = get_Reputation.nickname - newnickname = cmd[3] - get_Reputation.nickname = newnickname - - # If ban in all channel is ON then unban old nickname an ban the new nickname - if self.ModConfig.reputation_ban_all_chan == 1: - for chan in self.Channel.UID_CHANNEL_DB: - if chan.name != jail_salon: - p.send2socket(f":{service_id} MODE {chan.name} -b {oldnick}!*@*") - p.send2socket(f":{service_id} MODE {chan.name} +b {newnickname}!*@*") - - return None - - except KeyError as ke: - self.Logs.error(f'cmd - NICK - KeyError: {ke}') + self.Utils.handle_on_nick(self, cmd) + return None case 'QUIT': - # ['@unrealircd.org...', ':001MZQ0RB', 'QUIT', ':Quit:', '....'] - ban_all_chan = self.Base.int_if_possible(self.ModConfig.reputation_ban_all_chan) - final_UID = self.User.clean_uid(str(cmd[1])) - jail_salon = self.Config.SALON_JAIL - service_id = self.Config.SERVICE_ID - get_user_reputation = self.Reputation.get_Reputation(final_UID) - - if get_user_reputation is not None: - final_nickname = get_user_reputation.nickname - for chan in self.Channel.UID_CHANNEL_DB: - if chan.name != jail_salon and ban_all_chan == 1: - p.send2socket(f":{service_id} MODE {chan.name} -b {final_nickname}!*@*") - self.Reputation.delete(final_UID) - + self.Utils.handle_on_quit(self, cmd) return None case _: @@ -1142,6 +294,7 @@ class Defender(): self.Logs.error(f"{ie} / {cmd} / length {str(len(cmd))}") except Exception as err: self.Logs.error(f"General Error: {err}") + traceback.print_exc() def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None: @@ -1252,7 +405,7 @@ class Defender(): if self.ModConfig.autolimit == 0: self.__update_configuration('autolimit', 1) self.autolimit_isRunning = True - self.Base.create_thread(self.thread_autolimit) + self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, )) self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Activated", channel=self.Config.SERVICE_CHANLOG) else: self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already activated", channel=self.Config.SERVICE_CHANLOG) diff --git a/mods/defender/schemas.py b/mods/defender/schemas.py index 1a91ae6..94e004f 100644 --- a/mods/defender/schemas.py +++ b/mods/defender/schemas.py @@ -1,4 +1,4 @@ -from core.definition import MainModel, dataclass +from core.definition import MainModel, dataclass, MUser @dataclass class ModConfModel(MainModel): @@ -19,4 +19,17 @@ class ModConfModel(MainModel): flood_timer: int = 20 autolimit: int = 0 autolimit_amount: int = 3 - autolimit_interval: int = 3 \ No newline at end of file + autolimit_interval: int = 3 + +@dataclass +class FloodUser: + uid: str = None + nbr_msg: int = 0 + first_msg_time: int = 0 + +DB_FLOOD_USERS: list[FloodUser] = [] +DB_ABUSEIPDB_USERS: list[MUser] = [] +DB_FREEIPAPI_USERS: list[MUser] = [] +DB_CLOUDFILT_USERS: list[MUser] = [] +DB_PSUTIL_USERS: list[MUser] = [] +DB_LOCALSCAN_USERS: list[MUser] = [] \ No newline at end of file diff --git a/mods/defender/threads.py b/mods/defender/threads.py new file mode 100644 index 0000000..eacaa49 --- /dev/null +++ b/mods/defender/threads.py @@ -0,0 +1,167 @@ +from typing import TYPE_CHECKING +from time import sleep + +if TYPE_CHECKING: + from mods.defender.mod_defender import Defender + +def thread_apply_reputation_sanctions(uplink: 'Defender'): + while uplink.reputationTimer_isRunning: + uplink.Utils.action_apply_reputation_santions(uplink) + sleep(5) + +def thread_cloudfilt_scan(uplink: 'Defender'): + + while uplink.cloudfilt_isRunning: + list_to_remove:list = [] + for user in uplink.Schemas.DB_CLOUDFILT_USERS: + uplink.Utils.action_scan_client_with_cloudfilt(uplink, user) + list_to_remove.append(user) + sleep(1) + + for user_model in list_to_remove: + uplink.Schemas.DB_CLOUDFILT_USERS.remove(user_model) + + sleep(1) + +def thread_freeipapi_scan(uplink: 'Defender'): + + while uplink.freeipapi_isRunning: + + list_to_remove: list = [] + for user in uplink.Schemas.DB_FREEIPAPI_USERS: + uplink.Utils.action_scan_client_with_freeipapi(uplink, user) + list_to_remove.append(user) + sleep(1) + + for user_model in list_to_remove: + uplink.Schemas.DB_FREEIPAPI_USERS.remove(user_model) + + sleep(1) + +def thread_abuseipdb_scan(uplink: 'Defender'): + + while uplink.abuseipdb_isRunning: + + list_to_remove: list = [] + for user in uplink.Schemas.DB_ABUSEIPDB_USERS: + uplink.Utils.action_scan_client_with_abuseipdb(uplink, user) + list_to_remove.append(user) + sleep(1) + + for user_model in list_to_remove: + uplink.Schemas.DB_ABUSEIPDB_USERS.remove(user_model) + + sleep(1) + +def thread_local_scan(uplink: 'Defender'): + + while uplink.localscan_isRunning: + list_to_remove:list = [] + for user in uplink.Schemas.DB_LOCALSCAN_USERS: + uplink.Utils.action_scan_client_with_local_socket(uplink, user) + list_to_remove.append(user) + sleep(1) + + for user_model in list_to_remove: + uplink.Schemas.DB_LOCALSCAN_USERS.remove(user_model) + + sleep(1) + +def thread_psutil_scan(uplink: 'Defender'): + + while uplink.psutil_isRunning: + + list_to_remove:list = [] + for user in uplink.Schemas.DB_PSUTIL_USERS: + uplink.Utils.action_scan_client_with_psutil(uplink, user) + list_to_remove.append(user) + sleep(1) + + for user_model in list_to_remove: + uplink.Schemas.DB_PSUTIL_USERS.remove(user_model) + + sleep(1) + +def thread_autolimit(uplink: 'Defender'): + + if uplink.ModConfig.autolimit == 0: + uplink.Logs.debug("autolimit deactivated ... canceling the thread") + return None + + while uplink.Irc.autolimit_started: + sleep(0.2) + + uplink.Irc.autolimit_started = True + init_amount = uplink.ModConfig.autolimit_amount + p = uplink.Protocol + INIT = 1 + + # Copy Channels to a list of dict + chanObj_copy: list[dict[str, int]] = [{"name": c.name, "uids_count": len(c.uids)} for c in uplink.Channel.UID_CHANNEL_DB] + chan_list: list[str] = [c.name for c in uplink.Channel.UID_CHANNEL_DB] + + while uplink.autolimit_isRunning: + + if uplink.ModConfig.autolimit == 0: + uplink.Logs.debug("autolimit deactivated ... stopping the current thread") + break + + for chan in uplink.Channel.UID_CHANNEL_DB: + for chan_copy in chanObj_copy: + if chan_copy["name"] == chan.name and len(chan.uids) != chan_copy["uids_count"]: + p.send2socket(f":{uplink.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + uplink.ModConfig.autolimit_amount}") + chan_copy["uids_count"] = len(chan.uids) + + if chan.name not in chan_list: + chan_list.append(chan.name) + chanObj_copy.append({"name": chan.name, "uids_count": 0}) + + # Verifier si un salon a été vidé + current_chan_in_list = [d.name for d in uplink.Channel.UID_CHANNEL_DB] + for c in chan_list: + if c not in current_chan_in_list: + chan_list.remove(c) + + # Si c'est la premiere execution + if INIT == 1: + for chan in uplink.Channel.UID_CHANNEL_DB: + p.send2socket(f":{uplink.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + uplink.ModConfig.autolimit_amount}") + + # Si le nouveau amount est différent de l'initial + if init_amount != uplink.ModConfig.autolimit_amount: + init_amount = uplink.ModConfig.autolimit_amount + for chan in uplink.Channel.UID_CHANNEL_DB: + p.send2socket(f":{uplink.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + uplink.ModConfig.autolimit_amount}") + + INIT = 0 + + if uplink.autolimit_isRunning: + sleep(uplink.ModConfig.autolimit_interval) + + for chan in uplink.Channel.UID_CHANNEL_DB: + p.send2socket(f":{uplink.Config.SERVICE_ID} MODE {chan.name} -l") + + uplink.Irc.autolimit_started = False + + return None + +def timer_release_mode_mute(uplink: 'Defender', action: str, channel: str): + """DO NOT EXECUTE THIS FUNCTION WITHOUT THREADING + + Args: + action (str): _description_ + channel (str): The related channel + + """ + service_id = uplink.Config.SERVICE_ID + + if not uplink.Channel.Is_Channel(channel): + uplink.Logs.debug(f"Channel is not valid {channel}") + return + + match action: + case 'mode-m': + # Action -m sur le salon + uplink.Protocol.send2socket(f":{service_id} MODE {channel} -m") + case _: + pass diff --git a/mods/defender/utils.py b/mods/defender/utils.py index e69de29..7b2bf07 100644 --- a/mods/defender/utils.py +++ b/mods/defender/utils.py @@ -0,0 +1,709 @@ +import socket +import psutil +import requests +import mods.defender.threads as dthreads +from json import loads +from re import match +from typing import TYPE_CHECKING, Optional +from mods.defender.schemas import FloodUser + +if TYPE_CHECKING: + from core.definition import MUser + from mods.defender.mod_defender import Defender + +def handle_on_reputation(uplink: 'Defender', srvmsg: list[str]): + """Handle reputation server message + >>> srvmsg = [':001', 'REPUTATION', '128.128.128.128', '0'] + >>> srvmsg = [':001', 'REPUTATION', '128.128.128.128', '*0'] + Args: + irc_instance (Irc): The Irc instance + srvmsg (list[str]): The Server MSG + """ + ip = srvmsg[2] + score = srvmsg[3] + + if str(ip).find('*') != -1: + # If the reputation changed, we do not need to scan the IP + return + + # Possibilité de déclancher les bans a ce niveau. + if not uplink.Base.is_valid_ip(ip): + return + +def handle_on_mode(uplink: 'Defender', srvmsg: list[str]): + """_summary_ + >>> srvmsg = ['@unrealircd.org/...', ':001C0MF01', 'MODE', '#services', '+l', '1'] + >>> srvmsg = ['...', ':001XSCU0Q', 'MODE', '#jail', '+b', '~security-group:unknown-users'] + Args: + irc_instance (Irc): The Irc instance + srvmsg (list[str]): The Server MSG + confmodel (ModConfModel): The Module Configuration + """ + irc = uplink.Irc + gconfig = uplink.Config + p = uplink.Protocol + confmodel = uplink.ModConfig + + channel = str(srvmsg[3]) + mode = str(srvmsg[4]) + group_to_check = str(srvmsg[5:]) + group_to_unban = '~security-group:unknown-users' + + if confmodel.autolimit == 1: + if mode == '+l' or mode == '-l': + chan = irc.Channel.get_Channel(channel) + p.send2socket(f":{gconfig.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + confmodel.autolimit_amount}") + + if gconfig.SALON_JAIL == channel: + if mode == '+b' and group_to_unban in group_to_check: + p.send2socket(f":{gconfig.SERVICE_ID} MODE {gconfig.SALON_JAIL} -b ~security-group:unknown-users") + p.send2socket(f":{gconfig.SERVICE_ID} MODE {gconfig.SALON_JAIL} -eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users") + +def handle_on_privmsg(uplink: 'Defender', srvmsg: list[str]): + # ['@mtag....',':python', 'PRIVMSG', '#defender', ':zefzefzregreg', 'regg', 'aerg'] + action_on_flood(uplink, srvmsg) + return None + +def handle_on_sjoin(uplink: 'Defender', srvmsg: list[str]): + """If Joining a new channel, it applies group bans. + + >>> srvmsg = ['@msgid..', ':001', 'SJOIN', '1702138958', '#welcome', ':0015L1AHL'] + + Args: + irc_instance (Irc): The Irc instance + srvmsg (list[str]): The Server MSG + confmodel (ModConfModel): The Module Configuration + """ + irc = uplink.Irc + p = irc.Protocol + gconfig = uplink.Config + confmodel = uplink.ModConfig + + parsed_chan = srvmsg[4] if irc.Channel.Is_Channel(srvmsg[4]) else None + parsed_UID = irc.User.clean_uid(srvmsg[5]) + + if parsed_chan is None or parsed_UID is None: + return + + if confmodel.reputation == 1: + get_reputation = irc.Reputation.get_Reputation(parsed_UID) + + if parsed_chan != gconfig.SALON_JAIL: + p.send2socket(f":{gconfig.SERVICE_ID} MODE {parsed_chan} +b ~security-group:unknown-users") + p.send2socket(f":{gconfig.SERVICE_ID} MODE {parsed_chan} +eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users") + + if get_reputation is not None: + isWebirc = get_reputation.isWebirc + + if not isWebirc: + if parsed_chan != gconfig.SALON_JAIL: + p.send_sapart(nick_to_sapart=get_reputation.nickname, channel_name=parsed_chan) + + if confmodel.reputation_ban_all_chan == 1 and not isWebirc: + if parsed_chan != gconfig.SALON_JAIL: + p.send2socket(f":{gconfig.SERVICE_ID} MODE {parsed_chan} +b {get_reputation.nickname}!*@*") + p.send2socket(f":{gconfig.SERVICE_ID} KICK {parsed_chan} {get_reputation.nickname}") + + irc.Logs.debug(f'SJOIN parsed_uid : {parsed_UID}') + +def handle_on_slog(uplink: 'Defender', srvmsg: list[str]): + """Handling SLOG messages + >>> srvmsg = ['@unrealircd...', ':001', 'SLOG', 'info', 'blacklist', 'BLACKLIST_HIT', ':[Blacklist]', 'IP', '162.x.x.x', 'matches', 'blacklist', 'dronebl', '(dnsbl.dronebl.org/reply=6)'] + Args: + irc_instance (Irc): The Irc instance + srvmsg (list[str]): The Server MSG + confmodel (ModConfModel): The Module Configuration + """ + ['@unrealircd...', ':001', 'SLOG', 'info', 'blacklist', 'BLACKLIST_HIT', ':[Blacklist]', 'IP', '162.x.x.x', 'matches', 'blacklist', 'dronebl', '(dnsbl.dronebl.org/reply=6)'] + + if not uplink.Base.is_valid_ip(srvmsg[8]): + return None + + # if self.ModConfig.local_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + # self.localscan_remote_ip.append(cmd[7]) + + # if self.ModConfig.psutil_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + # self.psutil_remote_ip.append(cmd[7]) + + # if self.ModConfig.abuseipdb_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + # self.abuseipdb_remote_ip.append(cmd[7]) + + # if self.ModConfig.freeipapi_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + # self.freeipapi_remote_ip.append(cmd[7]) + + # if self.ModConfig.cloudfilt_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP: + # self.cloudfilt_remote_ip.append(cmd[7]) + + return None + +def handle_on_nick(uplink: 'Defender', srvmsg: list[str]): + """_summary_ + >>> srvmsg = ['@unrealircd.org...', ':001MZQ0RB', 'NICK', 'newnickname', '1754663712'] + Args: + irc_instance (Irc): The Irc instance + srvmsg (list[str]): The Server MSG + confmodel (ModConfModel): The Module Configuration + """ + uid = uplink.User.clean_uid(str(srvmsg[1])) + p = uplink.Protocol + confmodel = uplink.ModConfig + + get_reputation = uplink.Reputation.get_Reputation(uid) + jail_salon = uplink.Config.SALON_JAIL + service_id = uplink.Config.SERVICE_ID + + if get_reputation is None: + uplink.Logs.debug(f'This UID: {uid} is not listed in the reputation dataclass') + return None + + # Update the new nickname + oldnick = get_reputation.nickname + newnickname = srvmsg[3] + get_reputation.nickname = newnickname + + # If ban in all channel is ON then unban old nickname an ban the new nickname + if confmodel.reputation_ban_all_chan == 1: + for chan in uplink.Channel.UID_CHANNEL_DB: + if chan.name != jail_salon: + p.send2socket(f":{service_id} MODE {chan.name} -b {oldnick}!*@*") + p.send2socket(f":{service_id} MODE {chan.name} +b {newnickname}!*@*") + +def handle_on_quit(uplink: 'Defender', srvmsg: list[str]): + """_summary_ + >>> srvmsg = ['@unrealircd.org...', ':001MZQ0RB', 'QUIT', ':Quit:', 'quit message'] + Args: + uplink (Irc): The Defender Module instance + srvmsg (list[str]): The Server MSG + """ + p = uplink.Protocol + confmodel = uplink.ModConfig + + ban_all_chan = uplink.Base.int_if_possible(confmodel.reputation_ban_all_chan) + final_UID = uplink.User.clean_uid(str(srvmsg[1])) + jail_salon = uplink.Config.SALON_JAIL + service_id = uplink.Config.SERVICE_ID + get_user_reputation = uplink.Reputation.get_Reputation(final_UID) + + if get_user_reputation is not None: + final_nickname = get_user_reputation.nickname + for chan in uplink.Channel.UID_CHANNEL_DB: + if chan.name != jail_salon and ban_all_chan == 1: + p.send2socket(f":{service_id} MODE {chan.name} -b {final_nickname}!*@*") + uplink.Logs.debug(f"Mode -b {final_nickname} on channel {chan.name}") + + uplink.Reputation.delete(final_UID) + uplink.Logs.debug(f"Client {get_user_reputation.nickname} has been removed from Reputation local DB") + +def handle_on_uid(uplink: 'Defender', srvmsg: list[str]): + """_summary_ + >>> ['@s2s-md...', ':001', 'UID', 'nickname', '0', '1754675249', '...', '125-168-141-239.hostname.net', '001BAPN8M', + '0', '+iwx', '*', '32001BBE.25ACEFE7.429FE90D.IP', 'ZA2ic7w==', ':realname'] + + Args: + uplink (Defender): The Defender instance + srvmsg (list[str]): The Server MSG + """ + gconfig = uplink.Config + irc = uplink.Irc + confmodel = uplink.ModConfig + + # If Init then do nothing + if gconfig.DEFENDER_INIT == 1: + return None + + # Get User information + _User = irc.User.get_User(str(srvmsg[8])) + + if _User is None: + irc.Logs.warning(f'This UID: [{srvmsg[8]}] is not available please check why') + return + + # If user is not service or IrcOp then scan them + if not match(r'^.*[S|o?].*$', _User.umodes): + uplink.Schemas.DB_ABUSEIPDB_USERS.append(_User) if confmodel.abuseipdb_scan == 1 and _User.remote_ip not in gconfig.WHITELISTED_IP else None + uplink.Schemas.DB_FREEIPAPI_USERS.append(_User) if confmodel.freeipapi_scan == 1 and _User.remote_ip not in gconfig.WHITELISTED_IP else None + uplink.Schemas.DB_CLOUDFILT_USERS.append(_User) if confmodel.cloudfilt_scan == 1 and _User.remote_ip not in gconfig.WHITELISTED_IP else None + uplink.Schemas.DB_PSUTIL_USERS.append(_User) if confmodel.psutil_scan == 1 and _User.remote_ip not in gconfig.WHITELISTED_IP else None + uplink.Schemas.DB_LOCALSCAN_USERS.append(_User) if confmodel.local_scan == 1 and _User.remote_ip not in gconfig.WHITELISTED_IP else None + + reputation_flag = confmodel.reputation + reputation_seuil = confmodel.reputation_seuil + + if gconfig.DEFENDER_INIT == 0: + # Si le user n'es pas un service ni un IrcOP + if not match(r'^.*[S|o?].*$', _User.umodes): + if reputation_flag == 1 and _User.score_connexion <= reputation_seuil: + # currentDateTime = self.Base.get_datetime() + irc.Reputation.insert( + irc.Loader.Definition.MReputation( + **_User.to_dict(), + secret_code=irc.Base.get_random(8) + ) + ) + if irc.Reputation.is_exist(_User.uid): + if reputation_flag == 1 and _User.score_connexion <= reputation_seuil: + action_add_reputation_sanctions(uplink, _User.uid) + irc.Logs.info(f'[REPUTATION] Reputation system ON (Nickname: {_User.nickname}, uid: {_User.uid})') + +#################### +# ACTION FUNCTIONS # +#################### + +def action_on_flood(uplink: 'Defender', srvmsg: list[str]): + + confmodel = uplink.ModConfig + if confmodel.flood == 0: + return None + + irc = uplink.Irc + gconfig = uplink.Config + p = uplink.Protocol + flood_users = uplink.Schemas.DB_FLOOD_USERS + + user_trigger = str(srvmsg[1]).replace(':','') + channel = srvmsg[3] + User = irc.User.get_User(user_trigger) + + if User is None or not irc.Channel.Is_Channel(channelToCheck=channel): + return + + flood_time = confmodel.flood_time + flood_message = confmodel.flood_message + flood_timer = confmodel.flood_timer + service_id = gconfig.SERVICE_ID + dnickname = gconfig.SERVICE_NICKNAME + color_red = gconfig.COLORS.red + color_bold = gconfig.COLORS.bold + + get_detected_uid = User.uid + get_detected_nickname = User.nickname + unixtime = irc.Base.get_unixtime() + get_diff_secondes = 0 + + def get_flood_user(uid: str) -> Optional[FloodUser]: + for flood_user in flood_users: + if flood_user.uid == uid: + return flood_user + + fu = get_flood_user(get_detected_uid) + if fu is None: + fu = FloodUser(get_detected_uid, 0, unixtime) + flood_users.append(fu) + + fu.nbr_msg += 1 + + get_diff_secondes = unixtime - fu.first_msg_time + if get_diff_secondes > flood_time: + fu.first_msg_time = unixtime + fu.nbr_msg = 0 + get_diff_secondes = unixtime - fu.first_msg_time + elif fu.nbr_msg > flood_message: + irc.Logs.info('system de flood detecté') + p.send_priv_msg( + nick_from=dnickname, + msg=f"{color_red} {color_bold} Flood detected. Apply the +m mode (Ô_o)", + channel=channel + ) + p.send2socket(f":{service_id} MODE {channel} +m") + irc.Logs.info(f'FLOOD Détecté sur {get_detected_nickname} mode +m appliqué sur le salon {channel}') + fu.nbr_msg = 0 + fu.first_msg_time = unixtime + irc.Base.create_timer(flood_timer, dthreads.timer_release_mode_mute, (uplink, 'mode-m', channel)) + +def action_add_reputation_sanctions(uplink: 'Defender', jailed_uid: str ): + + irc = uplink.Irc + gconfig = uplink.Config + p = uplink.Protocol + confmodel = uplink.ModConfig + + get_reputation = irc.Reputation.get_Reputation(jailed_uid) + + if get_reputation is None: + irc.Logs.warning(f'UID {jailed_uid} has not been found') + return + + salon_logs = gconfig.SERVICE_CHANLOG + salon_jail = gconfig.SALON_JAIL + + code = get_reputation.secret_code + jailed_nickname = get_reputation.nickname + jailed_score = get_reputation.score_connexion + + color_red = gconfig.COLORS.red + color_black = gconfig.COLORS.black + color_bold = gconfig.COLORS.bold + nogc = gconfig.COLORS.nogc + service_id = gconfig.SERVICE_ID + service_prefix = gconfig.SERVICE_PREFIX + reputation_ban_all_chan = confmodel.reputation_ban_all_chan + + if not get_reputation.isWebirc: + # Si le user ne vient pas de webIrc + p.send_sajoin(nick_to_sajoin=jailed_nickname, channel_name=salon_jail) + p.send_priv_msg(nick_from=gconfig.SERVICE_NICKNAME, + msg=f" [{color_red} REPUTATION {nogc}] : Connexion de {jailed_nickname} ({jailed_score}) ==> {salon_jail}", + channel=salon_logs + ) + p.send_notice( + nick_from=gconfig.SERVICE_NICKNAME, + nick_to=jailed_nickname, + msg=f"[{color_red} {jailed_nickname} {color_black}] : Merci de tapez la commande suivante {color_bold}{service_prefix}code {code}{color_bold}" + ) + if reputation_ban_all_chan == 1: + for chan in irc.Channel.UID_CHANNEL_DB: + if chan.name != salon_jail: + p.send2socket(f":{service_id} MODE {chan.name} +b {jailed_nickname}!*@*") + p.send2socket(f":{service_id} KICK {chan.name} {jailed_nickname}") + + irc.Logs.info(f"[REPUTATION] {jailed_nickname} jailed (UID: {jailed_uid}, score: {jailed_score})") + else: + irc.Logs.info(f"[REPUTATION] {jailed_nickname} skipped (trusted or WebIRC)") + irc.Reputation.delete(jailed_uid) + +def action_apply_reputation_santions(uplink: 'Defender') -> None: + + irc = uplink.Irc + gconfig = uplink.Config + p = uplink.Protocol + confmodel = uplink.ModConfig + + reputation_flag = confmodel.reputation + reputation_timer = confmodel.reputation_timer + reputation_seuil = confmodel.reputation_seuil + ban_all_chan = confmodel.reputation_ban_all_chan + service_id = gconfig.SERVICE_ID + dchanlog = gconfig.SERVICE_CHANLOG + color_red = gconfig.COLORS.red + nogc = gconfig.COLORS.nogc + salon_jail = gconfig.SALON_JAIL + + if reputation_flag == 0: + return None + elif reputation_timer == 0: + return None + + uid_to_clean = [] + + for user in irc.Reputation.UID_REPUTATION_DB: + if not user.isWebirc: # Si il ne vient pas de WebIRC + if irc.User.get_user_uptime_in_minutes(user.uid) >= reputation_timer and int(user.score_connexion) <= int(reputation_seuil): + p.send_priv_msg( + nick_from=service_id, + msg=f"[{color_red} REPUTATION {nogc}] : Action sur {user.nickname} aprés {str(reputation_timer)} minutes d'inactivité", + channel=dchanlog + ) + p.send2socket(f":{service_id} KILL {user.nickname} After {str(reputation_timer)} minutes of inactivity you should reconnect and type the password code") + p.send2socket(f":{gconfig.SERVEUR_LINK} REPUTATION {user.remote_ip} 0") + + irc.Logs.info(f"Nickname: {user.nickname} KILLED after {str(reputation_timer)} minutes of inactivity") + uid_to_clean.append(user.uid) + + for uid in uid_to_clean: + # Suppression des éléments dans {UID_DB} et {REPUTATION_DB} + for chan in irc.Channel.UID_CHANNEL_DB: + if chan.name != salon_jail and ban_all_chan == 1: + get_user_reputation = irc.Reputation.get_Reputation(uid) + p.send2socket(f":{service_id} MODE {chan.name} -b {get_user_reputation.nickname}!*@*") + + # Lorsqu'un utilisateur quitte, il doit être supprimé de {UID_DB}. + irc.Channel.delete_user_from_all_channel(uid) + irc.Reputation.delete(uid) + irc.User.delete(uid) + +def action_scan_client_with_cloudfilt(uplink: 'Defender', user_model: 'MUser') -> Optional[dict[str, str]]: + """Analyse l'ip avec cloudfilt + Cette methode devra etre lancer toujours via un thread ou un timer. + Args: + uplink (Defender): Defender Instance + + Returns: + dict[str, any] | None: les informations du provider + keys : 'countryCode', 'isProxy' + """ + + remote_ip = user_model.remote_ip + username = user_model.username + hostname = user_model.hostname + nickname = user_model.nickname + p = uplink.Protocol + + if remote_ip in uplink.Config.WHITELISTED_IP: + return None + if uplink.ModConfig.cloudfilt_scan == 0: + return None + if uplink.cloudfilt_key == '': + return None + + service_id = uplink.Config.SERVICE_ID + service_chanlog = uplink.Config.SERVICE_CHANLOG + color_red = uplink.Config.COLORS.red + nogc = uplink.Config.COLORS.nogc + + url = "https://developers18334.cloudfilt.com/" + + data = { + 'ip': remote_ip, + 'key': uplink.cloudfilt_key + } + + response = requests.post(url=url, data=data) + # Formatted output + decoded_response: dict = loads(response.text) + status_code = response.status_code + if status_code != 200: + uplink.Logs.warning(f'Error connecting to cloudfilt API | Code: {str(status_code)}') + return + + result = { + 'countryiso': decoded_response.get('countryiso', None), + 'listed': decoded_response.get('listed', None), + 'listed_by': decoded_response.get('listed_by', None), + 'host': decoded_response.get('host', None) + } + + # pseudo!ident@host + fullname = f'{nickname}!{username}@{hostname}' + + p.send_priv_msg( + nick_from=service_id, + msg=f"[ {color_red}CLOUDFILT_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Host: {str(result['host'])} | country: {str(result['countryiso'])} | listed: {str(result['listed'])} | listed by : {str(result['listed_by'])}", + channel=service_chanlog) + + uplink.Logs.debug(f"[CLOUDFILT SCAN] ({fullname}) connected from ({result['countryiso']}), Listed: {result['listed']}, by: {result['listed_by']}") + + if result['listed']: + p.send2socket(f":{service_id} GLINE +*@{remote_ip} {uplink.Config.GLINE_DURATION} Your connexion is listed as dangerous {str(result['listed'])} {str(result['listed_by'])} - detected by cloudfilt") + uplink.Logs.debug(f"[CLOUDFILT SCAN GLINE] Dangerous connection ({fullname}) from ({result['countryiso']}) Listed: {result['listed']}, by: {result['listed_by']}") + + response.close() + + return result + +def action_scan_client_with_freeipapi(uplink: 'Defender', user_model: 'MUser') -> Optional[dict[str, str]]: + """Analyse l'ip avec Freeipapi + Cette methode devra etre lancer toujours via un thread ou un timer. + Args: + uplink (Defender): The Defender object Instance + + Returns: + dict[str, any] | None: les informations du provider + keys : 'countryCode', 'isProxy' + """ + p = uplink.Protocol + remote_ip = user_model.remote_ip + username = user_model.username + hostname = user_model.hostname + nickname = user_model.nickname + + if remote_ip in uplink.Config.WHITELISTED_IP: + return None + if uplink.ModConfig.freeipapi_scan == 0: + return None + + service_id = uplink.Config.SERVICE_ID + service_chanlog = uplink.Config.SERVICE_CHANLOG + color_red = uplink.Config.COLORS.red + nogc = uplink.Config.COLORS.nogc + + url = f'https://freeipapi.com/api/json/{remote_ip}' + + headers = { + 'Accept': 'application/json', + } + + response = requests.request(method='GET', url=url, headers=headers, timeout=uplink.timeout) + + # Formatted output + decoded_response: dict = loads(response.text) + + status_code = response.status_code + if status_code == 429: + uplink.Logs.warning('Too Many Requests - The rate limit for the API has been exceeded.') + return None + elif status_code != 200: + uplink.Logs.warning(f'status code = {str(status_code)}') + return None + + result = { + 'countryCode': decoded_response.get('countryCode', None), + 'isProxy': decoded_response.get('isProxy', None) + } + + # pseudo!ident@host + fullname = f'{nickname}!{username}@{hostname}' + + p.send_priv_msg( + nick_from=service_id, + msg=f"[ {color_red}FREEIPAPI_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Proxy: {str(result['isProxy'])} | Country : {str(result['countryCode'])}", + channel=service_chanlog) + uplink.Logs.debug(f"[FREEIPAPI SCAN] ({fullname}) connected from ({result['countryCode']}), Proxy: {result['isProxy']}") + + if result['isProxy']: + p.send2socket(f":{service_id} GLINE +*@{remote_ip} {uplink.Config.GLINE_DURATION} This server do not allow proxy connexions {str(result['isProxy'])} - detected by freeipapi") + uplink.Logs.debug(f"[FREEIPAPI SCAN GLINE] Server do not allow proxy connexions {result['isProxy']}") + + response.close() + + return result + +def action_scan_client_with_abuseipdb(uplink: 'Defender', user_model: 'MUser') -> Optional[dict[str, str]]: + """Analyse l'ip avec AbuseIpDB + Cette methode devra etre lancer toujours via un thread ou un timer. + Args: + uplink (Defender): Defender instance object + user_model (MUser): l'objet User qui contient l'ip + + Returns: + dict[str, str] | None: les informations du provider + """ + p = uplink.Protocol + remote_ip = user_model.remote_ip + username = user_model.username + hostname = user_model.hostname + nickname = user_model.nickname + + if remote_ip in uplink.Config.WHITELISTED_IP: + return None + if uplink.ModConfig.abuseipdb_scan == 0: + return None + + if uplink.abuseipdb_key == '': + return None + + url = 'https://api.abuseipdb.com/api/v2/check' + querystring = { + 'ipAddress': remote_ip, + 'maxAgeInDays': '90' + } + + headers = { + 'Accept': 'application/json', + 'Key': uplink.abuseipdb_key + } + + response = requests.request(method='GET', url=url, headers=headers, params=querystring, timeout=uplink.timeout) + + # Formatted output + decoded_response: dict[str, dict] = loads(response.text) + + if 'data' not in decoded_response: + return None + + result = { + 'score': decoded_response.get('data', {}).get('abuseConfidenceScore', 0), + 'country': decoded_response.get('data', {}).get('countryCode', None), + 'isTor': decoded_response.get('data', {}).get('isTor', None), + 'totalReports': decoded_response.get('data', {}).get('totalReports', 0) + } + + service_id = uplink.Config.SERVICE_ID + service_chanlog = uplink.Config.SERVICE_CHANLOG + color_red = uplink.Config.COLORS.red + nogc = uplink.Config.COLORS.nogc + + # pseudo!ident@host + fullname = f'{nickname}!{username}@{hostname}' + + p.send_priv_msg( + nick_from=service_id, + msg=f"[ {color_red}ABUSEIPDB_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Score: {str(result['score'])} | Country : {result['country']} | Tor : {str(result['isTor'])} | Total Reports : {str(result['totalReports'])}", + channel=service_chanlog + ) + uplink.Logs.debug(f"[ABUSEIPDB SCAN] ({fullname}) connected from ({result['country']}), Score: {result['score']}, Tor: {result['isTor']}") + + if result['isTor']: + p.send2socket(f":{service_id} GLINE +*@{remote_ip} {uplink.Config.GLINE_DURATION} This server do not allow Tor connexions {str(result['isTor'])} - Detected by Abuseipdb") + uplink.Logs.debug(f"[ABUSEIPDB SCAN GLINE] Server do not allow Tor connections Tor: {result['isTor']}, Score: {result['score']}") + elif result['score'] >= 95: + p.send2socket(f":{service_id} GLINE +*@{remote_ip} {uplink.Config.GLINE_DURATION} You were banned from this server because your abuse score is = {str(result['score'])} - Detected by Abuseipdb") + uplink.Logs.debug(f"[ABUSEIPDB SCAN GLINE] Server do not high risk connections Country: {result['country']}, Score: {result['score']}") + + response.close() + + return result + +def action_scan_client_with_local_socket(uplink: 'Defender', user_model: 'MUser'): + """local_scan + + Args: + uplink (Defender): Defender instance object + user_model (MUser): l'objet User qui contient l'ip + """ + p = uplink.Protocol + remote_ip = user_model.remote_ip + username = user_model.username + hostname = user_model.hostname + nickname = user_model.nickname + fullname = f'{nickname}!{username}@{hostname}' + + if remote_ip in uplink.Config.WHITELISTED_IP: + return None + + for port in uplink.Config.PORTS_TO_SCAN: + try: + newSocket = '' + newSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK) + newSocket.settimeout(0.5) + + connection = (remote_ip, uplink.Base.int_if_possible(port)) + newSocket.connect(connection) + + p.send_priv_msg( + nick_from=uplink.Config.SERVICE_NICKNAME, + msg=f"[ {uplink.Config.COLORS.red}PROXY_SCAN{uplink.Config.COLORS.nogc} ] {fullname} ({remote_ip}) : Port [{str(port)}] ouvert sur l'adresse ip [{remote_ip}]", + channel=uplink.Config.SERVICE_CHANLOG + ) + # print(f"=======> Le port {str(port)} est ouvert !!") + uplink.Base.running_sockets.append(newSocket) + # print(newSocket) + newSocket.shutdown(socket.SHUT_RDWR) + newSocket.close() + + except (socket.timeout, ConnectionRefusedError): + uplink.Logs.info(f"Le port {remote_ip}:{str(port)} est fermé") + except AttributeError as ae: + uplink.Logs.warning(f"AttributeError ({remote_ip}): {ae}") + except socket.gaierror as err: + uplink.Logs.warning(f"Address Info Error ({remote_ip}): {err}") + finally: + # newSocket.shutdown(socket.SHUT_RDWR) + newSocket.close() + uplink.Logs.info('=======> Fermeture de la socket') + +def action_scan_client_with_psutil(uplink: 'Defender', user_model: 'MUser') -> list[int]: + """psutil_scan for Linux (should be run on the same location as the unrealircd server) + + Args: + userModel (UserModel): The User Model Object + + Returns: + list[int]: list of ports + """ + p = uplink.Protocol + remote_ip = user_model.remote_ip + username = user_model.username + hostname = user_model.hostname + nickname = user_model.nickname + + if remote_ip in uplink.Config.WHITELISTED_IP: + return None + + try: + connections = psutil.net_connections(kind='inet') + fullname = f'{nickname}!{username}@{hostname}' + + matching_ports = [conn.raddr.port for conn in connections if conn.raddr and conn.raddr.ip == remote_ip] + uplink.Logs.info(f"Connexion of {fullname} ({remote_ip}) using ports : {str(matching_ports)}") + + if matching_ports: + p.send_priv_msg( + nick_from=uplink.Config.SERVICE_NICKNAME, + msg=f"[ {uplink.Config.COLORS.red}PSUTIL_SCAN{uplink.Config.COLORS.black} ] {fullname} ({remote_ip}) : is using ports {matching_ports}", + channel=uplink.Config.SERVICE_CHANLOG + ) + + return matching_ports + + except psutil.AccessDenied as ad: + uplink.Logs.critical(f'psutil_scan: Permission error: {ad}') \ No newline at end of file