mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 19:24:23 +00:00
1768 lines
91 KiB
Python
1768 lines
91 KiB
Python
import socket
|
|
import json
|
|
import time
|
|
import re
|
|
import psutil
|
|
import requests
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Union, TYPE_CHECKING
|
|
import core.definition as df
|
|
|
|
# Le module crée devra réspecter quelques conditions
|
|
# 1. Le nom de la classe devra toujours s'appeler comme le module. Exemple => nom de class Defender | nom du module mod_defender
|
|
# 2. la methode __init__ devra toujours avoir les parametres suivant (self, irc:object)
|
|
# 1 . Créer la variable Irc dans le module
|
|
# 2 . Récuperer la configuration dans une variable
|
|
# 3 . Définir et enregistrer les nouvelles commandes
|
|
# 4 . Créer vos tables, en utilisant toujours le nom des votre classe en minuscule ==> defender_votre-table
|
|
# 3. Methode suivantes:
|
|
# cmd(self, data:list)
|
|
# hcmds(self, user:str, cmd: list)
|
|
# unload(self)
|
|
|
|
if TYPE_CHECKING:
|
|
from core.irc import Irc
|
|
|
|
|
|
class Defender():
|
|
|
|
@dataclass
|
|
class ModConfModel:
|
|
reputation: int
|
|
reputation_timer: int
|
|
reputation_seuil: int
|
|
reputation_score_after_release: int
|
|
reputation_ban_all_chan: int
|
|
reputation_sg: int
|
|
local_scan: int
|
|
psutil_scan: int
|
|
abuseipdb_scan: int
|
|
freeipapi_scan: int
|
|
cloudfilt_scan: int
|
|
flood: int
|
|
flood_message: int
|
|
flood_time: int
|
|
flood_timer: int
|
|
autolimit: int
|
|
autolimit_amount: int
|
|
autolimit_interval: int
|
|
|
|
def __init__(self, ircInstance: 'Irc') -> None:
|
|
|
|
# Module name (Mandatory)
|
|
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
|
|
|
|
# Add Irc Object to the module (Mandatory)
|
|
self.Irc = ircInstance
|
|
|
|
# Add Loader Object to the module (Mandatory)
|
|
self.Loader = ircInstance.Loader
|
|
|
|
# Add server protocol Object to the module (Mandatory)
|
|
self.Protocol = ircInstance.Protocol
|
|
|
|
# Add Global Configuration to the module (Mandatory)
|
|
self.Config = ircInstance.Config
|
|
|
|
# Add Base object to the module (Mandatory)
|
|
self.Base = ircInstance.Base
|
|
|
|
# Add logs object to the module (Mandatory)
|
|
self.Logs = ircInstance.Base.logs
|
|
|
|
# Add User object to the module (Mandatory)
|
|
self.User = ircInstance.User
|
|
|
|
# Add Channel object to the module (Mandatory)
|
|
self.Channel = ircInstance.Channel
|
|
|
|
# Add Reputation object to the module (Optional)
|
|
self.Reputation = ircInstance.Reputation
|
|
|
|
# Create module commands (Mandatory)
|
|
self.Irc.build_command(0, self.module_name, 'code', 'Display the code or key for access')
|
|
self.Irc.build_command(1, self.module_name, 'info', 'Provide information about the channel or server')
|
|
self.Irc.build_command(1, self.module_name, 'autolimit', 'Automatically set channel user limits')
|
|
self.Irc.build_command(3, self.module_name, 'reputation', 'Check or manage user reputation')
|
|
self.Irc.build_command(3, self.module_name, 'proxy_scan', 'Scan users for proxy connections')
|
|
self.Irc.build_command(3, self.module_name, 'flood', 'Handle flood detection and mitigation')
|
|
self.Irc.build_command(3, self.module_name, 'status', 'Check the status of the server or bot')
|
|
self.Irc.build_command(3, self.module_name, 'timer', 'Set or manage timers')
|
|
self.Irc.build_command(3, self.module_name, 'show_reputation', 'Display reputation information')
|
|
self.Irc.build_command(3, self.module_name, 'sentinel', 'Monitor and guard the channel or server')
|
|
|
|
# Init the module (Mandatory)
|
|
self.__init_module()
|
|
|
|
# Log the module
|
|
self.Logs.debug(f'-- Module {self.module_name} loaded ...')
|
|
|
|
def __init_module(self) -> None:
|
|
|
|
# Create you own tables if needed (Mandatory)
|
|
self.__create_tables()
|
|
|
|
# Load module configuration (Mandatory)
|
|
self.__load_module_configuration()
|
|
# End of mandatory methods you can start your customization #
|
|
|
|
self.timeout = self.Config.API_TIMEOUT
|
|
|
|
# Listes qui vont contenir les ip a scanner avec les différentes API
|
|
self.abuseipdb_UserModel: list[df.MUser] = []
|
|
self.freeipapi_UserModel: list[df.MUser] = []
|
|
self.cloudfilt_UserModel: list[df.MUser] = []
|
|
self.psutil_UserModel: list[df.MUser] = []
|
|
self.localscan_UserModel: list[df.MUser] = []
|
|
|
|
# Variables qui indique que les threads sont en cours d'éxecutions
|
|
self.abuseipdb_isRunning:bool = True
|
|
self.freeipapi_isRunning:bool = True
|
|
self.cloudfilt_isRunning:bool = True
|
|
self.psutil_isRunning:bool = True
|
|
self.localscan_isRunning:bool = True
|
|
self.reputationTimer_isRunning:bool = True
|
|
self.autolimit_isRunning: bool = True
|
|
|
|
# Variable qui va contenir les users
|
|
self.flood_system = {}
|
|
|
|
# Contient les premieres informations de connexion
|
|
self.reputation_first_connexion = {'ip': '', 'score': -1}
|
|
|
|
# Laisser vide si aucune clé
|
|
self.abuseipdb_key = '13c34603fee4d2941a2c443cc5c77fd750757ca2a2c1b304bd0f418aff80c24be12651d1a3cfe674'
|
|
self.cloudfilt_key = 'r1gEtjtfgRQjtNBDMxsg'
|
|
|
|
# Démarrer les threads pour démarrer les api
|
|
self.Base.create_thread(func=self.thread_freeipapi_scan)
|
|
self.Base.create_thread(func=self.thread_cloudfilt_scan)
|
|
self.Base.create_thread(func=self.thread_abuseipdb_scan)
|
|
self.Base.create_thread(func=self.thread_local_scan)
|
|
self.Base.create_thread(func=self.thread_psutil_scan)
|
|
self.Base.create_thread(func=self.thread_reputation_timer)
|
|
|
|
if self.ModConfig.autolimit == 1:
|
|
self.Base.create_thread(func=self.thread_autolimit)
|
|
|
|
if self.ModConfig.reputation == 1:
|
|
self.Protocol.sjoin(self.Config.SALON_JAIL)
|
|
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.SALON_JAIL} +o {self.Config.SERVICE_NICKNAME}")
|
|
|
|
return None
|
|
|
|
def __create_tables(self) -> None:
|
|
"""Methode qui va créer la base de donnée si elle n'existe pas.
|
|
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
|
|
Args:
|
|
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
|
|
|
|
Returns:
|
|
None: Aucun retour n'es attendu
|
|
"""
|
|
|
|
# table_autoop = '''CREATE TABLE IF NOT EXISTS defender_autoop (
|
|
# id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
# datetime TEXT,
|
|
# nickname TEXT,
|
|
# channel TEXT
|
|
# )
|
|
# '''
|
|
|
|
# self.Base.db_execute_query(table_autoop)
|
|
# self.Base.db_execute_query(table_config)
|
|
# self.Base.db_execute_query(table_trusted)
|
|
return None
|
|
|
|
def __load_module_configuration(self) -> None:
|
|
"""### Load Module Configuration
|
|
"""
|
|
try:
|
|
# Variable qui va contenir les options de configuration du module Defender
|
|
self.ModConfig = self.ModConfModel(
|
|
reputation=0, reputation_timer=1, reputation_seuil=26, reputation_score_after_release=27,
|
|
reputation_ban_all_chan=0,reputation_sg=1,
|
|
local_scan=0, psutil_scan=0, abuseipdb_scan=0, freeipapi_scan=0, cloudfilt_scan=0,
|
|
flood=0, flood_message=5, flood_time=1, flood_timer=20,
|
|
autolimit=1, autolimit_amount=3, autolimit_interval=3
|
|
)
|
|
|
|
# Sync the configuration with core configuration (Mandatory)
|
|
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
|
|
|
|
return None
|
|
|
|
except TypeError as te:
|
|
self.Logs.critical(te)
|
|
|
|
def __update_configuration(self, param_key: str, param_value: str):
|
|
|
|
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
|
|
|
|
def unload(self) -> None:
|
|
"""Cette methode sera executée a chaque désactivation ou
|
|
rechargement de module
|
|
"""
|
|
self.abuseipdb_UserModel: list[df.MUser] = []
|
|
self.freeipapi_UserModel: list[df.MUser] = []
|
|
self.cloudfilt_UserModel: list[df.MUser] = []
|
|
self.psutil_UserModel: list[df.MUser] = []
|
|
self.localscan_UserModel: list[df.MUser] = []
|
|
|
|
self.abuseipdb_isRunning:bool = False
|
|
self.freeipapi_isRunning:bool = False
|
|
self.cloudfilt_isRunning:bool = False
|
|
self.psutil_isRunning:bool = False
|
|
self.localscan_isRunning:bool = False
|
|
self.reputationTimer_isRunning:bool = False
|
|
self.autolimit_isRunning: bool = False
|
|
return None
|
|
|
|
def insert_db_trusted(self, uid: str, nickname:str) -> None:
|
|
|
|
uid = self.User.get_uid(uid)
|
|
nickname = self.User.get_nickname(nickname)
|
|
|
|
query = "SELECT id FROM def_trusted WHERE user = ?"
|
|
exec_query = self.Base.db_execute_query(query, {"user": nickname})
|
|
response = exec_query.fetchone()
|
|
|
|
if response is not None:
|
|
q_insert = "INSERT INTO def_trusted (datetime, user, host, vhost) VALUES (?, ?, ?, ?)"
|
|
mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'host': '*', 'vhost': '*'}
|
|
exec_query = self.Base.db_execute_query(q_insert, mes_donnees)
|
|
pass
|
|
|
|
def join_saved_channels(self) -> None:
|
|
"""_summary_
|
|
"""
|
|
try:
|
|
result = self.Base.db_execute_query(f"SELECT distinct channel_name FROM {self.Config.TABLE_CHANNEL}")
|
|
channels = result.fetchall()
|
|
jail_chan = self.Config.SALON_JAIL
|
|
jail_chan_mode = self.Config.SALON_JAIL_MODES
|
|
service_id = self.Config.SERVICE_ID
|
|
dumodes = self.Config.SERVICE_UMODES
|
|
dnickname = self.Config.SERVICE_NICKNAME
|
|
|
|
for channel in channels:
|
|
chan = channel[0]
|
|
self.Protocol.sjoin(chan)
|
|
if chan == jail_chan:
|
|
self.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}")
|
|
self.Protocol.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}")
|
|
|
|
return None
|
|
|
|
except Exception as err:
|
|
self.Logs.error(f"General Error: {err}")
|
|
|
|
def get_user_uptime_in_minutes(self, uidornickname:str) -> float:
|
|
"""Retourne depuis quand l'utilisateur est connecté (en secondes ).
|
|
|
|
Args:
|
|
uid (str): le uid ou le nickname de l'utilisateur
|
|
|
|
Returns:
|
|
int: Temps de connexion de l'utilisateur en secondes
|
|
"""
|
|
|
|
get_user = self.User.get_User(uidornickname)
|
|
if get_user is None:
|
|
return 0
|
|
|
|
# Convertir la date enregistrée dans UID_DB en un objet {datetime}
|
|
connected_time_string = get_user.connexion_datetime
|
|
|
|
if isinstance(connected_time_string, datetime):
|
|
connected_time = connected_time_string
|
|
else:
|
|
connected_time = datetime.strptime(connected_time_string, "%Y-%m-%d %H:%M:%S.%f")
|
|
|
|
# Quelle heure est-il ?
|
|
current_datetime = datetime.now()
|
|
|
|
uptime = current_datetime - connected_time
|
|
convert_to_minutes = uptime.seconds / 60
|
|
uptime_minutes = round(number=convert_to_minutes, ndigits=2)
|
|
|
|
return uptime_minutes
|
|
|
|
def system_reputation(self, uid: str)-> None:
|
|
# Reputation security
|
|
# - Activation ou désactivation du système --> OK
|
|
# - Le user sera en mesure de changer la limite de la réputation --> OK
|
|
# - Defender devra envoyer l'utilisateur sur un salon défini dans la configuration, {jail_chan}
|
|
# - Defender devra bloquer cet utilisateur sur le salon qui sera en mode (+m)
|
|
# - Defender devra envoyer un message du type "Merci de taper cette comande /msg {nomdudefender} {un code généré aléatoirement}
|
|
# - Defender devra reconnaître le code
|
|
# - Defender devra libérer l'utilisateur et l'envoyer vers un salon défini dans la configuration {welcome_chan}
|
|
# - Defender devra intégrer une liste d'IDs (pseudo/host) exemptés de 'Reputation security' malgré un score de rép. faible et un pseudo non enregistré.
|
|
try:
|
|
|
|
get_reputation = self.Reputation.get_Reputation(uid)
|
|
|
|
if get_reputation is None:
|
|
self.Logs.error(f'UID {uid} has not been found')
|
|
return False
|
|
|
|
salon_logs = self.Config.SERVICE_CHANLOG
|
|
salon_jail = self.Config.SALON_JAIL
|
|
|
|
code = get_reputation.secret_code
|
|
jailed_nickname = get_reputation.nickname
|
|
jailed_score = get_reputation.score_connexion
|
|
|
|
color_red = self.Config.COLORS.red
|
|
color_black = self.Config.COLORS.black
|
|
color_bold = self.Config.COLORS.bold
|
|
nogc = self.Config.COLORS.nogc
|
|
service_id = self.Config.SERVICE_ID
|
|
service_prefix = self.Config.SERVICE_PREFIX
|
|
reputation_ban_all_chan = self.ModConfig.reputation_ban_all_chan
|
|
|
|
if not get_reputation.isWebirc:
|
|
# Si le user ne vient pas de webIrc
|
|
|
|
self.Protocol.send_sajoin(nick_to_sajoin=jailed_nickname, channel_name=salon_jail)
|
|
self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME,
|
|
msg=f" [{color_red} REPUTATION {nogc}] : Connexion de {jailed_nickname} ({jailed_score}) ==> {salon_jail}",
|
|
channel=salon_logs
|
|
)
|
|
self.Protocol.send_notice(
|
|
nick_from=self.Config.SERVICE_NICKNAME,
|
|
nick_to=jailed_nickname,
|
|
msg=f"[{color_red} {jailed_nickname} {color_black}] : Merci de tapez la commande suivante {color_bold}{service_prefix}code {code}{color_bold}"
|
|
)
|
|
if reputation_ban_all_chan == 1:
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
if chan.name != salon_jail:
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} +b {jailed_nickname}!*@*")
|
|
self.Protocol.send2socket(f":{service_id} KICK {chan.name} {jailed_nickname}")
|
|
|
|
self.Logs.info(f"system_reputation : {jailed_nickname} à été capturé par le système de réputation")
|
|
# self.Irc.create_ping_timer(int(self.ModConfig.reputation_timer) * 60, 'Defender', 'system_reputation_timer')
|
|
# self.Base.create_timer(int(self.ModConfig.reputation_timer) * 60, self.system_reputation_timer)
|
|
else:
|
|
self.Logs.info(f"system_reputation : {jailed_nickname} à été supprimé du système de réputation car connecté via WebIrc ou il est dans la 'Trusted list'")
|
|
self.Reputation.delete(uid)
|
|
|
|
except IndexError as e:
|
|
self.Logs.error(f"system_reputation : {str(e)}")
|
|
|
|
def system_reputation_timer(self) -> None:
|
|
try:
|
|
reputation_flag = self.ModConfig.reputation
|
|
reputation_timer = self.ModConfig.reputation_timer
|
|
reputation_seuil = self.ModConfig.reputation_seuil
|
|
ban_all_chan = self.ModConfig.reputation_ban_all_chan
|
|
service_id = self.Config.SERVICE_ID
|
|
dchanlog = self.Config.SERVICE_CHANLOG
|
|
color_red = self.Config.COLORS.red
|
|
nogc = self.Config.COLORS.nogc
|
|
salon_jail = self.Config.SALON_JAIL
|
|
|
|
if reputation_flag == 0:
|
|
return None
|
|
elif reputation_timer == 0:
|
|
return None
|
|
|
|
uid_to_clean = []
|
|
|
|
for user in self.Reputation.UID_REPUTATION_DB:
|
|
if not user.isWebirc: # Si il ne vient pas de WebIRC
|
|
if self.get_user_uptime_in_minutes(user.uid) >= reputation_timer and int(user.score_connexion) <= int(reputation_seuil):
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=service_id,
|
|
msg=f"[{color_red} REPUTATION {nogc}] : Action sur {user.nickname} aprés {str(reputation_timer)} minutes d'inactivité",
|
|
channel=dchanlog
|
|
)
|
|
self.Protocol.send2socket(f":{service_id} KILL {user.nickname} After {str(reputation_timer)} minutes of inactivity you should reconnect and type the password code")
|
|
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} REPUTATION {user.remote_ip} 0")
|
|
|
|
self.Logs.info(f"Nickname: {user.nickname} KILLED after {str(reputation_timer)} minutes of inactivity")
|
|
uid_to_clean.append(user.uid)
|
|
|
|
for uid in uid_to_clean:
|
|
# Suppression des éléments dans {UID_DB} et {REPUTATION_DB}
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
if chan.name != salon_jail and ban_all_chan == 1:
|
|
get_user_reputation = self.Reputation.get_Reputation(uid)
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b {get_user_reputation.nickname}!*@*")
|
|
|
|
# Lorsqu'un utilisateur quitte, il doit être supprimé de {UID_DB}.
|
|
self.Channel.delete_user_from_all_channel(uid)
|
|
self.Reputation.delete(uid)
|
|
self.User.delete(uid)
|
|
|
|
except AssertionError as ae:
|
|
self.Logs.error(f'Assertion Error -> {ae}')
|
|
|
|
def thread_reputation_timer(self) -> None:
|
|
try:
|
|
while self.reputationTimer_isRunning:
|
|
self.system_reputation_timer()
|
|
time.sleep(5)
|
|
|
|
return None
|
|
except ValueError as ve:
|
|
self.Logs.error(f"thread_reputation_timer Error : {ve}")
|
|
|
|
def _execute_flood_action(self, action:str, channel:str) -> None:
|
|
"""DO NOT EXECUTE THIS FUNCTION WITHOUT THREADING
|
|
|
|
Args:
|
|
action (str): _description_
|
|
timer (int): _description_
|
|
nickname (str): _description_
|
|
channel (str): _description_
|
|
|
|
Returns:
|
|
_type_: _description_
|
|
"""
|
|
service_id = self.Config.SERVICE_ID
|
|
match action:
|
|
case 'mode-m':
|
|
# Action -m sur le salon
|
|
self.Protocol.send2socket(f":{service_id} MODE {channel} -m")
|
|
case _:
|
|
pass
|
|
|
|
return None
|
|
|
|
def flood(self, detected_user:str, channel:str) -> None:
|
|
|
|
if self.ModConfig.flood == 0:
|
|
return None
|
|
|
|
if not self.Channel.Is_Channel(channelToCheck=channel):
|
|
return None
|
|
|
|
flood_time = self.ModConfig.flood_time
|
|
flood_message = self.ModConfig.flood_message
|
|
flood_timer = self.ModConfig.flood_timer
|
|
service_id = self.Config.SERVICE_ID
|
|
dnickname = self.Config.SERVICE_NICKNAME
|
|
color_red = self.Config.COLORS.red
|
|
color_bold = self.Config.COLORS.bold
|
|
|
|
get_detected_uid = self.User.get_uid(detected_user)
|
|
get_detected_nickname = self.User.get_nickname(detected_user)
|
|
|
|
unixtime = self.Base.get_unixtime()
|
|
get_diff_secondes = 0
|
|
|
|
if get_detected_uid not in self.flood_system:
|
|
self.flood_system[get_detected_uid] = {
|
|
'nbr_msg': 0,
|
|
'first_msg_time': unixtime
|
|
}
|
|
|
|
self.flood_system[get_detected_uid]['nbr_msg'] += 1
|
|
get_diff_secondes = unixtime - self.flood_system[get_detected_uid]['first_msg_time']
|
|
if get_diff_secondes > flood_time:
|
|
self.flood_system[get_detected_uid]['first_msg_time'] = unixtime
|
|
self.flood_system[get_detected_uid]['nbr_msg'] = 0
|
|
get_diff_secondes = unixtime - self.flood_system[get_detected_uid]['first_msg_time']
|
|
|
|
elif self.flood_system[get_detected_uid]['nbr_msg'] > flood_message:
|
|
self.Irc.Base.logs.info('system de flood detecté')
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"{color_red} {color_bold} Flood detected. Apply the +m mode (Ô_o)",
|
|
channel=channel
|
|
)
|
|
self.Protocol.send2socket(f":{service_id} MODE {channel} +m")
|
|
self.Irc.Base.logs.info(f'FLOOD Détecté sur {get_detected_nickname} mode +m appliqué sur le salon {channel}')
|
|
self.flood_system[get_detected_uid]['nbr_msg'] = 0
|
|
self.flood_system[get_detected_uid]['first_msg_time'] = unixtime
|
|
|
|
self.Base.create_timer(flood_timer, self._execute_flood_action, ('mode-m', channel))
|
|
|
|
def run_db_action_timer(self, wait_for: float = 0) -> None:
|
|
|
|
query = f"SELECT param_key FROM {self.Config.TABLE_CONFIG}"
|
|
res = self.Base.db_execute_query(query)
|
|
service_id = self.Config.SERVICE_ID
|
|
dchanlog = self.Config.SERVICE_CHANLOG
|
|
|
|
for param in res.fetchall():
|
|
if param[0] == 'reputation':
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=service_id,
|
|
msg=f" ===> {param[0]}",
|
|
channel=dchanlog
|
|
)
|
|
else:
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=service_id,
|
|
msg=f"{param[0]}",
|
|
channel=dchanlog
|
|
)
|
|
|
|
return None
|
|
|
|
def scan_ports(self, userModel: df.MUser) -> None:
|
|
"""local_scan
|
|
|
|
Args:
|
|
userModel (UserModel): _description_
|
|
"""
|
|
User = userModel
|
|
remote_ip = User.remote_ip
|
|
username = User.username
|
|
hostname = User.hostname
|
|
nickname = User.nickname
|
|
fullname = f'{nickname}!{username}@{hostname}'
|
|
|
|
if remote_ip in self.Config.WHITELISTED_IP:
|
|
return None
|
|
|
|
for port in self.Config.PORTS_TO_SCAN:
|
|
try:
|
|
newSocket = ''
|
|
newSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
|
|
newSocket.settimeout(0.5)
|
|
|
|
connection = (remote_ip, self.Base.int_if_possible(port))
|
|
newSocket.connect(connection)
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=self.Config.SERVICE_NICKNAME,
|
|
msg=f"[ {self.Config.COLORS.red}PROXY_SCAN{self.Config.COLORS.nogc} ] {fullname} ({remote_ip}) : Port [{str(port)}] ouvert sur l'adresse ip [{remote_ip}]",
|
|
channel=self.Config.SERVICE_CHANLOG
|
|
)
|
|
# print(f"=======> Le port {str(port)} est ouvert !!")
|
|
self.Base.running_sockets.append(newSocket)
|
|
# print(newSocket)
|
|
newSocket.shutdown(socket.SHUT_RDWR)
|
|
newSocket.close()
|
|
|
|
except (socket.timeout, ConnectionRefusedError):
|
|
self.Logs.info(f"Le port {remote_ip}:{str(port)} est fermé")
|
|
except AttributeError as ae:
|
|
self.Logs.warning(f"AttributeError ({remote_ip}): {ae}")
|
|
except socket.gaierror as err:
|
|
self.Logs.warning(f"Address Info Error ({remote_ip}): {err}")
|
|
finally:
|
|
# newSocket.shutdown(socket.SHUT_RDWR)
|
|
newSocket.close()
|
|
self.Logs.info('=======> Fermeture de la socket')
|
|
|
|
def thread_local_scan(self) -> None:
|
|
try:
|
|
while self.localscan_isRunning:
|
|
|
|
list_to_remove:list = []
|
|
for user in self.localscan_UserModel:
|
|
self.scan_ports(user)
|
|
list_to_remove.append(user)
|
|
time.sleep(1)
|
|
|
|
for user_model in list_to_remove:
|
|
self.localscan_UserModel.remove(user_model)
|
|
|
|
time.sleep(1)
|
|
|
|
return None
|
|
except ValueError as ve:
|
|
self.Logs.warning(f"thread_local_scan Error : {ve}")
|
|
|
|
def get_ports_connexion(self, userModel: df.MUser) -> list[int]:
|
|
"""psutil_scan for Linux (should be run on the same location as the unrealircd server)
|
|
|
|
Args:
|
|
userModel (UserModel): The User Model Object
|
|
|
|
Returns:
|
|
list[int]: list of ports
|
|
"""
|
|
try:
|
|
User = userModel
|
|
remote_ip = User.remote_ip
|
|
username = User.username
|
|
hostname = User.hostname
|
|
nickname = User.nickname
|
|
|
|
if remote_ip in self.Config.WHITELISTED_IP:
|
|
return None
|
|
|
|
connections = psutil.net_connections(kind='inet')
|
|
fullname = f'{nickname}!{username}@{hostname}'
|
|
|
|
matching_ports = [conn.raddr.port for conn in connections if conn.raddr and conn.raddr.ip == remote_ip]
|
|
self.Logs.info(f"Connexion of {fullname} ({remote_ip}) using ports : {str(matching_ports)}")
|
|
|
|
if matching_ports:
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=self.Config.SERVICE_NICKNAME,
|
|
msg=f"[ {self.Config.COLORS.red}PSUTIL_SCAN{self.Config.COLORS.black} ] {fullname} ({remote_ip}) : is using ports {matching_ports}",
|
|
channel=self.Config.SERVICE_CHANLOG
|
|
)
|
|
|
|
return matching_ports
|
|
|
|
except psutil.AccessDenied as ad:
|
|
self.Logs.critical(f'psutil_scan: Permission error: {ad}')
|
|
|
|
def thread_psutil_scan(self) -> None:
|
|
try:
|
|
|
|
while self.psutil_isRunning:
|
|
|
|
list_to_remove:list = []
|
|
for user in self.psutil_UserModel:
|
|
self.get_ports_connexion(user)
|
|
list_to_remove.append(user)
|
|
time.sleep(1)
|
|
|
|
for user_model in list_to_remove:
|
|
self.psutil_UserModel.remove(user_model)
|
|
|
|
time.sleep(1)
|
|
|
|
return None
|
|
except ValueError as ve:
|
|
self.Logs.warning(f"thread_psutil_scan Error : {ve}")
|
|
|
|
def abuseipdb_scan(self, userModel: df.MUser) -> Union[dict[str, any], None]:
|
|
"""Analyse l'ip avec AbuseIpDB
|
|
Cette methode devra etre lancer toujours via un thread ou un timer.
|
|
Args:
|
|
userModel (UserModel): l'objet User qui contient l'ip
|
|
|
|
Returns:
|
|
dict[str, any] | None: les informations du provider
|
|
keys : 'score', 'country', 'isTor', 'totalReports'
|
|
"""
|
|
User = userModel
|
|
remote_ip = User.remote_ip
|
|
username = User.username
|
|
hostname = User.hostname
|
|
nickname = User.nickname
|
|
|
|
if remote_ip in self.Config.WHITELISTED_IP:
|
|
return None
|
|
if self.ModConfig.abuseipdb_scan == 0:
|
|
return None
|
|
|
|
if self.abuseipdb_key == '':
|
|
return None
|
|
|
|
url = 'https://api.abuseipdb.com/api/v2/check'
|
|
querystring = {
|
|
'ipAddress': remote_ip,
|
|
'maxAgeInDays': '90'
|
|
}
|
|
|
|
headers = {
|
|
'Accept': 'application/json',
|
|
'Key': self.abuseipdb_key
|
|
}
|
|
|
|
try:
|
|
response = requests.request(method='GET', url=url, headers=headers, params=querystring, timeout=self.timeout)
|
|
|
|
# Formatted output
|
|
decodedResponse = json.loads(response.text)
|
|
|
|
if 'data' not in decodedResponse:
|
|
return None
|
|
|
|
result = {
|
|
'score': decodedResponse['data']['abuseConfidenceScore'],
|
|
'country': decodedResponse['data']['countryCode'],
|
|
'isTor': decodedResponse['data']['isTor'],
|
|
'totalReports': decodedResponse['data']['totalReports']
|
|
}
|
|
|
|
service_id = self.Config.SERVICE_ID
|
|
service_chanlog = self.Config.SERVICE_CHANLOG
|
|
color_red = self.Config.COLORS.red
|
|
nogc = self.Config.COLORS.nogc
|
|
|
|
# pseudo!ident@host
|
|
fullname = f'{nickname}!{username}@{hostname}'
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=service_id,
|
|
msg=f"[ {color_red}ABUSEIPDB_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Score: {str(result['score'])} | Country : {result['country']} | Tor : {str(result['isTor'])} | Total Reports : {str(result['totalReports'])}",
|
|
channel=service_chanlog
|
|
)
|
|
|
|
if result['isTor']:
|
|
self.Protocol.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} This server do not allow Tor connexions {str(result['isTor'])} - Detected by Abuseipdb")
|
|
elif result['score'] >= 95:
|
|
self.Protocol.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} You were banned from this server because your abuse score is = {str(result['score'])} - Detected by Abuseipdb")
|
|
|
|
response.close()
|
|
|
|
return result
|
|
except KeyError as ke:
|
|
self.Logs.error(f"AbuseIpDb KeyError : {ke}")
|
|
except requests.ReadTimeout as rt:
|
|
self.Logs.error(f"AbuseIpDb Timeout : {rt}")
|
|
except requests.ConnectionError as ce:
|
|
self.Logs.error(f"AbuseIpDb Connection Error : {ce}")
|
|
except Exception as err:
|
|
self.Logs.error(f"General Error Abuseipdb : {err}")
|
|
|
|
def thread_abuseipdb_scan(self) -> None:
|
|
try:
|
|
|
|
while self.abuseipdb_isRunning:
|
|
|
|
list_to_remove: list = []
|
|
for user in self.abuseipdb_UserModel:
|
|
self.abuseipdb_scan(user)
|
|
list_to_remove.append(user)
|
|
time.sleep(1)
|
|
|
|
for user_model in list_to_remove:
|
|
self.abuseipdb_UserModel.remove(user_model)
|
|
|
|
time.sleep(1)
|
|
|
|
return None
|
|
except ValueError as ve:
|
|
self.Logs.error(f"thread_abuseipdb_scan Error : {ve}")
|
|
|
|
def freeipapi_scan(self, userModel: df.MUser) -> Union[dict[str, any], None]:
|
|
"""Analyse l'ip avec Freeipapi
|
|
Cette methode devra etre lancer toujours via un thread ou un timer.
|
|
Args:
|
|
remote_ip (_type_): l'ip a analyser
|
|
|
|
Returns:
|
|
dict[str, any] | None: les informations du provider
|
|
keys : 'countryCode', 'isProxy'
|
|
"""
|
|
User = userModel
|
|
remote_ip = User.remote_ip
|
|
username = User.username
|
|
hostname = User.hostname
|
|
nickname = User.nickname
|
|
|
|
if remote_ip in self.Config.WHITELISTED_IP:
|
|
return None
|
|
if self.ModConfig.freeipapi_scan == 0:
|
|
return None
|
|
|
|
service_id = self.Config.SERVICE_ID
|
|
service_chanlog = self.Config.SERVICE_CHANLOG
|
|
color_red = self.Config.COLORS.red
|
|
nogc = self.Config.COLORS.nogc
|
|
|
|
url = f'https://freeipapi.com/api/json/{remote_ip}'
|
|
|
|
headers = {
|
|
'Accept': 'application/json',
|
|
}
|
|
|
|
try:
|
|
response = requests.request(method='GET', url=url, headers=headers, timeout=self.timeout)
|
|
|
|
# Formatted output
|
|
decodedResponse = json.loads(response.text)
|
|
|
|
status_code = response.status_code
|
|
if status_code == 429:
|
|
self.Logs.warning('Too Many Requests - The rate limit for the API has been exceeded.')
|
|
return None
|
|
elif status_code != 200:
|
|
self.Logs.warning(f'status code = {str(status_code)}')
|
|
return None
|
|
|
|
result = {
|
|
'countryCode': decodedResponse['countryCode'] if 'countryCode' in decodedResponse else None,
|
|
'isProxy': decodedResponse['isProxy'] if 'isProxy' in decodedResponse else None
|
|
}
|
|
|
|
# pseudo!ident@host
|
|
fullname = f'{nickname}!{username}@{hostname}'
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=service_id,
|
|
msg=f"[ {color_red}FREEIPAPI_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Proxy: {str(result['isProxy'])} | Country : {str(result['countryCode'])}",
|
|
channel=service_chanlog
|
|
)
|
|
|
|
if result['isProxy']:
|
|
self.Protocol.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} This server do not allow proxy connexions {str(result['isProxy'])} - detected by freeipapi")
|
|
response.close()
|
|
|
|
return result
|
|
except KeyError as ke:
|
|
self.Logs.error(f"FREEIPAPI_SCAN KeyError : {ke}")
|
|
except Exception as err:
|
|
self.Logs.error(f"General Error Freeipapi : {err}")
|
|
|
|
def thread_freeipapi_scan(self) -> None:
|
|
try:
|
|
|
|
while self.freeipapi_isRunning:
|
|
|
|
list_to_remove: list[df.MUser] = []
|
|
for user in self.freeipapi_UserModel:
|
|
self.freeipapi_scan(user)
|
|
list_to_remove.append(user)
|
|
time.sleep(1)
|
|
|
|
for user_model in list_to_remove:
|
|
self.freeipapi_UserModel.remove(user_model)
|
|
|
|
time.sleep(1)
|
|
|
|
return None
|
|
except ValueError as ve:
|
|
self.Logs.error(f"thread_freeipapi_scan Error : {ve}")
|
|
|
|
def cloudfilt_scan(self, userModel: df.MUser) -> Union[dict[str, any], None]:
|
|
"""Analyse l'ip avec cloudfilt
|
|
Cette methode devra etre lancer toujours via un thread ou un timer.
|
|
Args:
|
|
remote_ip (_type_): l'ip a analyser
|
|
|
|
Returns:
|
|
dict[str, any] | None: les informations du provider
|
|
keys : 'countryCode', 'isProxy'
|
|
"""
|
|
User = userModel
|
|
remote_ip = User.remote_ip
|
|
username = User.username
|
|
hostname = User.hostname
|
|
nickname = User.nickname
|
|
|
|
if remote_ip in self.Config.WHITELISTED_IP:
|
|
return None
|
|
if self.ModConfig.cloudfilt_scan == 0:
|
|
return None
|
|
if self.cloudfilt_key == '':
|
|
return None
|
|
|
|
service_id = self.Config.SERVICE_ID
|
|
service_chanlog = self.Config.SERVICE_CHANLOG
|
|
color_red = self.Config.COLORS.red
|
|
nogc = self.Config.COLORS.nogc
|
|
|
|
url = "https://developers18334.cloudfilt.com/"
|
|
|
|
data = {
|
|
'ip': remote_ip,
|
|
'key': self.cloudfilt_key
|
|
}
|
|
|
|
try:
|
|
response = requests.post(url=url, data=data)
|
|
# Formatted output
|
|
decodedResponse = json.loads(response.text)
|
|
status_code = response.status_code
|
|
if status_code != 200:
|
|
self.Logs.warning(f'Error connecting to cloudfilt API | Code: {str(status_code)}')
|
|
return None
|
|
|
|
result = {
|
|
'countryiso': decodedResponse['countryiso'] if 'countryiso' in decodedResponse else None,
|
|
'listed': decodedResponse['listed'] if 'listed' in decodedResponse else None,
|
|
'listed_by': decodedResponse['listed_by'] if 'listed_by' in decodedResponse else None,
|
|
'host': decodedResponse['host'] if 'host' in decodedResponse else None
|
|
}
|
|
|
|
# pseudo!ident@host
|
|
fullname = f'{nickname}!{username}@{hostname}'
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=service_id,
|
|
msg=f"[ {color_red}CLOUDFILT_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Host: {str(result['host'])} | country: {str(result['countryiso'])} | listed: {str(result['listed'])} | listed by : {str(result['listed_by'])}",
|
|
channel=service_chanlog
|
|
)
|
|
|
|
if result['listed']:
|
|
self.Protocol.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} You connexion is listed as dangerous {str(result['listed'])} {str(result['listed_by'])} - detected by cloudfilt")
|
|
|
|
response.close()
|
|
|
|
return result
|
|
except KeyError as ke:
|
|
self.Logs.error(f"CLOUDFILT_SCAN KeyError : {ke}")
|
|
return None
|
|
|
|
def thread_cloudfilt_scan(self) -> None:
|
|
try:
|
|
|
|
while self.cloudfilt_isRunning:
|
|
|
|
list_to_remove:list = []
|
|
for user in self.cloudfilt_UserModel:
|
|
self.cloudfilt_scan(user)
|
|
list_to_remove.append(user)
|
|
time.sleep(1)
|
|
|
|
for user_model in list_to_remove:
|
|
self.cloudfilt_UserModel.remove(user_model)
|
|
|
|
time.sleep(1)
|
|
|
|
return None
|
|
except ValueError as ve:
|
|
self.Logs.error(f"Thread_cloudfilt_scan Error : {ve}")
|
|
|
|
def thread_autolimit(self) -> None:
|
|
|
|
if self.ModConfig.autolimit == 0:
|
|
self.Logs.debug("autolimit deactivated ... canceling the thread")
|
|
return None
|
|
|
|
while self.Irc.autolimit_started:
|
|
time.sleep(0.2)
|
|
|
|
self.Irc.autolimit_started = True
|
|
init_amount = self.ModConfig.autolimit_amount
|
|
INIT = 1
|
|
|
|
# Copy Channels to a list of dict
|
|
chanObj_copy: list[dict[str, int]] = [{"name": c.name, "uids_count": len(c.uids)} for c in self.Channel.UID_CHANNEL_DB]
|
|
chan_list: list[str] = [c.name for c in self.Channel.UID_CHANNEL_DB]
|
|
|
|
while self.autolimit_isRunning:
|
|
|
|
if self.ModConfig.autolimit == 0:
|
|
self.Logs.debug("autolimit deactivated ... stopping the current thread")
|
|
break
|
|
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
for chan_copy in chanObj_copy:
|
|
if chan_copy["name"] == chan.name and len(chan.uids) != chan_copy["uids_count"]:
|
|
self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}")
|
|
chan_copy["uids_count"] = len(chan.uids)
|
|
|
|
if chan.name not in chan_list:
|
|
chan_list.append(chan.name)
|
|
chanObj_copy.append({"name": chan.name, "uids_count": 0})
|
|
|
|
# Verifier si un salon a été vidé
|
|
current_chan_in_list = [d.name for d in self.Channel.UID_CHANNEL_DB]
|
|
for c in chan_list:
|
|
if c not in current_chan_in_list:
|
|
chan_list.remove(c)
|
|
|
|
# Si c'est la premiere execution
|
|
if INIT == 1:
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}")
|
|
|
|
# Si le nouveau amount est différent de l'initial
|
|
if init_amount != self.ModConfig.autolimit_amount:
|
|
init_amount = self.ModConfig.autolimit_amount
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}")
|
|
|
|
INIT = 0
|
|
|
|
if self.autolimit_isRunning:
|
|
time.sleep(self.ModConfig.autolimit_interval)
|
|
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} -l")
|
|
|
|
self.Irc.autolimit_started = False
|
|
|
|
return None
|
|
|
|
def cmd(self, data: list[str]) -> None:
|
|
try:
|
|
service_id = self.Config.SERVICE_ID # Defender serveur id
|
|
cmd = list(data).copy()
|
|
|
|
match cmd[1]:
|
|
|
|
case 'REPUTATION':
|
|
# :001 REPUTATION 8.8.8.8 118
|
|
try:
|
|
self.reputation_first_connexion['ip'] = cmd[2]
|
|
self.reputation_first_connexion['score'] = cmd[3]
|
|
if str(cmd[3]).find('*') != -1:
|
|
# If the reputation changed, we do not need to scan the IP
|
|
return None
|
|
|
|
if not self.Base.is_valid_ip(cmd[2]):
|
|
return None
|
|
|
|
# Possibilité de déclancher les bans a ce niveau.
|
|
except IndexError as ie:
|
|
self.Logs.error(f'cmd reputation: index error: {ie}')
|
|
|
|
if len(cmd) < 3:
|
|
return None
|
|
|
|
match cmd[2]:
|
|
|
|
case 'MODE':
|
|
# ['...', ':001XSCU0Q', 'MODE', '#jail', '+b', '~security-group:unknown-users']
|
|
channel = str(cmd[3])
|
|
mode = str(cmd[4])
|
|
group_to_check = str(cmd[5:])
|
|
group_to_unban = '~security-group:unknown-users'
|
|
|
|
if self.Config.SALON_JAIL == channel:
|
|
if mode == '+b' and group_to_unban in group_to_check:
|
|
self.Protocol.send2socket(f":{service_id} MODE {self.Config.SALON_JAIL} -b ~security-group:unknown-users")
|
|
self.Protocol.send2socket(f":{service_id} MODE {self.Config.SALON_JAIL} -eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
|
|
|
|
case 'PRIVMSG':
|
|
cmd.pop(0)
|
|
user_trigger = str(cmd[0]).replace(':','')
|
|
channel = cmd[2]
|
|
find_nickname = self.User.get_nickname(user_trigger)
|
|
self.flood(find_nickname, channel)
|
|
|
|
case 'UID':
|
|
# If Init then do nothing
|
|
if self.Config.DEFENDER_INIT == 1:
|
|
return None
|
|
|
|
# Supprimer la premiere valeur et finir le code normalement
|
|
cmd.pop(0)
|
|
|
|
# Get User information
|
|
_User = self.User.get_User(str(cmd[7]))
|
|
|
|
# If user is not service or IrcOp then scan them
|
|
if not re.match(r'^.*[S|o?].*$', _User.umodes):
|
|
self.abuseipdb_UserModel.append(_User) if self.ModConfig.abuseipdb_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
|
|
self.freeipapi_UserModel.append(_User) if self.ModConfig.freeipapi_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
|
|
self.cloudfilt_UserModel.append(_User) if self.ModConfig.cloudfilt_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
|
|
self.psutil_UserModel.append(_User) if self.ModConfig.psutil_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
|
|
self.localscan_UserModel.append(_User) if self.ModConfig.local_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
|
|
|
|
if _User is None:
|
|
self.Logs.critical(f'This UID: [{cmd[7]}] is not available please check why')
|
|
return None
|
|
|
|
reputation_flag = self.ModConfig.reputation
|
|
reputation_seuil = self.ModConfig.reputation_seuil
|
|
|
|
if self.Config.DEFENDER_INIT == 0:
|
|
# Si le user n'es pas un service ni un IrcOP
|
|
if not re.match(r'^.*[S|o?].*$', _User.umodes):
|
|
if reputation_flag == 1 and _User.score_connexion <= reputation_seuil:
|
|
# currentDateTime = self.Base.get_datetime()
|
|
self.Reputation.insert(
|
|
self.Loader.Definition.MReputation(
|
|
**_User.__dict__,
|
|
secret_code=self.Base.get_random(8)
|
|
# uid=_User.uid, nickname=_User.nickname, username=_User.username, realname=_User.realname,
|
|
# hostname=_User.hostname, umodes=_User.umodes, vhost=_User.vhost, ip=_User.remote_ip, score=_User.score_connexion,
|
|
# secret_code=self.Base.get_random(8), isWebirc=_User.isWebirc, isWebsocket=_User.isWebsocket, connected_datetime=currentDateTime,
|
|
# updated_datetime=currentDateTime
|
|
)
|
|
)
|
|
if self.Reputation.is_exist(_User.uid):
|
|
if reputation_flag == 1 and _User.score_connexion <= reputation_seuil:
|
|
self.system_reputation(_User.uid)
|
|
self.Logs.info('Démarrer le systeme de reputation')
|
|
|
|
case 'SJOIN':
|
|
# ['@msgid=F9B7JeHL5pj9nN57cJ5pEr;time=2023-12-28T20:47:24.305Z', ':001', 'SJOIN', '1702138958', '#welcome', ':0015L1AHL']
|
|
try:
|
|
cmd.pop(0)
|
|
parsed_chan = cmd[3] if self.Channel.Is_Channel(cmd[3]) else None
|
|
|
|
if self.ModConfig.reputation == 1:
|
|
parsed_UID = self.User.clean_uid(cmd[4])
|
|
get_reputation = self.Reputation.get_Reputation(parsed_UID)
|
|
|
|
if parsed_chan != self.Config.SALON_JAIL:
|
|
self.Protocol.send2socket(f":{service_id} MODE {parsed_chan} +b ~security-group:unknown-users")
|
|
self.Protocol.send2socket(f":{service_id} MODE {parsed_chan} +eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
|
|
|
|
if get_reputation is not None:
|
|
isWebirc = get_reputation.isWebirc
|
|
|
|
if not isWebirc:
|
|
if parsed_chan != self.Config.SALON_JAIL:
|
|
self.Protocol.send_sapart(nick_to_sapart=get_reputation.nickname, channel_name=parsed_chan)
|
|
|
|
if self.ModConfig.reputation_ban_all_chan == 1 and not isWebirc:
|
|
if parsed_chan != self.Config.SALON_JAIL:
|
|
self.Protocol.send2socket(f":{service_id} MODE {parsed_chan} +b {get_reputation.nickname}!*@*")
|
|
self.Protocol.send2socket(f":{service_id} KICK {parsed_chan} {get_reputation.nickname}")
|
|
|
|
self.Logs.debug(f'SJOIN parsed_uid : {parsed_UID}')
|
|
|
|
except KeyError as ke:
|
|
self.Logs.error(f"key error SJOIN : {ke}")
|
|
|
|
case 'SLOG':
|
|
# self.Base.scan_ports(cmd[7])
|
|
cmd.pop(0)
|
|
|
|
if not self.Base.is_valid_ip(cmd[7]):
|
|
return None
|
|
|
|
# if self.ModConfig.local_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP:
|
|
# self.localscan_remote_ip.append(cmd[7])
|
|
|
|
# if self.ModConfig.psutil_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP:
|
|
# self.psutil_remote_ip.append(cmd[7])
|
|
|
|
# if self.ModConfig.abuseipdb_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP:
|
|
# self.abuseipdb_remote_ip.append(cmd[7])
|
|
|
|
# if self.ModConfig.freeipapi_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP:
|
|
# self.freeipapi_remote_ip.append(cmd[7])
|
|
|
|
# if self.ModConfig.cloudfilt_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP:
|
|
# self.cloudfilt_remote_ip.append(cmd[7])
|
|
|
|
case 'NICK':
|
|
# :0010BS24L NICK [NEWNICK] 1697917711
|
|
# Changement de nickname
|
|
try:
|
|
cmd.pop(0)
|
|
uid = str(cmd[0]).replace(':','')
|
|
get_Reputation = self.Reputation.get_Reputation(uid)
|
|
jail_salon = self.Config.SALON_JAIL
|
|
service_id = self.Config.SERVICE_ID
|
|
|
|
if get_Reputation is None:
|
|
self.Logs.debug(f'This UID: {uid} is not listed in the reputation dataclass')
|
|
return None
|
|
|
|
# Update the new nickname
|
|
oldnick = get_Reputation.nickname
|
|
newnickname = cmd[2]
|
|
get_Reputation.nickname = newnickname
|
|
|
|
# If ban in all channel is ON then unban old nickname an ban the new nickname
|
|
if self.ModConfig.reputation_ban_all_chan == 1:
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
if chan.name != jail_salon:
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b {oldnick}!*@*")
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} +b {newnickname}!*@*")
|
|
|
|
except KeyError as ke:
|
|
self.Logs.error(f'cmd - NICK - KeyError: {ke}')
|
|
|
|
case 'QUIT':
|
|
# :001N1WD7L QUIT :Quit: free_znc_1
|
|
cmd.pop(0)
|
|
ban_all_chan = self.Base.int_if_possible(self.ModConfig.reputation_ban_all_chan)
|
|
user_id = str(cmd[0]).replace(':','')
|
|
final_UID = user_id
|
|
|
|
jail_salon = self.Config.SALON_JAIL
|
|
service_id = self.Config.SERVICE_ID
|
|
|
|
get_user_reputation = self.Reputation.get_Reputation(final_UID)
|
|
|
|
if get_user_reputation is not None:
|
|
final_nickname = get_user_reputation.nickname
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
if chan.name != jail_salon and ban_all_chan == 1:
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b {final_nickname}!*@*")
|
|
self.Reputation.delete(final_UID)
|
|
|
|
except KeyError as ke:
|
|
self.Logs.error(f"{ke} / {cmd} / length {str(len(cmd))}")
|
|
except IndexError as ie:
|
|
self.Logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
|
|
except Exception as err:
|
|
self.Logs.error(f"General Error: {err}")
|
|
|
|
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
|
|
|
command = str(cmd[0]).lower()
|
|
fromuser = user
|
|
fromchannel = channel if self.Channel.Is_Channel(channel) else None
|
|
channel = fromchannel
|
|
|
|
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
|
|
dchanlog = self.Config.SERVICE_CHANLOG # Defender chan log
|
|
dumodes = self.Config.SERVICE_UMODES # Les modes de Defender
|
|
service_id = self.Config.SERVICE_ID # Defender serveur id
|
|
jail_chan = self.Config.SALON_JAIL # Salon pot de miel
|
|
jail_chan_mode = self.Config.SALON_JAIL_MODES # Mode du salon "pot de miel"
|
|
|
|
match command:
|
|
|
|
case 'timer':
|
|
try:
|
|
timer_sent = self.Base.int_if_possible(cmd[1])
|
|
timer_sent = int(timer_sent)
|
|
self.Base.create_timer(timer_sent, self.run_db_action_timer)
|
|
|
|
except TypeError as te:
|
|
self.Logs.error(f"Type Error -> {te}")
|
|
except ValueError as ve:
|
|
self.Logs.error(f"Value Error -> {ve}")
|
|
|
|
case 'show_reputation':
|
|
|
|
if not self.Reputation.UID_REPUTATION_DB:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="No one is suspected")
|
|
|
|
for suspect in self.Reputation.UID_REPUTATION_DB:
|
|
self.Protocol.send_notice(nick_from=dnickname,
|
|
nick_to=fromuser,
|
|
msg=f" Uid: {suspect.uid} | Nickname: {suspect.nickname} | Reputation: {suspect.score_connexion} | Secret code: {suspect.secret_code} | Connected on: {suspect.connexion_datetime}")
|
|
|
|
case 'code':
|
|
try:
|
|
release_code = cmd[1]
|
|
jailed_nickname = self.User.get_nickname(fromuser)
|
|
jailed_UID = self.User.get_uid(fromuser)
|
|
get_reputation = self.Reputation.get_Reputation(jailed_UID)
|
|
|
|
if get_reputation is None:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" No code is requested ...")
|
|
return False
|
|
|
|
jailed_IP = get_reputation.remote_ip
|
|
jailed_salon = self.Config.SALON_JAIL
|
|
reputation_seuil = self.ModConfig.reputation_seuil
|
|
welcome_salon = self.Config.SALON_LIBERER
|
|
|
|
self.Logs.debug(f"IP de {jailed_nickname} : {jailed_IP}")
|
|
link = self.Config.SERVEUR_LINK
|
|
color_green = self.Config.COLORS.green
|
|
color_black = self.Config.COLORS.black
|
|
|
|
if release_code == get_reputation.secret_code:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg="Bon mot de passe. Allez du vent !", channel=jailed_salon)
|
|
|
|
if self.ModConfig.reputation_ban_all_chan == 1:
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
if chan.name != jailed_salon:
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b {jailed_nickname}!*@*")
|
|
|
|
self.Reputation.delete(jailed_UID)
|
|
self.Logs.debug(f'{jailed_UID} - {jailed_nickname} removed from REPUTATION_DB')
|
|
self.Protocol.send_sapart(nick_to_sapart=jailed_nickname, channel_name=jailed_salon)
|
|
self.Protocol.send_sajoin(nick_to_sajoin=jailed_nickname, channel_name=welcome_salon)
|
|
self.Protocol.send2socket(f":{link} REPUTATION {jailed_IP} {self.ModConfig.reputation_score_after_release}")
|
|
self.User.get_User(jailed_UID).score_connexion = reputation_seuil + 1
|
|
self.Protocol.send_priv_msg(nick_from=dnickname,
|
|
msg=f"[{color_green} MOT DE PASS CORRECT {color_black}] : You have now the right to enjoy the network !",
|
|
nick_to=jailed_nickname)
|
|
|
|
else:
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg="Mauvais password",
|
|
channel=jailed_salon
|
|
)
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[{color_green} MAUVAIS PASSWORD {color_black}] You have typed a wrong code. for recall your password is: {self.Config.SERVICE_PREFIX}code {get_reputation.secret_code}",
|
|
nick_to=jailed_nickname
|
|
)
|
|
|
|
except IndexError as ie:
|
|
self.Logs.error(f'Index Error: {ie}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} code [code]")
|
|
except KeyError as ke:
|
|
self.Logs.error(f'_hcmd code: KeyError {ke}')
|
|
|
|
case 'autolimit':
|
|
try:
|
|
# autolimit on
|
|
# autolimit set [amount] [interval]
|
|
if len(cmd) < 2:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
|
|
return None
|
|
|
|
arg = str(cmd[1]).lower()
|
|
|
|
match arg:
|
|
case 'on':
|
|
if self.ModConfig.autolimit == 0:
|
|
self.__update_configuration('autolimit', 1)
|
|
self.autolimit_isRunning = True
|
|
self.Base.create_thread(self.thread_autolimit)
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Activated", channel=self.Config.SERVICE_CHANLOG)
|
|
else:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already activated", channel=self.Config.SERVICE_CHANLOG)
|
|
|
|
case 'off':
|
|
if self.ModConfig.autolimit == 1:
|
|
self.__update_configuration('autolimit', 0)
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Deactivated", channel=self.Config.SERVICE_CHANLOG)
|
|
else:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already Deactivated", channel=self.Config.SERVICE_CHANLOG)
|
|
|
|
case 'set':
|
|
amount = int(cmd[2])
|
|
interval = int(cmd[3])
|
|
|
|
self.__update_configuration('autolimit_amount', amount)
|
|
self.__update_configuration('autolimit_interval', interval)
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Amount set to ({amount}) | Interval set to ({interval})",
|
|
channel=self.Config.SERVICE_CHANLOG
|
|
)
|
|
|
|
case _:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
|
|
|
|
except Exception as err:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
|
|
self.Logs.error(f"Value Error -> {err}")
|
|
|
|
case 'reputation':
|
|
# .reputation [on/off] --> activate or deactivate reputation system
|
|
# .reputation set banallchan [on/off] --> activate or deactivate ban in all channel
|
|
# .reputation set limit [xxxx] --> change the reputation threshold
|
|
# .reputation [arg1] [arg2] [arg3]
|
|
try:
|
|
len_cmd = len(cmd)
|
|
activation = str(cmd[1]).lower()
|
|
|
|
# Nous sommes dans l'activation ON / OFF
|
|
if len_cmd == 2:
|
|
key = 'reputation'
|
|
if activation == 'on':
|
|
|
|
if self.ModConfig.reputation == 1:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Already activated", channel=dchanlog)
|
|
return False
|
|
|
|
# self.update_db_configuration('reputation', 1)
|
|
self.__update_configuration(key, 1)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
|
|
|
|
self.Protocol.send_join_chan(uidornickname=dnickname, channel=jail_chan)
|
|
self.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}")
|
|
self.Protocol.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}")
|
|
|
|
if self.ModConfig.reputation_sg == 1:
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
if chan.name != jail_chan:
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} +b ~security-group:unknown-users")
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} +eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
|
|
|
|
self.Channel.db_query_channel('add', self.module_name, jail_chan)
|
|
|
|
if activation == 'off':
|
|
|
|
if self.ModConfig.reputation == 0:
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Already deactivated",
|
|
channel=dchanlog
|
|
)
|
|
return False
|
|
|
|
self.__update_configuration(key, 0)
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.red}REPUTATION{self.Config.COLORS.black} ] : Deactivated by {fromuser}",
|
|
channel=dchanlog
|
|
)
|
|
self.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} -{dumodes} {dnickname}")
|
|
self.Protocol.send2socket(f":{service_id} MODE {jail_chan} -sS")
|
|
self.Protocol.send2socket(f":{service_id} PART {jail_chan}")
|
|
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
if chan.name != jail_chan:
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b ~security-group:unknown-users")
|
|
self.Protocol.send2socket(f":{service_id} MODE {chan.name} -eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
|
|
|
|
self.Channel.db_query_channel('del', self.module_name, jail_chan)
|
|
|
|
if len_cmd == 4:
|
|
get_set = str(cmd[1]).lower()
|
|
|
|
if get_set != 'set':
|
|
raise IndexError('Showing help')
|
|
|
|
get_options = str(cmd[2]).lower()
|
|
|
|
match get_options:
|
|
|
|
case 'banallchan':
|
|
key = 'reputation_ban_all_chan'
|
|
get_value = str(cmd[3]).lower()
|
|
if get_value == 'on':
|
|
|
|
if self.ModConfig.reputation_ban_all_chan == 1:
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.red}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Already activated",
|
|
channel=dchanlog
|
|
)
|
|
return False
|
|
|
|
# self.update_db_configuration(key, 1)
|
|
self.__update_configuration(key, 1)
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Activated by {fromuser}",
|
|
channel=dchanlog
|
|
)
|
|
|
|
elif get_value == 'off':
|
|
if self.ModConfig.reputation_ban_all_chan == 0:
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.red}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Already deactivated",
|
|
channel=dchanlog
|
|
)
|
|
return False
|
|
|
|
# self.update_db_configuration(key, 0)
|
|
self.__update_configuration(key, 0)
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Deactivated by {fromuser}",
|
|
channel=dchanlog
|
|
)
|
|
|
|
case 'limit':
|
|
reputation_seuil = int(cmd[3])
|
|
key = 'reputation_seuil'
|
|
|
|
# self.update_db_configuration(key, reputation_seuil)
|
|
self.__update_configuration(key, reputation_seuil)
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}REPUTATION SEUIL{self.Config.COLORS.black} ] : Limit set to {str(reputation_seuil)} by {fromuser}",
|
|
channel=dchanlog
|
|
)
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation set to {reputation_seuil}")
|
|
|
|
case 'timer':
|
|
reputation_timer = int(cmd[3])
|
|
key = 'reputation_timer'
|
|
self.__update_configuration(key, reputation_timer)
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}REPUTATION TIMER{self.Config.COLORS.black} ] : Timer set to {str(reputation_timer)} minute(s) by {fromuser}",
|
|
channel=dchanlog
|
|
)
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation set to {reputation_timer}")
|
|
|
|
case 'score_after_release':
|
|
reputation_score_after_release = int(cmd[3])
|
|
key = 'reputation_score_after_release'
|
|
self.__update_configuration(key, reputation_score_after_release)
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}REPUTATION SCORE AFTER RELEASE{self.Config.COLORS.black} ] : Reputation score after release set to {str(reputation_score_after_release)} by {fromuser}",
|
|
channel=dchanlog
|
|
)
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation score after release set to {reputation_score_after_release}")
|
|
|
|
case 'security_group':
|
|
reputation_sg = int(cmd[3])
|
|
key = 'reputation_sg'
|
|
self.__update_configuration(key, reputation_sg)
|
|
|
|
self.Protocol.send_priv_msg(
|
|
nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}REPUTATION SECURITY-GROUP{self.Config.COLORS.black} ] : Reputation Security-group set to {str(reputation_sg)} by {fromuser}",
|
|
channel=dchanlog
|
|
)
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation score after release set to {reputation_sg}")
|
|
|
|
case _:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation [ON/OFF]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set banallchan [ON/OFF]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set limit [1234]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set score_after_release [1234]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set timer [1234]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set action [kill|None]")
|
|
|
|
except IndexError as ie:
|
|
self.Logs.warning(f'{ie}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation [ON/OFF]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set banallchan [ON/OFF]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set limit [1234]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set score_after_release [1234]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set timer [1234]")
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set action [kill|None]")
|
|
|
|
except ValueError as ve:
|
|
self.Logs.warning(f'{ve}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" La valeur devrait etre un entier >= 0")
|
|
|
|
case 'proxy_scan':
|
|
|
|
# .proxy_scan set local_scan on/off --> Va activer le scan des ports
|
|
# .proxy_scan set psutil_scan on/off --> Active les informations de connexion a la machine locale
|
|
# .proxy_scan set abuseipdb_scan on/off --> Active le scan via l'api abuseipdb
|
|
len_cmd = len(cmd)
|
|
color_green = self.Config.COLORS.green
|
|
color_red = self.Config.COLORS.red
|
|
color_black = self.Config.COLORS.black
|
|
|
|
if len_cmd == 4:
|
|
set_key = str(cmd[1]).lower()
|
|
|
|
if set_key != 'set':
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]')
|
|
|
|
option = str(cmd[2]).lower() # => local_scan, psutil_scan, abuseipdb_scan
|
|
action = str(cmd[3]).lower() # => on / off
|
|
|
|
match option:
|
|
case 'local_scan':
|
|
if action == 'on':
|
|
if self.ModConfig.local_scan == 1:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 1)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
|
|
elif action == 'off':
|
|
if self.ModConfig.local_scan == 0:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 0)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
|
|
|
|
case 'psutil_scan':
|
|
if action == 'on':
|
|
if self.ModConfig.psutil_scan == 1:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 1)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
|
|
elif action == 'off':
|
|
if self.ModConfig.psutil_scan == 0:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 0)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
|
|
|
|
case 'abuseipdb_scan':
|
|
if action == 'on':
|
|
if self.ModConfig.abuseipdb_scan == 1:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 1)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
|
|
elif action == 'off':
|
|
if self.ModConfig.abuseipdb_scan == 0:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 0)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
|
|
|
|
case 'freeipapi_scan':
|
|
if action == 'on':
|
|
if self.ModConfig.freeipapi_scan == 1:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 1)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
|
|
elif action == 'off':
|
|
if self.ModConfig.freeipapi_scan == 0:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 0)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
|
|
|
|
case 'cloudfilt_scan':
|
|
if action == 'on':
|
|
if self.ModConfig.cloudfilt_scan == 1:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 1)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
|
|
elif action == 'off':
|
|
if self.ModConfig.cloudfilt_scan == 0:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
|
|
return None
|
|
|
|
self.__update_configuration(option, 0)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
|
|
|
|
case _:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]')
|
|
else:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]')
|
|
|
|
case 'flood':
|
|
# .flood on/off
|
|
# .flood set flood_message 5
|
|
# .flood set flood_time 1
|
|
# .flood set flood_timer 20
|
|
try:
|
|
len_cmd = len(cmd)
|
|
|
|
if len_cmd == 2:
|
|
activation = str(cmd[1]).lower()
|
|
key = 'flood'
|
|
if activation == 'on':
|
|
if self.ModConfig.flood == 1:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Already activated", channel=dchanlog)
|
|
return False
|
|
|
|
self.__update_configuration(key, 1)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
|
|
|
|
if activation == 'off':
|
|
if self.ModConfig.flood == 0:
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}FLOOD{self.Config.COLORS.black} ] : Already Deactivated", channel=dchanlog)
|
|
return False
|
|
|
|
self.__update_configuration(key, 0)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Deactivated by {fromuser}", channel=dchanlog)
|
|
|
|
if len_cmd == 4:
|
|
set_key = str(cmd[2]).lower()
|
|
|
|
if str(cmd[1]).lower() == 'set':
|
|
match set_key:
|
|
case 'flood_message':
|
|
key = 'flood_message'
|
|
set_value = int(cmd[3])
|
|
self.__update_configuration(key, set_value)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood message set to {set_value} by {fromuser}",
|
|
channel=dchanlog)
|
|
|
|
case 'flood_time':
|
|
key = 'flood_time'
|
|
set_value = int(cmd[3])
|
|
self.__update_configuration(key, set_value)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood time set to {set_value} by {fromuser}",
|
|
channel=dchanlog)
|
|
|
|
case 'flood_timer':
|
|
key = 'flood_timer'
|
|
set_value = int(cmd[3])
|
|
self.__update_configuration(key, set_value)
|
|
|
|
self.Protocol.send_priv_msg(nick_from=dnickname,
|
|
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood timer set to {set_value} by {fromuser}",
|
|
channel=dchanlog)
|
|
|
|
case _:
|
|
pass
|
|
|
|
except ValueError as ve:
|
|
self.Logs.error(f"{self.__class__.__name__} Value Error : {ve}")
|
|
|
|
case 'status':
|
|
color_green = self.Config.COLORS.green
|
|
color_red = self.Config.COLORS.red
|
|
color_black = self.Config.COLORS.black
|
|
nogc = self.Config.COLORS.nogc
|
|
try:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.reputation == 1 else color_red}Reputation{nogc}] ==> {self.ModConfig.reputation}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_seuil ==> {self.ModConfig.reputation_seuil}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_after_release ==> {self.ModConfig.reputation_score_after_release}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_ban_all_chan ==> {self.ModConfig.reputation_ban_all_chan}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_timer ==> {self.ModConfig.reputation_timer}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=' [Proxy_scan]')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.local_scan == 1 else color_red}local_scan{nogc} ==> {self.ModConfig.local_scan}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.psutil_scan == 1 else color_red}psutil_scan{nogc} ==> {self.ModConfig.psutil_scan}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.abuseipdb_scan == 1 else color_red}abuseipdb_scan{nogc} ==> {self.ModConfig.abuseipdb_scan}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.freeipapi_scan == 1 else color_red}freeipapi_scan{nogc} ==> {self.ModConfig.freeipapi_scan}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.cloudfilt_scan == 1 else color_red}cloudfilt_scan{nogc} ==> {self.ModConfig.cloudfilt_scan}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.flood == 1 else color_red}Flood{nogc}] ==> {self.ModConfig.flood}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=' flood_action ==> Coming soon')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_message ==> {self.ModConfig.flood_message}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_time ==> {self.ModConfig.flood_time}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_timer ==> {self.ModConfig.flood_timer}')
|
|
except KeyError as ke:
|
|
self.Logs.error(f"Key Error : {ke}")
|
|
|
|
case 'info':
|
|
try:
|
|
nickoruid = cmd[1]
|
|
UserObject = self.User.get_User(nickoruid)
|
|
|
|
if UserObject is not None:
|
|
channels: list = []
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
for uid_in_chan in chan.uids:
|
|
if self.Base.clean_uid(uid_in_chan) == UserObject.uid:
|
|
channels.append(chan.name)
|
|
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' UID : {UserObject.uid}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' NICKNAME : {UserObject.nickname}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' USERNAME : {UserObject.username}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' REALNAME : {UserObject.realname}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' HOSTNAME : {UserObject.hostname}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' VHOST : {UserObject.vhost}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' IP : {UserObject.remote_ip}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Country : {UserObject.geoip}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' WebIrc : {UserObject.isWebirc}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' WebWebsocket : {UserObject.isWebsocket}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' REPUTATION : {UserObject.score_connexion}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' MODES : {UserObject.umodes}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CHANNELS : {channels}')
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CONNECTION TIME : {UserObject.connexion_datetime}')
|
|
else:
|
|
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist")
|
|
|
|
except KeyError as ke:
|
|
self.Logs.warning(f"Key error info user : {ke}")
|
|
|
|
case 'sentinel':
|
|
# .sentinel on
|
|
activation = str(cmd[1]).lower()
|
|
service_id = self.Config.SERVICE_ID
|
|
|
|
channel_to_dont_quit = [self.Config.SALON_JAIL, self.Config.SERVICE_CHANLOG]
|
|
|
|
if activation == 'on':
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
if chan.name not in channel_to_dont_quit:
|
|
self.Protocol.send_join_chan(uidornickname=dnickname, channel=chan.name)
|
|
if activation == 'off':
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
if chan.name not in channel_to_dont_quit:
|
|
self.Protocol.part(uidornickname=dnickname, channel=chan.name)
|
|
self.join_saved_channels()
|