Moving modules in separate folders

This commit is contained in:
adator
2025-08-08 17:38:45 +02:00
parent 7a50bc9632
commit 0a655b2df0
13 changed files with 138 additions and 127 deletions

View File

@@ -391,7 +391,7 @@ class Base:
result = response.fetchall()
for param, value in result:
if type(getattr(dataclassObj, param)) == list:
if isinstance(getattr(dataclassObj, param), list):
value = ast.literal_eval(value)
setattr(dataclassObj, param, self.int_if_possible(value))

View File

@@ -1,6 +1,6 @@
from re import match, findall, search
from datetime import datetime
from typing import TYPE_CHECKING, Union
from typing import TYPE_CHECKING, Optional, Union
from ssl import SSLEOFError, SSLError
if TYPE_CHECKING:
@@ -17,14 +17,21 @@ class Unrealircd6:
self.__Base = ircInstance.Base
self.__Settings = ircInstance.Base.Settings
self.known_protocol = ['SJOIN', 'UID', 'MD', 'QUIT', 'SQUIT',
self.known_protocol: set[str] = {'SJOIN', 'UID', 'MD', 'QUIT', 'SQUIT',
'EOS', 'PRIVMSG', 'MODE', 'UMODE2',
'VERSION', 'REPUTATION', 'SVS2MODE',
'SLOG', 'NICK', 'PART', 'PONG'
]
'SLOG', 'NICK', 'PART', 'PONG'}
self.__Base.logs.info(f"** Loading protocol [{__name__}]")
def get_ircd_protocol_poisition(self, cmd: list[str]) -> tuple[int, Optional[str]]:
for index, token in enumerate(cmd):
if token.upper() in self.known_protocol:
return index, token.upper()
return (-1, None)
def send2socket(self, message: str, print_log: bool = True) -> None:
"""Envoit les commandes à envoyer au serveur.

View File

@@ -1,10 +1,26 @@
from datetime import datetime
from dataclasses import dataclass, field
from typing import Literal
from json import dumps
from dataclasses import dataclass, field, asdict, fields
from typing import Literal, Any
from os import sep
@dataclass
class MClient:
class MainModel:
"""Parent Model contains important methods"""
def to_dict(self) -> dict[str, Any]:
"""Return the fields of a dataclass instance as a new dictionary mapping field names to field values."""
return asdict(self)
def to_json(self) -> str:
"""Return the object of a dataclass a json str."""
return dumps(self.to_dict())
def get_attributes(self) -> list[str]:
"""Return a list of attributes name"""
return [f.name for f in fields(self)]
@dataclass
class MClient(MainModel):
"""Model Client for registred nickname"""
uid: str = None
account: str = None
@@ -22,7 +38,7 @@ class MClient:
connexion_datetime: datetime = field(default=datetime.now())
@dataclass
class MUser:
class MUser(MainModel):
"""Model User"""
uid: str = None
@@ -40,7 +56,7 @@ class MUser:
connexion_datetime: datetime = field(default=datetime.now())
@dataclass
class MAdmin:
class MAdmin(MainModel):
"""Model Admin"""
uid: str = None
@@ -59,7 +75,7 @@ class MAdmin:
level: int = 0
@dataclass
class MReputation:
class MReputation(MainModel):
"""Model Reputation"""
uid: str = None
nickname: str = None
@@ -77,7 +93,7 @@ class MReputation:
secret_code: str = None
@dataclass
class MChannel:
class MChannel(MainModel):
"""Model Channel"""
name: str = None
@@ -92,7 +108,7 @@ class MChannel:
"""
@dataclass
class ColorModel:
class ColorModel(MainModel):
white: str = "\x0300"
black: str = "\x0301"
blue: str = "\x0302"
@@ -104,7 +120,7 @@ class ColorModel:
underline: str = "\x1F"
@dataclass
class MConfig:
class MConfig(MainModel):
"""Model Configuration"""
SERVEUR_IP: str = "127.0.0.1"
@@ -305,7 +321,7 @@ class MConfig:
"""0: utf-8 | 1: iso-8859-1"""
@dataclass
class MClone:
class MClone(MainModel):
"""Model Clone"""
connected: bool = False
uid: str = None

View File

