Merge pull request #87 from adator85/v6.2.1

Adding sasl authentication
This commit is contained in:
adator
2025-08-24 03:11:33 +02:00
committed by GitHub
12 changed files with 391 additions and 59 deletions

View File

@@ -537,6 +537,7 @@ class Base:
hostname TEXT,
vhost TEXT,
password TEXT,
fingerprint TEXT,
level INTEGER
)
'''

View File

@@ -1,3 +1,4 @@
from base64 import b64decode
from re import match, findall, search
from datetime import datetime
from typing import TYPE_CHECKING, Optional
@@ -5,7 +6,8 @@ from ssl import SSLEOFError, SSLError
if TYPE_CHECKING:
from core.irc import Irc
from core.definition import MClient
from core.classes.sasl import Sasl
from core.definition import MClient, MSasl
class Unrealircd6:
@@ -23,7 +25,7 @@ class Unrealircd6:
self.known_protocol: set[str] = {'SJOIN', 'UID', 'MD', 'QUIT', 'SQUIT',
'EOS', 'PRIVMSG', 'MODE', 'UMODE2',
'VERSION', 'REPUTATION', 'SVS2MODE',
'SLOG', 'NICK', 'PART', 'PONG',
'SLOG', 'NICK', 'PART', 'PONG', 'SASL',
'PROTOCTL', 'SERVER', 'SMOD', 'TKL', 'NETINFO',
'006', '007', '018'}
@@ -398,7 +400,7 @@ class Unrealircd6:
try:
self.send2socket(f":{self.__Irc.Config.SERVEUR_LINK} SVSLOGIN {self.__Settings.MAIN_SERVER_HOSTNAME} {client_uid} {user_account}")
except Exception as err:
self.__Irc.Logs.error(f'General Error: {err}')
self.__Logs.error(f'General Error: {err}')
def send_svslogout(self, client_obj: 'MClient') -> None:
"""Logout a client from his account
@@ -413,7 +415,7 @@ class Unrealircd6:
self.send_svs2mode(c_nickname, '-r')
except Exception as err:
self.__Irc.Logs.error(f'General Error: {err}')
self.__Logs.error(f'General Error: {err}')
def send_quit(self, uid: str, reason: str, print_log: True) -> None:
"""Send quit message
@@ -898,10 +900,10 @@ class Unrealircd6:
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Logs.error(f'Index Error {__name__}: {ie}')
self.__Logs.error(f'Index Error {__name__}: {ie}')
except ValueError as ve:
self.__Irc.first_score = 0
self.Logs.error(f'Value Error {__name__}: {ve}')
self.__Logs.error(f'Value Error {__name__}: {ve}')
except Exception as err:
self.__Logs.error(f"{__name__} - General Error: {err}")
@@ -938,6 +940,11 @@ class Unrealircd6:
pattern = r'^.*geoip=cc=(\S{2}).*$'
geoip_match = match(pattern, serverMsg[0])
# Extract Fingerprint information
pattern = r'^.*certfp=([^;]+).*$'
fp_match = match(pattern, serverMsg[0])
fingerprint = fp_match.group(1) if fp_match else None
if geoip_match:
geoip = geoip_match.group(1)
else:
@@ -954,6 +961,7 @@ class Unrealircd6:
hostname=hostname,
umodes=umodes,
vhost=vhost,
fingerprint=fingerprint,
isWebirc=isWebirc,
isWebsocket=isWebsocket,
remote_ip=remote_ip,
@@ -1086,7 +1094,7 @@ class Unrealircd6:
sCopy = serverMsg.copy()
self.__Irc.Settings.MAIN_SERVER_HOSTNAME = sCopy[1]
except Exception as err:
self.__Irc.Logs.error(f'General Error: {err}')
self.__Logs.error(f'General Error: {err}')
def on_version(self, serverMsg: list[str]) -> None:
"""Sending Server Version to the server
@@ -1201,3 +1209,95 @@ class Unrealircd6:
except Exception as err:
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_smod(self, serverMsg: list[str]) -> None:
"""Handle SMOD message coming from the server
Args:
serverMsg (list[str]): Original server message
"""
try:
# [':001', 'SMOD', ':L:history_backend_mem:2.0', 'L:channeldb:1.0', 'L:tkldb:1.10', 'L:staff:3.8', 'L:ircops:3.71', ...]
sCopy = serverMsg.copy()
modules = [m.lstrip(':') for m in sCopy[2:]]
for smod in modules:
smod_split = smod.split(':')
sModObj = self.__Irc.Loader.Definition.MSModule(type=smod_split[0], name=smod_split[1], version=smod_split[2])
self.__Settings.SMOD_MODULES.append(sModObj)
except Exception as err:
self.__Logs.error(f'General Error: {err}')
def on_sasl(self, serverMsg: list[str], psasl: 'Sasl') -> Optional['MSasl']:
"""Handle SASL coming from a server
Args:
serverMsg (list[str]): Original server message
psasl (Sasl): The SASL process object
"""
try:
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'H', '172.18.128.1', '172.18.128.1']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'S', 'PLAIN']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '0014ZZH1F', 'S', 'EXTERNAL', 'zzzzzzzkey']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'C', 'sasakey==']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'D', 'A']
sasl_enabled = False
for smod in self.__Settings.SMOD_MODULES:
if smod.name == 'sasl':
sasl_enabled = True
break
if not sasl_enabled:
return None
sCopy = serverMsg.copy()
client_uid = sCopy[3] if len(sCopy) >= 6 else None
sasl_obj = None
sasl_message_type = sCopy[4] if len(sCopy) >= 6 else None
psasl.insert_sasl_client(self.__Irc.Loader.Definition.MSasl(client_uid=client_uid))
sasl_obj = psasl.get_sasl_obj(client_uid)
if sasl_obj is None:
return None
match sasl_message_type:
case 'H':
sasl_obj.remote_ip = str(sCopy[5])
sasl_obj.message_type = sasl_message_type
return sasl_obj
case 'S':
sasl_obj.message_type = sasl_message_type
if str(sCopy[5]) in ['PLAIN', 'EXTERNAL']:
sasl_obj.mechanisme = str(sCopy[5])
if sasl_obj.mechanisme == "PLAIN":
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +")
elif sasl_obj.mechanisme == "EXTERNAL":
if str(sCopy[5]) == "+":
return None
sasl_obj.fingerprint = str(sCopy[6])
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +")
return sasl_obj
case 'C':
if sasl_obj.mechanisme == "PLAIN":
credentials = sCopy[5]
decoded_credentials = b64decode(credentials).decode()
user, username, password = decoded_credentials.split('\0')
sasl_obj.message_type = sasl_message_type
sasl_obj.username = username
sasl_obj.password = password
return sasl_obj
elif sasl_obj.mechanisme == "EXTERNAL":
sasl_obj.message_type = sasl_message_type
return sasl_obj
except Exception as err:
self.__Logs.error(f'General Error: {err}', exc_info=True)

70
core/classes/sasl.py Normal file
View File

@@ -0,0 +1,70 @@
from typing import Optional, Union, TYPE_CHECKING
if TYPE_CHECKING:
from core.definition import MSasl
from core.loader import Loader
class Sasl:
DB_SASL: list['MSasl'] = []
def __init__(self, loader: 'Loader'):
self.Logs = loader.Logs # logger
def insert_sasl_client(self, psasl: 'MSasl') -> bool:
"""Insert a new Sasl authentication
Args:
new_user (UserModel): New userModel object
Returns:
bool: True if inserted
"""
if psasl is None:
return False
sasl_obj = self.get_sasl_obj(psasl.client_uid)
if sasl_obj is not None:
# User already created return False
return False
self.DB_SASL.append(psasl)
return True
def delete_sasl_client(self, client_uid: str) -> bool:
"""Delete the User starting from the UID
Args:
uid (str): UID of the user
Returns:
bool: True if deleted
"""
sasl_obj = self.get_sasl_obj(client_uid)
if sasl_obj is None:
return False
self.DB_SASL.remove(sasl_obj)
return True
def get_sasl_obj(self, client_uid: str) -> Optional['MSasl']:
"""Get sasl client Object model
Args:
client_uid (str): UID of the client
Returns:
UserModel|None: The SASL Object | None
"""
for record in self.DB_SASL:
if record.client_uid == client_uid:
return record
return None

View File

@@ -3,6 +3,7 @@
from threading import Timer, Thread, RLock
from socket import socket
from typing import Any, Optional
from core.definition import MSModule
class Settings:
"""This Class will never be reloaded.
@@ -22,6 +23,9 @@ class Settings:
PROTOCTL_USER_MODES: list[str] = []
PROTOCTL_PREFIX: list[str] = []
SMOD_MODULES: list[MSModule] = []
"""List contains all Server modules"""
__CACHE: dict[str, Any] = {}
"""Use set_cache or get_cache instead"""
@@ -40,11 +44,16 @@ class Settings:
def get_cache(self, key) -> Optional[Any]:
"""It returns the value associated to the key and finally it removes the entry"""
if self.__CACHE.get(key):
if self.__CACHE.get(key, None) is not None:
return self.__CACHE.pop(key)
return None
def get_cache_size(self) -> int:
return len(self.__CACHE)
def clear_cache(self) -> None:
self.__CACHE.clear()
def show_cache(self) -> dict[str, Any]:
return self.__CACHE.copy()

View File

@@ -30,6 +30,7 @@ class MClient(MainModel):
hostname: str = None
umodes: str = None
vhost: str = None
fingerprint: str = None
isWebirc: bool = False
isWebsocket: bool = False
remote_ip: str = None
@@ -48,6 +49,7 @@ class MUser(MainModel):
hostname: str = None
umodes: str = None
vhost: str = None
fingerprint: str = None
isWebirc: bool = False
isWebsocket: bool = False
remote_ip: str = None
@@ -60,12 +62,14 @@ class MAdmin(MainModel):
"""Model Admin"""
uid: str = None
account: str = None
nickname: str = None
username: str = None
realname: str = None
hostname: str = None
umodes: str = None
vhost: str = None
fingerprint: str = None
isWebirc: bool = False
isWebsocket: bool = False
remote_ip: str = None
@@ -84,6 +88,7 @@ class MReputation(MainModel):
hostname: str = None
umodes: str = None
vhost: str = None
fingerprint: str = None
isWebirc: bool = False
isWebsocket: bool = False
remote_ip: str = None
@@ -334,4 +339,25 @@ class MCommand(MainModel):
class MModule(MainModel):
module_name: str = None
class_name: str = None
class_instance: Optional[Any] = None
class_instance: Optional[Any] = None
@dataclass
class MSModule:
"""Server Modules model"""
name: str = None
version: str = None
type: str = None
@dataclass
class MSasl(MainModel):
"""Sasl model"""
remote_ip: Optional[str] = None
mechanisme: Optional[str] = None
message_type: Optional[str] = None
client_uid: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
fingerprint: Optional[str] = None
auth_success: bool = False
level: int = 0

View File

@@ -2,17 +2,18 @@ import sys
import socket
import ssl
import re
import importlib
import time
import traceback
from ssl import SSLSocket
from datetime import datetime, timedelta
from typing import Optional, Union
from typing import TYPE_CHECKING, Any, Optional, Union
from core.classes import rehash
from core.loader import Loader
from core.classes.protocol import Protocol
from core.classes.commands import Command
if TYPE_CHECKING:
from core.definition import MSasl
class Irc:
_instance = None
@@ -77,6 +78,9 @@ class Irc:
# Use Module Utils
self.ModuleUtils = self.Loader.ModuleUtils
# Use Main Sasl module
self.Sasl = self.Loader.Sasl
self.autolimit_started: bool = False
"""This variable is to make sure the thread is not running"""
@@ -111,11 +115,14 @@ class Irc:
self.build_command(2, 'core', 'show_clients', 'Display a list of connected clients')
self.build_command(2, 'core', 'show_admins', 'Display a list of administrators')
self.build_command(2, 'core', 'show_configuration', 'Display the current configuration settings')
self.build_command(2, 'core', 'show_cache', 'Display the current cache')
self.build_command(2, 'core', 'clear_cache', 'Clear the cache!')
self.build_command(3, 'core', 'quit', 'Disconnect the bot or user from the server.')
self.build_command(3, 'core', 'restart', 'Restart the bot or service.')
self.build_command(3, 'core', 'addaccess', 'Add a user or entity to an access list with specific permissions.')
self.build_command(3, 'core', 'editaccess', 'Modify permissions for an existing user or entity in the access list.')
self.build_command(3, 'core', 'delaccess', 'Remove a user or entity from the access list.')
self.build_command(3, 'core', 'cert', 'Append your new fingerprint to your account!')
self.build_command(4, 'core', 'rehash', 'Reload the configuration file without restarting')
self.build_command(4, 'core', 'raw', 'Send a raw command directly to the IRC server')
@@ -300,6 +307,51 @@ class Irc:
return None
def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> bool:
s = sasl_model
if sasl_model:
def db_get_admin_info(*, username: Optional[str] = None, password: Optional[str] = None, fingerprint: Optional[str] = None) -> Optional[dict[str, Any]]:
if fingerprint:
mes_donnees = {'fingerprint': fingerprint}
query = f"SELECT user, level FROM {self.Config.TABLE_ADMIN} WHERE fingerprint = :fingerprint"
else:
mes_donnees = {'user': username, 'password': self.Utils.hash_password(password)}
query = f"SELECT user, level FROM {self.Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
result = self.Base.db_execute_query(query, mes_donnees)
user_from_db = result.fetchone()
if user_from_db:
return {'user': user_from_db[0], 'level': user_from_db[1]}
else:
return None
if s.message_type == 'C' and s.mechanisme == 'PLAIN':
# Connection via PLAIN
admin_info = db_get_admin_info(username=s.username, password=s.password)
if admin_info is not None:
s.auth_success = True
s.level = admin_info.get('level', 0)
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} SASL {self.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
else:
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} SASL {self.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
elif s.message_type == 'S' and s.mechanisme == 'EXTERNAL':
# Connection using fingerprints
admin_info = db_get_admin_info(fingerprint=s.fingerprint)
if admin_info is not None:
s.auth_success = True
s.level = admin_info.get('level', 0)
s.username = admin_info.get('user', None)
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} SASL {self.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
else:
# "904 <nick> :SASL authentication failed"
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} SASL {self.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
def __create_table(self) -> None:
"""## Create core tables
"""
@@ -328,7 +380,7 @@ class Irc:
time.sleep(beat)
self.Base.execute_periodic_action()
def insert_db_admin(self, uid:str, level:int) -> None:
def insert_db_admin(self, uid: str, account: str, level: int) -> None:
user_obj = self.User.get_user(uid)
if user_obj is None:
return None
@@ -336,6 +388,7 @@ class Irc:
self.Admin.insert(
self.Loader.Definition.MAdmin(
**user_obj.to_dict(),
account=account,
level=int(level)
)
)
@@ -428,7 +481,7 @@ class Irc:
self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}')
return None
self.Logs.debug(self.Utils.hide_sensitive_data(original_response))
self.Logs.debug(f">> {self.Utils.hide_sensitive_data(original_response)}")
parsed_protocol = self.Protocol.parse_server_msg(original_response.copy())
match parsed_protocol:
@@ -449,6 +502,30 @@ class Irc:
self.Protocol.on_uid(serverMsg=original_response)
for module in self.ModuleUtils.model_get_loaded_modules().copy():
module.class_instance.cmd(original_response)
# SASL authentication
# ['@s2s-md/..', ':001', 'UID', 'adator__', '0', '1755987444', '...', 'desktop-h1qck20.mshome.net', '001XLTT0U', '0', '+iwxz', '*', 'Clk-EC2256B2.mshome.net', 'rBKAAQ==', ':...']
dnickname = self.Config.SERVICE_NICKNAME
dchanlog = self.Config.SERVICE_CHANLOG
uid = original_response[8]
nickname = original_response[3]
sasl_obj = self.Sasl.get_sasl_obj(uid)
if sasl_obj:
if sasl_obj.auth_success:
self.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}SASL AUTH{self.Config.COLORS.nogc} ] - {nickname} ({sasl_obj.username}) est désormais connecté a {dnickname}",
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=f"Connexion a {dnickname} réussie!")
else:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.red}SASL AUTH{self.Config.COLORS.nogc} ] - {nickname} a tapé un mauvais mot de pass pour le username ({sasl_obj.username})",
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=f"Mot de passe incorrecte")
# Delete sasl object!
self.Sasl.delete_sasl_client(uid)
return None
except Exception as err:
self.Logs.error(f'General Error: {err}')
@@ -482,6 +559,13 @@ class Irc:
case 'REPUTATION':
self.Protocol.on_reputation(serverMsg=original_response)
case 'SMOD':
self.Protocol.on_smod(original_response)
case 'SASL':
sasl_response = self.Protocol.on_sasl(original_response, self.Sasl)
self.on_sasl_authentication_process(sasl_response)
case 'SLOG': # TODO
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
@@ -537,6 +621,10 @@ class Irc:
fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande
uid = self.User.get_uid(fromuser) # Récuperer le uid de l'utilisateur
RED = self.Config.COLORS.red
GREEN = self.Config.COLORS.green
NOGC = self.Config.COLORS.nogc
# Defender information
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
dchanlog = self.Config.SERVICE_CHANLOG # Defender chan log
@@ -560,7 +648,7 @@ class Irc:
try:
current_command = cmd[0]
self.Protocol.send_priv_msg(
msg=f'[ {self.Config.COLORS.red}{current_command}{self.Config.COLORS.black} ] - Accès Refusé à {self.User.get_nickname(fromuser)}',
msg=f'[ {RED}{current_command}{NOGC} ] - Accès Refusé à {self.User.get_nickname(fromuser)}',
nick_from=dnickname,
channel=dchanlog
)
@@ -581,7 +669,7 @@ class Irc:
self.delete_db_admin(uid_to_deauth)
self.Protocol.send_priv_msg(
msg=f"[ {self.Config.COLORS.red}{str(current_command).upper()} ]{self.Config.COLORS.black} - {self.User.get_nickname(fromuser)} est désormais déconnecter de {dnickname}",
msg=f"[ {RED}{str(current_command).upper()}{NOGC} ] - {self.User.get_nickname(fromuser)} est désormais déconnecter de {dnickname}",
nick_from=dnickname,
channel=dchanlog
)
@@ -645,7 +733,7 @@ class Irc:
if cmd_owner == config_owner and cmd_password == config_password:
self.Base.db_create_first_admin()
self.insert_db_admin(current_uid, 5)
self.insert_db_admin(current_uid, cmd_owner, 5)
self.Protocol.send_priv_msg(
msg=f"[ {self.Config.COLORS.green}{str(current_command).upper()} ]{self.Config.COLORS.black} - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}",
nick_from=dnickname,
@@ -671,41 +759,50 @@ class Irc:
)
case 'auth':
# ['auth', 'adator', 'password']
if len(cmd) != 3:
# Syntax. !auth nickname password
if len(cmd) < 3:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [password]")
return None
current_command = cmd[0]
user_to_log = self.User.get_nickname(cmd[1])
user_to_log = cmd[1]
password = cmd[2]
current_client = self.User.get_user(fromuser)
admin_obj = self.Admin.get_admin(fromuser)
if fromuser != user_to_log:
# If the current nickname is different from the nickname you want to log in with
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Your current nickname is different from the nickname you want to log in with")
return False
if current_client is None:
# This case should never happen
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - Nickname {fromuser} is trying to connect to defender wrongly",
channel=dchanlog)
return None
if admin_obj:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - You are already connected to {dnickname}",
channel=dchanlog)
return None
if user_to_log:
mes_donnees = {'user': user_to_log, 'password': self.Loader.Utils.hash_password(password)}
query = f"SELECT id, level FROM {self.Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
result = self.Base.db_execute_query(query, mes_donnees)
user_from_db = result.fetchone()
if user_from_db:
uid_user = self.User.get_uid(user_to_log)
self.insert_db_admin(uid_user, user_from_db[1])
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}{str(current_command).upper()} ]{self.Config.COLORS.nogc} - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}",
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Connexion a {dnickname} réussie!")
else:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.red}{str(current_command).upper()} ]{self.Config.COLORS.nogc} - {self.User.get_nickname(fromuser)} a tapé un mauvais mot de pass",
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Mot de passe incorrecte")
mes_donnees = {'user': user_to_log, 'password': self.Loader.Utils.hash_password(password)}
query = f"SELECT id, user, level FROM {self.Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
result = self.Base.db_execute_query(query, mes_donnees)
user_from_db = result.fetchone()
if user_from_db:
account = user_from_db[1]
level = user_from_db[2]
self.insert_db_admin(current_client.uid, account, level)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - {current_client.nickname} ({account}) est désormais connecté a {dnickname}",
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Connexion a {dnickname} réussie!")
return None
else:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"L'utilisateur {user_to_log} n'existe pas")
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {RED}{str(current_command).upper()}{NOGC} ] - {current_client.nickname} a tapé un mauvais mot de pass",
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Mot de passe incorrecte")
return None
case 'addaccess':
try:
@@ -838,6 +935,21 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Impossible de supprimer l'utilisateur.")
self.Logs.warning(f":{dnickname} NOTICE {fromuser} : Impossible de supprimer l'utilisateur.")
case 'cert':
# Syntax !cert
try:
admin_obj = self.Admin.get_admin(fromuser)
if admin_obj:
query = f'UPDATE {self.Config.TABLE_ADMIN} SET fingerprint = :fingerprint WHERE user = :user'
r = self.Base.db_execute_query(query, {'fingerprint': admin_obj.fingerprint, 'user': admin_obj.account})
if r.rowcount > 0:
self.Protocol.send_notice(dnickname, fromuser, f'[ {GREEN}CERT{NOGC} ] Your new fingerprint has been attached to your account. {admin_obj.fingerprint}')
else:
self.Protocol.send_notice(dnickname, fromuser, f'[ {RED}CERT{NOGC} ] Impossible to add your fingerprint.{admin_obj.fingerprint}')
except Exception as e:
self.Logs.error(e)
case 'register':
# Syntax. Register PASSWORD EMAIL
try:
@@ -1077,14 +1189,14 @@ class Irc:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"{module} - {self.Config.COLORS.green}Loaded{self.Config.COLORS.nogc} by {loaded_user} on {loaded_datetime}"
msg=f"{module} - {GREEN}Loaded{NOGC} by {loaded_user} on {loaded_datetime}"
)
loaded = False
else:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"{module} - {self.Config.COLORS.red}Not Loaded{self.Config.COLORS.nogc}"
msg=f"{module} - {RED}Not Loaded{NOGC}"
)
case 'show_timers':
@@ -1154,7 +1266,7 @@ class Irc:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"UID : {db_admin.uid} - Nickname: {db_admin.nickname} - Level: {db_admin.level} - Connection: {db_admin.connexion_datetime}"
msg=f"UID : {db_admin.uid} - Nickname: {db_admin.nickname} - Account: {db_admin.account} - Level: {db_admin.level} - Connection: {db_admin.connexion_datetime}"
)
return None
@@ -1167,6 +1279,23 @@ class Irc:
)
return None
case 'show_cache':
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"The cache is currently contains {self.Settings.get_cache_size()} value(s).")
for key, value in self.Settings.show_cache().items():
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"Key : {key} - Value: {value}"
)
return None
case 'clear_cache':
cache_size = self.Settings.get_cache_size()
if cache_size > 0:
self.Settings.clear_cache()
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{cache_size} value(s) has been cleared from the cache.")
return None
case 'uptime':
uptime = self.get_defender_uptime()
self.Protocol.send_notice(

View File

@@ -1,5 +1,5 @@
from logging import Logger
from core.classes import user, admin, client, channel, reputation, settings
from core.classes import user, admin, client, channel, reputation, settings, sasl
import core.logs as logs
import core.definition as df
import core.utils as utils
@@ -50,4 +50,6 @@ class Loader:
self.ModuleUtils: module_mod.Module = module_mod.Module(self)
self.Sasl: sasl.Sasl = sasl.Sasl(self)
self.Logs.debug("LOADER Success!")

View File

@@ -287,7 +287,7 @@ class Module:
Returns:
list[MModule]: A list of module model object
"""
self.__Logs.debug(f"[MODEL MODULE LOADED MODULES] {len(self.DB_MODULES)} modules found!")
# self.__Logs.debug(f"[MODEL MODULE LOADED MODULES] {len(self.DB_MODULES)} modules found!")
return self.DB_MODULES
def model_insert_module(self, module_model: MModule) -> bool:

View File

@@ -4,9 +4,10 @@ Main utils library.
import gc
import ssl
import socket
import sys
from pathlib import Path
from re import match, sub
import sys
from base64 import b64decode
from typing import Literal, Optional, Any, TYPE_CHECKING
from datetime import datetime, timedelta, timezone
from time import time

View File

@@ -178,7 +178,7 @@ class Clone:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill [all | group_name | nickname]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | group_name | nickname] #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | group_name | nickname] #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list [group name]")
return None
option = str(cmd[1]).lower()
@@ -356,7 +356,7 @@ class Clone:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill [all | group name | nickname]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | group name | nickname] #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | group name | nickname] #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list [group name]")
except IndexError as ie:
self.Logs.error(f'Index Error: {ie}')

View File

@@ -19,4 +19,4 @@ class MClone(MainModel):
remote_ip: str = '127.0.0.1'
group: str = 'Default'
DB_CLONES: list[MClone] = []
# DB_CLONES: list[MClone] = []

View File

@@ -188,12 +188,6 @@ class Defender:
"""Cette methode sera executée a chaque désactivation ou
rechargement de module
"""
self.Settings.set_cache('ABUSEIPDB', self.Schemas.DB_ABUSEIPDB_USERS)
self.Settings.set_cache('FREEIPAPI', self.Schemas.DB_FREEIPAPI_USERS)
self.Settings.set_cache('CLOUDFILT', self.Schemas.DB_CLOUDFILT_USERS)
self.Settings.set_cache('PSUTIL', self.Schemas.DB_PSUTIL_USERS)
self.Settings.set_cache('LOCALSCAN', self.Schemas.DB_LOCALSCAN_USERS)
self.Schemas.DB_ABUSEIPDB_USERS = []
self.Schemas.DB_FREEIPAPI_USERS = []
self.Schemas.DB_CLOUDFILT_USERS = []