Refactoring the Code, clean unsused methods. avoid multi calls to get_user, get_nickname ...

This commit is contained in:
adator
2025-11-09 23:35:27 +01:00
parent a6cf11ae2a
commit 371645149d

View File

@@ -1,4 +1,5 @@
from dataclasses import dataclass
from typing import Any
from core.classes.interfaces.imodule import IModule
import mods.defender.schemas as schemas
import mods.defender.utils as utils
@@ -53,7 +54,6 @@ class Defender(IModule):
self.Irc.build_command(3, self.module_name, 'proxy_scan', 'Scan users for proxy connections')
self.Irc.build_command(3, self.module_name, 'flood', 'Handle flood detection and mitigation')
self.Irc.build_command(3, self.module_name, 'status', 'Check the status of the server or bot')
self.Irc.build_command(3, self.module_name, 'timer', 'Set or manage timers')
self.Irc.build_command(3, self.module_name, 'show_reputation', 'Display reputation information')
self.Irc.build_command(3, self.module_name, 'sentinel', 'Monitor and guard the channel or server')
@@ -64,7 +64,7 @@ class Defender(IModule):
self.Schemas.DB_PSUTIL_USERS = self.Schemas.DB_LOCALSCAN_USERS = []
# Variables qui indique que les threads sont en cours d'éxecutions
self.abuseipdb_isRunning = self.freeipapi_isRunning= self.cloudfilt_isRunning = True
self.abuseipdb_isRunning = self.freeipapi_isRunning = self.cloudfilt_isRunning = True
self.psutil_isRunning = self.localscan_isRunning = self.reputationTimer_isRunning = True
self.autolimit_isRunning = True
@@ -84,7 +84,6 @@ class Defender(IModule):
self.Base.create_thread(func=thds.thread_abuseipdb_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_local_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_psutil_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_apply_reputation_sanctions, func_args=(self, ))
if self.ModConfig.autolimit == 1:
@@ -140,9 +139,12 @@ class Defender(IModule):
return None
def insert_db_trusted(self, uid: str, nickname:str) -> None:
u = self.User.get_user(uid)
if u is None:
return None
uid = self.User.get_uid(uid)
nickname = self.User.get_nickname(nickname)
uid = u.uid
nickname = u.nickname
query = "SELECT id FROM def_trusted WHERE user = ?"
exec_query = self.Base.db_execute_query(query, {"user": nickname})
@@ -178,29 +180,6 @@ class Defender(IModule):
except Exception as err:
self.Logs.error(f"General Error: {err}")
def run_db_action_timer(self, wait_for: float = 0) -> None:
query = f"SELECT param_key FROM {self.Config.TABLE_CONFIG}"
res = self.Base.db_execute_query(query)
service_id = self.Config.SERVICE_ID
dchanlog = self.Config.SERVICE_CHANLOG
for param in res.fetchall():
if param[0] == 'reputation':
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f" ===> {param[0]}",
channel=dchanlog
)
else:
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f"{param[0]}",
channel=dchanlog
)
return None
def cmd(self, data: list[str]) -> None:
if not data or len(data) < 2:
@@ -227,6 +206,7 @@ class Defender(IModule):
return None
case 'UID':
print(f"{self.module_name} - {cmd}")
self.Utils.handle_on_uid(self, cmd)
return None
@@ -257,10 +237,13 @@ class Defender(IModule):
except Exception as err:
self.Logs.error(f"General Error: {err}", exc_info=True)
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
def hcmds(self, user: str, channel: Any, cmd: list, fullcmd: list = []) -> None:
u = self.User.get_user(user)
if u is None:
return None
command = str(cmd[0]).lower()
fromuser = user
fromuser = u.nickname
channel = fromchannel = channel if self.Channel.is_valid_channel(channel) else None
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
@@ -272,17 +255,6 @@ class Defender(IModule):
match command:
case 'timer':
try:
timer_sent = self.Base.int_if_possible(cmd[1])
timer_sent = int(timer_sent)
self.Base.create_timer(timer_sent, self.run_db_action_timer)
except TypeError as te:
self.Logs.error(f"Type Error -> {te}")
except ValueError as ve:
self.Logs.error(f"Value Error -> {ve}")
case 'show_reputation':
if not self.Reputation.UID_REPUTATION_DB:
@@ -296,8 +268,8 @@ class Defender(IModule):
case 'code':
try:
release_code = cmd[1]
jailed_nickname = self.User.get_nickname(fromuser)
jailed_UID = self.User.get_uid(fromuser)
jailed_nickname = u.nickname
jailed_UID = u.uid
get_reputation = self.Reputation.get_reputation(jailed_UID)
if get_reputation is None:
@@ -327,7 +299,7 @@ class Defender(IModule):
self.Protocol.send_sapart(nick_to_sapart=jailed_nickname, channel_name=jailed_salon)
self.Protocol.send_sajoin(nick_to_sajoin=jailed_nickname, channel_name=welcome_salon)
self.Protocol.send2socket(f":{link} REPUTATION {jailed_IP} {self.ModConfig.reputation_score_after_release}")
self.User.get_user(jailed_UID).score_connexion = reputation_seuil + 1
u.score_connexion = reputation_seuil + 1
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[{color_green} MOT DE PASS CORRECT {color_black}] : You have now the right to enjoy the network !",
nick_to=jailed_nickname)