@@ -494,7 +494,8 @@ class Irc:
try:
# module_name : mod_voice
module_name = module_name.lower()
class_name = module_name.split('_')[1].capitalize() # ==> Voice
module_folder = module_name.split('_')[1].lower() # ==> voice
class_name = module_name.split('_')[1].capitalize() # ==> Voice
# print(self.loaded_classes)
@@ -511,7 +512,7 @@ class Irc:
)
return False
the_module = sys.modules['mods.' + module_name]
the_module = sys.modules[f'mods.{module_folder}.{module_name}']
importlib.reload(the_module)
my_class = getattr(the_module, class_name, None)
new_instance = my_class(self.ircObject)
@@ -529,7 +530,7 @@ class Irc:
return False
# Charger le module
loaded_module = importlib.import_module(f"mods.{module_name}")
loaded_module = importlib.import_module(f'mods.{module_folder}.{module_name}')
my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe
create_instance_of_the_class = my_class(self.ircObject) # Créer une nouvelle instance de la classe
@@ -612,13 +613,14 @@ class Irc:
def reload_module(self, from_user: str, mod_name: str) -> bool:
try:
module_name = mod_name.lower() # ==> mod_defender
module_folder = module_name.split('_')[1].lower() # ==> defender
class_name = module_name.split('_')[1].capitalize() # ==> Defender
if 'mods.' + module_name in sys.modules:
self.Logs.info('Unload the module ...')
self.loaded_classes[class_name].unload()
self.Logs.info('Module Already Loaded ... reloading the module ...')
the_module = sys.modules['mods.' + module_name]
the_module = sys.modules[f'mods.{module_folder}.{module_name}']
importlib.reload(the_module)
# Supprimer la class déja instancier

View File

