Code Refactoring

This commit is contained in:
adator
2025-11-23 18:22:54 +01:00
parent 5938a1511b
commit cbe527d7d9
10 changed files with 558 additions and 398 deletions

View File

@@ -1,4 +1,5 @@
from dataclasses import dataclass
import logging
from typing import Any, TYPE_CHECKING, Optional
from core.classes.interfaces.imodule import IModule
import mods.defender.schemas as schemas
@@ -26,6 +27,8 @@ class Defender(IModule):
def __init__(self, context: 'Loader') -> None:
super().__init__(context)
self._mod_config: Optional[schemas.ModConfModel] = None
self.Schemas = schemas.RepDB()
self.Threads = thds
@property
def mod_config(self) -> ModConfModel:
@@ -55,40 +58,43 @@ class Defender(IModule):
return None
async def load(self):
# Variable qui va contenir les options de configuration du module Defender
self._mod_config: schemas.ModConfModel = self.ModConfModel()
# sync the database with local variable (Mandatory)
await self.sync_db()
# Add module schemas
self.Schemas = schemas
# Add module utils functions
self.mod_utils = utils
# Create module commands (Mandatory)
self.ctx.Irc.build_command(0, self.module_name, 'code', 'Display the code or key for access')
self.ctx.Irc.build_command(1, self.module_name, 'info', 'Provide information about the channel or server')
self.ctx.Irc.build_command(1, self.module_name, 'autolimit', 'Automatically set channel user limits')
self.ctx.Irc.build_command(3, self.module_name, 'reputation', 'Check or manage user reputation')
self.ctx.Irc.build_command(3, self.module_name, 'proxy_scan', 'Scan users for proxy connections')
self.ctx.Irc.build_command(3, self.module_name, 'flood', 'Handle flood detection and mitigation')
self.ctx.Irc.build_command(3, self.module_name, 'status', 'Check the status of the server or bot')
self.ctx.Irc.build_command(3, self.module_name, 'show_reputation', 'Display reputation information')
self.ctx.Irc.build_command(3, self.module_name, 'sentinel', 'Monitor and guard the channel or server')
self.ctx.Commands.build_command(0, self.module_name, 'code', 'Display the code or key for access')
self.ctx.Commands.build_command(1, self.module_name, 'info', 'Provide information about the channel or server')
self.ctx.Commands.build_command(1, self.module_name, 'autolimit', 'Automatically set channel user limits')
self.ctx.Commands.build_command(3, self.module_name, 'reputation', 'Check or manage user reputation')
self.ctx.Commands.build_command(3, self.module_name, 'proxy_scan', 'Scan users for proxy connections')
self.ctx.Commands.build_command(3, self.module_name, 'flood', 'Handle flood detection and mitigation')
self.ctx.Commands.build_command(3, self.module_name, 'status', 'Check the status of the server or bot')
self.ctx.Commands.build_command(3, self.module_name, 'show_reputation', 'Display reputation information')
self.ctx.Commands.build_command(3, self.module_name, 'sentinel', 'Monitor and guard the channel or server')
self.timeout = self.ctx.Config.API_TIMEOUT
# Listes qui vont contenir les ip a scanner avec les différentes API
self.Schemas.DB_ABUSEIPDB_USERS = self.Schemas.DB_FREEIPAPI_USERS = self.Schemas.DB_CLOUDFILT_USERS = []
self.Schemas.DB_PSUTIL_USERS = self.Schemas.DB_LOCALSCAN_USERS = []
self.Schemas.DB_ABUSEIPDB_USERS = []
self.Schemas.DB_FREEIPAPI_USERS = []
self.Schemas.DB_CLOUDFILT_USERS = []
self.Schemas.DB_PSUTIL_USERS = []
self.Schemas.DB_LOCALSCAN_USERS = []
# Variables qui indique que les threads sont en cours d'éxecutions
self.abuseipdb_isRunning = self.freeipapi_isRunning = self.cloudfilt_isRunning = True
self.psutil_isRunning = self.localscan_isRunning = self.reputationTimer_isRunning = True
self.autolimit_isRunning = True
self.abuseipdb_isRunning = True if self.mod_config.abuseipdb_scan == 1 else False
self.freeipapi_isRunning = True if self.mod_config.freeipapi_scan == 1 else False
self.cloudfilt_isRunning = True if self.mod_config.cloudfilt_scan == 1 else False
self.psutil_isRunning = True if self.mod_config.psutil_scan == 1 else False
self.localscan_isRunning = True if self.mod_config.local_scan == 1 else False
self.reputationTimer_isRunning = True if self.mod_config.reputation == 1 else False
self.autolimit_isRunning = True if self.mod_config.autolimit == 1 else False
# Variable qui va contenir les users
self.flood_system = {}
@@ -101,19 +107,21 @@ class Defender(IModule):
self.cloudfilt_key = 'r1gEtjtfgRQjtNBDMxsg'
# Démarrer les threads pour démarrer les api
self.ctx.Base.create_asynctask(thds.coro_freeipapi_scan(self))
self.ctx.Base.create_asynctask(thds.coro_cloudfilt_scan(self))
self.ctx.Base.create_asynctask(thds.coro_abuseipdb_scan(self))
self.ctx.Base.create_asynctask(thds.coro_local_scan(self))
self.ctx.Base.create_asynctask(thds.coro_psutil_scan(self))
self.ctx.Base.create_asynctask(thds.coro_apply_reputation_sanctions(self))
if self.mod_config.autolimit == 1:
self.ctx.Base.create_asynctask(thds.coro_autolimit(self))
self.ctx.Base.create_asynctask(self.Threads.coro_freeipapi_scan(self)) if self.mod_config.freeipapi_scan == 1 else None
self.ctx.Base.create_asynctask(self.Threads.coro_cloudfilt_scan(self)) if self.mod_config.cloudfilt_scan == 1 else None
self.ctx.Base.create_asynctask(self.Threads.coro_abuseipdb_scan(self)) if self.mod_config.abuseipdb_scan == 1 else None
self.ctx.Base.create_asynctask(self.Threads.coro_local_scan(self)) if self.mod_config.local_scan == 1 else None
self.ctx.Base.create_asynctask(self.Threads.coro_psutil_scan(self)) if self.mod_config.psutil_scan == 1 else None
self.ctx.Base.create_asynctask(self.Threads.coro_apply_reputation_sanctions(self)) if self.mod_config.reputation == 1 else None
self.ctx.Base.create_asynctask(self.Threads.coro_autolimit(self)) if self.mod_config.autolimit == 1 else None
if self.mod_config.reputation == 1:
await self.ctx.Irc.Protocol.send_sjoin(self.ctx.Config.SALON_JAIL)
await self.ctx.Irc.Protocol.send2socket(f":{self.ctx.Config.SERVICE_NICKNAME} SAMODE {self.ctx.Config.SALON_JAIL} +o {self.ctx.Config.SERVICE_NICKNAME}")
for chan in self.ctx.Channel.UID_CHANNEL_DB:
if chan.name != self.ctx.Config.SALON_JAIL:
await self.ctx.Irc.Protocol.send_set_mode('+b', channel_name=chan.name, params='~security-group:unknown-users')
await self.ctx.Irc.Protocol.send_set_mode('+eee', channel_name=chan.name, params='~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users')
def __onload(self):
@@ -138,7 +146,7 @@ class Defender(IModule):
if localscan:
self.Schemas.DB_LOCALSCAN_USERS = localscan
def unload(self) -> None:
async def unload(self) -> None:
"""Cette methode sera executée a chaque désactivation ou
rechargement de module
"""
@@ -158,6 +166,13 @@ class Defender(IModule):
self.ctx.Commands.drop_command_by_module(self.module_name)
if self.mod_config.reputation == 1:
await self.ctx.Irc.Protocol.send_part_chan(self.ctx.Config.SERVICE_ID, self.ctx.Config.SALON_JAIL)
for chan in self.ctx.Channel.UID_CHANNEL_DB:
if chan.name != self.ctx.Config.SALON_JAIL:
await self.ctx.Irc.Protocol.send_set_mode('-b', channel_name=chan.name, params='~security-group:unknown-users')
await self.ctx.Irc.Protocol.send_set_mode('-eee', channel_name=chan.name, params='~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users')
return None
async def insert_db_trusted(self, uid: str, nickname:str) -> None:
@@ -411,22 +426,26 @@ class Defender(IModule):
if self.mod_config.reputation == 1:
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.ctx.Config.COLORS.green}REPUTATION{self.ctx.Config.COLORS.black} ] : Already activated", channel=dchanlog)
return False
return None
# self.update_db_configuration('reputation', 1)
await self.update_configuration(key, 1)
self.ctx.Base.create_asynctask(self.Threads.coro_apply_reputation_sanctions(self))
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.ctx.Config.COLORS.green}REPUTATION{self.ctx.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
await self.ctx.Irc.Protocol.send_join_chan(uidornickname=dnickname, channel=jail_chan)
await self.ctx.Irc.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}")
await self.ctx.Irc.Protocol.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}")
await self.ctx.Irc.Protocol.send_set_mode(f'+{jail_chan_mode}', channel_name=jail_chan)
if self.mod_config.reputation_sg == 1:
for chan in self.ctx.Channel.UID_CHANNEL_DB:
if chan.name != jail_chan:
await self.ctx.Irc.Protocol.send2socket(f":{service_id} MODE {chan.name} +b ~security-group:unknown-users")
await self.ctx.Irc.Protocol.send2socket(f":{service_id} MODE {chan.name} +eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
await self.ctx.Irc.Protocol.send_set_mode('+b', channel_name=chan.name, params='~security-group:unknown-users')
await self.ctx.Irc.Protocol.send_set_mode(
'+eee',
channel_name=chan.name,
params='~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users'
)
await self.ctx.Channel.db_query_channel('add', self.module_name, jail_chan)
@@ -441,20 +460,26 @@ class Defender(IModule):
return False
await self.update_configuration(key, 0)
self.reputationTimer_isRunning = False
await self.ctx.Irc.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.ctx.Config.COLORS.red}REPUTATION{self.ctx.Config.COLORS.black} ] : Deactivated by {fromuser}",
channel=dchanlog
)
await self.ctx.Irc.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} -{dumodes} {dnickname}")
await self.ctx.Irc.Protocol.send2socket(f":{service_id} MODE {jail_chan} -sS")
await self.ctx.Irc.Protocol.send2socket(f":{service_id} PART {jail_chan}")
await self.ctx.Irc.Protocol.send_set_mode('-sS', channel_name=jail_chan)
await self.ctx.Irc.Protocol.send_part_chan(service_id, jail_chan)
for chan in self.ctx.Channel.UID_CHANNEL_DB:
if chan.name != jail_chan:
await self.ctx.Irc.Protocol.send2socket(f":{service_id} MODE {chan.name} -b ~security-group:unknown-users")
await self.ctx.Irc.Protocol.send2socket(f":{service_id} MODE {chan.name} -eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
await self.ctx.Irc.Protocol.send_set_mode('-b', channel_name=chan.name, params='~security-group:unknown-users')
await self.ctx.Irc.Protocol.send_set_mode(
'-eee',
channel_name=chan.name,
params='~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users'
)
await self.ctx.Channel.db_query_channel('del', self.module_name, jail_chan)
@@ -464,12 +489,17 @@ class Defender(IModule):
match get_options:
case 'release':
# .reputation release [nick]
p = await self.ctx.Irc.Protocol
link = self.ctx.Config.SERVEUR_LINK
jailed_salon = self.ctx.Config.SALON_JAIL
welcome_salon = self.ctx.Config.SALON_LIBERER
client_obj = self.ctx.User.get_user(str(cmd[2]))
if self.mod_config.reputation != 1:
await self.ctx.Irc.Protocol.send_notice(nick_from=dnickname,
nick_to=fromuser,
msg="The reputation system is not activated!")
return None
if client_obj is None:
await self.ctx.Irc.Protocol.send_notice(nick_from=dnickname,
nick_to=fromuser,
@@ -658,6 +688,7 @@ class Defender(IModule):
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.ctx.Base.create_asynctask(self.Threads.coro_local_scan(self))
await self.update_configuration(option, 1)
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
@@ -667,6 +698,7 @@ class Defender(IModule):
return None
await self.update_configuration(option, 0)
self.localscan_isRunning = False
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -676,6 +708,7 @@ class Defender(IModule):
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.ctx.Base.create_asynctask(self.Threads.coro_psutil_scan(self))
await self.update_configuration(option, 1)
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
@@ -685,6 +718,7 @@ class Defender(IModule):
return None
await self.update_configuration(option, 0)
self.psutil_isRunning = False
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -694,6 +728,7 @@ class Defender(IModule):
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.ctx.Base.create_asynctask(self.Threads.coro_abuseipdb_scan(self))
await self.update_configuration(option, 1)
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
@@ -703,6 +738,7 @@ class Defender(IModule):
return None
await self.update_configuration(option, 0)
self.abuseipdb_isRunning = False
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -712,6 +748,7 @@ class Defender(IModule):
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.ctx.Base.create_asynctask(self.Threads.coro_freeipapi_scan(self))
await self.update_configuration(option, 1)
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
@@ -721,6 +758,7 @@ class Defender(IModule):
return None
await self.update_configuration(option, 0)
self.freeipapi_isRunning = False
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -730,6 +768,7 @@ class Defender(IModule):
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.ctx.Base.create_asynctask(self.Threads.coro_cloudfilt_scan(self))
await self.update_configuration(option, 1)
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
@@ -739,6 +778,7 @@ class Defender(IModule):
return None
await self.update_configuration(option, 0)
self.cloudfilt_isRunning = False
await self.ctx.Irc.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -915,3 +955,6 @@ class Defender(IModule):
await self.join_saved_channels()
return None
case _:
pass