@@ -2,53 +2,19 @@ import socket
import json
import time
import re
from webbrowser import get
import psutil
import requests
from dataclasses import dataclass
from datetime import datetime
from typing import Union, TYPE_CHECKING
import core.definition as df
# Le module crée devra réspecter quelques conditions
# 1. Le nom de la classe devra toujours s'appeler comme le module. Exemple => nom de class Defender | nom du module mod_defender
# 2. la methode __init__ devra toujours avoir les parametres suivant (self, irc:object)
# 1 . Créer la variable Irc dans le module
# 2 . Récuperer la configuration dans une variable
# 3 . Définir et enregistrer les nouvelles commandes
# 4 . Créer vos tables, en utilisant toujours le nom des votre classe en minuscule ==> defender_votre-table
# 3. Methode suivantes:
# cmd(self, data:list)
# hcmds(self, user:str, cmd: list)
# unload(self)
import core.definition as df
import mods.defender.schemas as schemas
if TYPE_CHECKING:
from core.irc import Irc
class Defender():
@dataclass
class ModConfModel:
reputation: int
reputation_timer: int
reputation_seuil: int
reputation_score_after_release: int
reputation_ban_all_chan: int
reputation_sg: int
local_scan: int
psutil_scan: int
abuseipdb_scan: int
freeipapi_scan: int
cloudfilt_scan: int
flood: int
flood_message: int
flood_time: int
flood_timer: int
autolimit: int
autolimit_amount: int
autolimit_interval: int
def __init__(self, ircInstance: 'Irc') -> None:
# Module name (Mandatory)
@@ -81,6 +47,9 @@ class Defender():
# Add Reputation object to the module (Optional)
self.Reputation = ircInstance.Reputation
# Add module schemas
self.Schemas = schemas
# Create module commands (Mandatory)
self.Irc.build_command(0, self.module_name, 'code', 'Display the code or key for access')
self.Irc.build_command(1, self.module_name, 'info', 'Provide information about the channel or server')
@@ -97,7 +66,7 @@ class Defender():
self.__init_module()
# Log the module
self.Logs.debug(f'-- Module {self.module_name} loaded ...')
self.Logs.debug(f'-- Module {self.module_name} V2 loaded ...')
def __init_module(self) -> None:
@@ -179,23 +148,13 @@ class Defender():
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
try:
# Variable qui va contenir les options de configuration du module Defender
self.ModConfig = self.ModConfModel(
reputation=0, reputation_timer=1, reputation_seuil=26, reputation_score_after_release=27,
reputation_ban_all_chan=0,reputation_sg=1,
local_scan=0, psutil_scan=0, abuseipdb_scan=0, freeipapi_scan=0, cloudfilt_scan=0,
flood=0, flood_message=5, flood_time=1, flood_timer=20,
autolimit=1, autolimit_amount=3, autolimit_interval=3
)
# Variable qui va contenir les options de configuration du module Defender
self.ModConfig = self.Schemas.ModConfModel()
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
return None
except TypeError as te:
self.Logs.critical(te)
return None
def __update_configuration(self, param_key: str, param_value: str):
@@ -415,8 +374,6 @@ class Defender():
Args:
action (str): _description_
timer (int): _description_
nickname (str): _description_
channel (str): _description_
Returns:
@@ -973,10 +930,17 @@ class Defender():
def cmd(self, data: list[str]) -> None:
try:
service_id = self.Config.SERVICE_ID # Defender serveur id
cmd = list(data).copy()
if not data or len(data) < 2:
return None
match cmd[1]:
service_id = self.Config.SERVICE_ID
p = self.Protocol
cmd = data.copy() if isinstance(data, list) else list(data).copy()
index, command = p.get_ircd_protocol_poisition(cmd)
if index == -1:
return None
match command:
case 'REPUTATION':
# :001 REPUTATION 8.8.8.8 118
@@ -994,11 +958,6 @@ class Defender():
except IndexError as ie:
self.Logs.error(f'cmd reputation: index error: {ie}')
if len(cmd) < 3:
return None
match cmd[2]:
case 'MODE':
# ['...', ':001XSCU0Q', 'MODE', '#jail', '+b', '~security-group:unknown-users']
# ['@unrealircd.org/...', ':001C0MF01', 'MODE', '#services', '+l', '1']
@@ -1011,19 +970,22 @@ class Defender():
if self.ModConfig.autolimit == 1:
if mode == '+l' or mode == '-l':
chan = self.Channel.get_Channel(channel)
self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}")
p.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}")
if self.Config.SALON_JAIL == channel:
if mode == '+b' and group_to_unban in group_to_check:
self.Protocol.send2socket(f":{service_id} MODE {self.Config.SALON_JAIL} -b ~security-group:unknown-users")
self.Protocol.send2socket(f":{service_id} MODE {self.Config.SALON_JAIL} -eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
p.send2socket(f":{service_id} MODE {self.Config.SALON_JAIL} -b ~security-group:unknown-users")
p.send2socket(f":{service_id} MODE {self.Config.SALON_JAIL} -eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
return None
case 'PRIVMSG':
cmd.pop(0)
user_trigger = str(cmd[0]).replace(':','')
channel = cmd[2]
# ['@mtag....',':python', 'PRIVMSG', '#defender', ':zefzefzregreg', 'regg', 'aerg']
user_trigger = str(cmd[1]).replace(':','')
channel = cmd[3]
find_nickname = self.User.get_nickname(user_trigger)
self.flood(find_nickname, channel)
return None
case 'UID':
# If Init then do nothing
@@ -1036,6 +998,10 @@ class Defender():
# Get User information
_User = self.User.get_User(str(cmd[7]))
if _User is None:
self.Logs.critical(f'This UID: [{cmd[7]}] is not available please check why')
return None
# If user is not service or IrcOp then scan them
if not re.match(r'^.*[S|o?].*$', _User.umodes):
self.abuseipdb_UserModel.append(_User) if self.ModConfig.abuseipdb_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
@@ -1044,10 +1010,6 @@ class Defender():
self.psutil_UserModel.append(_User) if self.ModConfig.psutil_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
self.localscan_UserModel.append(_User) if self.ModConfig.local_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
if _User is None:
self.Logs.critical(f'This UID: [{cmd[7]}] is not available please check why')
return None
reputation_flag = self.ModConfig.reputation
reputation_seuil = self.ModConfig.reputation_seuil
@@ -1058,12 +1020,8 @@ class Defender():
# currentDateTime = self.Base.get_datetime()
self.Reputation.insert(
self.Loader.Definition.MReputation(
**_User.__dict__,
**_User.to_dict(),
secret_code=self.Base.get_random(8)
# uid=_User.uid, nickname=_User.nickname, username=_User.username, realname=_User.realname,
# hostname=_User.hostname, umodes=_User.umodes, vhost=_User.vhost, ip=_User.remote_ip, score=_User.score_connexion,
# secret_code=self.Base.get_random(8), isWebirc=_User.isWebirc, isWebsocket=_User.isWebsocket, connected_datetime=currentDateTime,
# updated_datetime=currentDateTime
)
)
if self.Reputation.is_exist(_User.uid):
@@ -1071,44 +1029,48 @@ class Defender():
self.system_reputation(_User.uid)
self.Logs.info('Démarrer le systeme de reputation')
return None
case 'SJOIN':
# ['@msgid=F9B7JeHL5pj9nN57cJ5pEr;time=2023-12-28T20:47:24.305Z', ':001', 'SJOIN', '1702138958', '#welcome', ':0015L1AHL']
# ['@msgid=F9B7JeHL5pj9nN57cJ5pEr..', ':001', 'SJOIN', '1702138958', '#welcome', ':0015L1AHL']
try:
cmd.pop(0)
parsed_chan = cmd[3] if self.Channel.Is_Channel(cmd[3]) else None
parsed_chan = cmd[4] if self.Channel.Is_Channel(cmd[4]) else None
parsed_UID = self.User.clean_uid(cmd[5])
if self.ModConfig.reputation == 1:
parsed_UID = self.User.clean_uid(cmd[4])
get_reputation = self.Reputation.get_Reputation(parsed_UID)
if parsed_chan != self.Config.SALON_JAIL:
self.Protocol.send2socket(f":{service_id} MODE {parsed_chan} +b ~security-group:unknown-users")
self.Protocol.send2socket(f":{service_id} MODE {parsed_chan} +eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
p.send2socket(f":{service_id} MODE {parsed_chan} +b ~security-group:unknown-users")
p.send2socket(f":{service_id} MODE {parsed_chan} +eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
if get_reputation is not None:
isWebirc = get_reputation.isWebirc
if not isWebirc:
if parsed_chan != self.Config.SALON_JAIL:
self.Protocol.send_sapart(nick_to_sapart=get_reputation.nickname, channel_name=parsed_chan)
p.send_sapart(nick_to_sapart=get_reputation.nickname, channel_name=parsed_chan)
if self.ModConfig.reputation_ban_all_chan == 1 and not isWebirc:
if parsed_chan != self.Config.SALON_JAIL:
self.Protocol.send2socket(f":{service_id} MODE {parsed_chan} +b {get_reputation.nickname}!*@*")
self.Protocol.send2socket(f":{service_id} KICK {parsed_chan} {get_reputation.nickname}")
p.send2socket(f":{service_id} MODE {parsed_chan} +b {get_reputation.nickname}!*@*")
p.send2socket(f":{service_id} KICK {parsed_chan} {get_reputation.nickname}")
self.Logs.debug(f'SJOIN parsed_uid : {parsed_UID}')
return None
except KeyError as ke:
self.Logs.error(f"key error SJOIN : {ke}")
case 'SLOG':
# self.Base.scan_ports(cmd[7])
cmd.pop(0)
['@unrealircd...', ':001', 'SLOG', 'info', 'blacklist', 'BLACKLIST_HIT', ':[Blacklist]', 'IP', '162.x.x.x', 'matches', 'blacklist', 'dronebl', '(dnsbl.dronebl.org/reply=6)']
if not self.Base.is_valid_ip(cmd[7]):
if not self.Base.is_valid_ip(cmd[8]):
return None
return None
# if self.ModConfig.local_scan == 1 and not cmd[7] in self.Config.WHITELISTED_IP:
# self.localscan_remote_ip.append(cmd[7])
@@ -1125,11 +1087,10 @@ class Defender():
# self.cloudfilt_remote_ip.append(cmd[7])
case 'NICK':
# :0010BS24L NICK [NEWNICK] 1697917711
# ['@unrealircd.org...', ':001MZQ0RB', 'NICK', 'halow', '1754663712']
# Changement de nickname
try:
cmd.pop(0)
uid = str(cmd[0]).replace(':','')
uid = self.User.clean_uid(str(cmd[1]))
get_Reputation = self.Reputation.get_Reputation(uid)
jail_salon = self.Config.SALON_JAIL
service_id = self.Config.SERVICE_ID
@@ -1140,38 +1101,41 @@ class Defender():
# Update the new nickname
oldnick = get_Reputation.nickname
newnickname = cmd[2]
newnickname = cmd[3]
get_Reputation.nickname = newnickname
# If ban in all channel is ON then unban old nickname an ban the new nickname
if self.ModConfig.reputation_ban_all_chan == 1:
for chan in self.Channel.UID_CHANNEL_DB:
if chan.name != jail_salon:
self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b {oldnick}!*@*")
self.Protocol.send2socket(f":{service_id} MODE {chan.name} +b {newnickname}!*@*")
p.send2socket(f":{service_id} MODE {chan.name} -b {oldnick}!*@*")
p.send2socket(f":{service_id} MODE {chan.name} +b {newnickname}!*@*")
return None
except KeyError as ke:
self.Logs.error(f'cmd - NICK - KeyError: {ke}')
case 'QUIT':
# :001N1WD7L QUIT :Quit: free_znc_1
cmd.pop(0)
# ['@unrealircd.org...', ':001MZQ0RB', 'QUIT', ':Quit:', '....']
ban_all_chan = self.Base.int_if_possible(self.ModConfig.reputation_ban_all_chan)
user_id = str(cmd[0]).replace(':','')
final_UID = user_id
final_UID = self.User.clean_uid(str(cmd[1]))
jail_salon = self.Config.SALON_JAIL
service_id = self.Config.SERVICE_ID
get_user_reputation = self.Reputation.get_Reputation(final_UID)
if get_user_reputation is not None:
final_nickname = get_user_reputation.nickname
for chan in self.Channel.UID_CHANNEL_DB:
if chan.name != jail_salon and ban_all_chan == 1:
self.Protocol.send2socket(f":{service_id} MODE {chan.name} -b {final_nickname}!*@*")
p.send2socket(f":{service_id} MODE {chan.name} -b {final_nickname}!*@*")
self.Reputation.delete(final_UID)
return None
case _:
return None
except KeyError as ke:
self.Logs.error(f"{ke} / {cmd} / length {str(len(cmd))}")
except IndexError as ie:
@@ -1183,8 +1147,7 @@ class Defender():
command = str(cmd[0]).lower()
fromuser = user
fromchannel = channel if self.Channel.Is_Channel(channel) else None
channel = fromchannel
channel = fromchannel = channel if self.Channel.Is_Channel(channel) else None
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
dchanlog = self.Config.SERVICE_CHANLOG # Defender chan log
@@ -1816,16 +1779,17 @@ class Defender():
case 'sentinel':
# .sentinel on
activation = str(cmd[1]).lower()
service_id = self.Config.SERVICE_ID
channel_to_dont_quit = [self.Config.SALON_JAIL, self.Config.SERVICE_CHANLOG]
if activation == 'on':
for chan in self.Channel.UID_CHANNEL_DB:
if chan.name not in channel_to_dont_quit:
self.Protocol.send_join_chan(uidornickname=dnickname, channel=chan.name)
return None
if activation == 'off':
for chan in self.Channel.UID_CHANNEL_DB:
if chan.name not in channel_to_dont_quit:
self.Protocol.part(uidornickname=dnickname, channel=chan.name)
self.Protocol.send_part_chan(uidornickname=dnickname, channel=chan.name)
self.join_saved_channels()
return None

22
mods/defender/schemas.py Normal file
View File

@@ -0,0 +1,22 @@
from core.definition import MainModel, dataclass
@dataclass
class ModConfModel(MainModel):
reputation: int = 0
reputation_timer: int = 1
reputation_seuil: int = 26
reputation_score_after_release: int = 27
reputation_ban_all_chan: int = 0
reputation_sg: int = 1
local_scan: int = 0
psutil_scan: int = 0
abuseipdb_scan: int = 0
freeipapi_scan: int = 0
cloudfilt_scan: int = 0
flood: int = 0
flood_message: int = 5
flood_time: int = 1
flood_timer: int = 20
autolimit: int = 0
autolimit_amount: int = 3
autolimit_interval: int = 3

0
mods/defender/utils.py Normal file
View File

View File

@@ -3,7 +3,7 @@
"requests": "2.32.3",
"psutil": "6.0.0",
"unrealircd_rpc_py": "2.0.0",
"unrealircd_rpc_py": "2.0.1",
"sqlalchemy": "2.0.35",
"faker": "30.1.0"
}