* Remove all related to nickserv scope

* Remove Client from the loader (nickserv like)

* Fix issues coming from the feedback of dktmb
This commit is contained in:
adator
2025-12-16 01:00:36 +01:00
committed by GitHub
parent 54ea946da0
commit 91210f96ec
13 changed files with 71 additions and 567 deletions

View File

@@ -431,23 +431,27 @@ class Base:
self.running_iothreads.remove(id_obj) self.running_iothreads.remove(id_obj)
return result return result
def asynctask_done(self, task: Union[asyncio.Task, asyncio.Future]): def asynctask_done(self, task: Union[asyncio.Task, asyncio.Future], context: Optional[dict[str, Any]] = None):
"""Log task when done """Log task when done
Args: Args:
task (asyncio.Task): The Asyncio Task callback task (asyncio.Task): The Asyncio Task callback
""" """
name = task.get_name() if isinstance(task, asyncio.Task) else "Thread"
task_or_future = "Task" if isinstance(task, asyncio.Task) else "Future" if context:
print(context)
task_name = "Future" if isinstance(task, asyncio.Future) else task.get_name()
task_or_future = "Task"
try: try:
if task.exception(): if task.exception():
self.logs.error(f"[ASYNCIO] {task_or_future} {name} failed with exception: {task.exception()}") self.logs.error(f"[ASYNCIO] {task_or_future} {task_name} failed with exception: {task.exception()}")
else: else:
self.logs.debug(f"[ASYNCIO] {task_or_future} {name} completed successfully.") self.logs.debug(f"[ASYNCIO] {task_or_future} {task_name} completed successfully.")
except asyncio.CancelledError as ce: except asyncio.CancelledError as ce:
self.logs.debug(f"[ASYNCIO] {task_or_future} {name} terminated with cancelled error. {ce}") self.logs.debug(f"[ASYNCIO] {task_or_future} {task_name} terminated with cancelled error. {ce}")
except asyncio.InvalidStateError as ie: except asyncio.InvalidStateError as ie:
self.logs.debug(f"[ASYNCIO] {task_or_future} {name} terminated with invalid state error. {ie}") self.logs.debug(f"[ASYNCIO] {task_or_future} {task_name} terminated with invalid state error. {ie}")
def is_thread_alive(self, thread_name: str) -> bool: def is_thread_alive(self, thread_name: str) -> bool:
"""Check if the thread is still running! using the is_alive method of Threads. """Check if the thread is still running! using the is_alive method of Threads.
@@ -611,25 +615,10 @@ class Base:
) )
''' '''
table_core_client = f'''CREATE TABLE IF NOT EXISTS {self.Config.TABLE_CLIENT} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
createdOn TEXT,
account TEXT,
nickname TEXT,
hostname TEXT,
vhost TEXT,
realname TEXT,
email TEXT,
password TEXT,
level INTEGER
)
'''
await self.db_execute_query(table_core_log) await self.db_execute_query(table_core_log)
await self.db_execute_query(table_core_log_command) await self.db_execute_query(table_core_log_command)
await self.db_execute_query(table_core_module) await self.db_execute_query(table_core_module)
await self.db_execute_query(table_core_admin) await self.db_execute_query(table_core_admin)
await self.db_execute_query(table_core_client)
await self.db_execute_query(table_core_channel) await self.db_execute_query(table_core_channel)
await self.db_execute_query(table_core_config) await self.db_execute_query(table_core_config)

View File

@@ -3,7 +3,7 @@ from typing import Optional, TYPE_CHECKING
from core.classes.protocols.command_handler import CommandHandler from core.classes.protocols.command_handler import CommandHandler
if TYPE_CHECKING: if TYPE_CHECKING:
from core.definition import MClient, MSasl, MUser, MChannel from core.definition import MSasl, MUser, MChannel
from core.loader import Loader from core.loader import Loader
class IProtocol(ABC): class IProtocol(ABC):
@@ -224,11 +224,8 @@ class IProtocol(ABC):
""" """
@abstractmethod @abstractmethod
async def send_svslogout(self, client_obj: 'MClient') -> None: async def send_svslogout(self) -> None:
"""Logout a client from his account """Logout a client from his account
Args:
client_obj (MClient): The Client UID
""" """
@abstractmethod @abstractmethod

View File

@@ -67,15 +67,14 @@ class Admin:
Returns: Returns:
bool: True if the admin level has been updated bool: True if the admin level has been updated
""" """
admin_obj = self.get_admin(nickname)
if admin_obj:
# If the admin exist, update and do not go further
admin_obj.level = new_admin_level
self._ctx.Logs.debug(f'Admin ({admin_obj.nickname}) has been updated with new level {new_admin_level}')
return True
for record in self.UID_ADMIN_DB: self._ctx.Logs.debug(f'The new level {new_admin_level} was not updated in local variable, nickname = {nickname} is not logged in')
if record.nickname == nickname:
# If the admin exist, update and do not go further
record.level = new_admin_level
self._ctx.Logs.debug(f'Admin ({record.nickname}) has been updated with new level {new_admin_level}')
return True
self._ctx.Logs.debug(f'The new level {new_admin_level} was not updated, nickname = {nickname} - The Client is not an admin')
return False return False
@@ -94,7 +93,7 @@ class Admin:
self._ctx.Logs.debug(f'UID ({admin_obj.uid}) has been deleted') self._ctx.Logs.debug(f'UID ({admin_obj.uid}) has been deleted')
return True return True
self._ctx.Logs.debug(f'The UID {uidornickname} was not deleted') self._ctx.Logs.debug(f'The UID {uidornickname} was not deleted from the local variable (admin not connected)')
return False return False

View File

@@ -1,226 +0,0 @@
from re import sub
from typing import Any, Optional, Union, TYPE_CHECKING
if TYPE_CHECKING:
from core.loader import Loader
from core.definition import MClient
class Client:
CLIENT_DB: list['MClient'] = []
def __init__(self, loader: 'Loader'):
"""
Args:
loader (Loader): The Loader instance.
"""
self._ctx = loader
def insert(self, new_client: 'MClient') -> bool:
"""Insert a new User object
Args:
new_client (MClient): New Client object
Returns:
bool: True if inserted
"""
client_obj = self.get_client(new_client.uid)
if not client_obj is None:
# User already created return False
return False
self.CLIENT_DB.append(new_client)
return True
def update_nickname(self, uid: str, new_nickname: str) -> bool:
"""Update the nickname starting from the UID
Args:
uid (str): UID of the user
new_nickname (str): New nickname
Returns:
bool: True if updated
"""
user_obj = self.get_client(uidornickname=uid)
if user_obj is None:
return False
user_obj.nickname = new_nickname
return True
def update_mode(self, uidornickname: str, modes: str) -> bool:
"""Updating user mode
Args:
uidornickname (str): The UID or Nickname of the user
modes (str): new modes to update
Returns:
bool: True if user mode has been updaed
"""
response = True
user_obj = self.get_client(uidornickname=uidornickname)
if user_obj is None:
return False
action = modes[0]
new_modes = modes[1:]
existing_umodes = user_obj.umodes
umodes = user_obj.umodes
if action == '+':
for nm in new_modes:
if nm not in existing_umodes:
umodes += nm
elif action == '-':
for nm in new_modes:
if nm in existing_umodes:
umodes = umodes.replace(nm, '')
else:
return False
liste_umodes = list(umodes)
final_umodes_liste = [x for x in self._ctx.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes]
final_umodes = ''.join(final_umodes_liste)
user_obj.umodes = f"+{final_umodes}"
return response
def delete(self, uid: str) -> bool:
"""Delete the User starting from the UID
Args:
uid (str): UID of the user
Returns:
bool: True if deleted
"""
user_obj = self.get_client(uidornickname=uid)
if user_obj is None:
return False
self.CLIENT_DB.remove(user_obj)
return True
def get_client(self, uidornickname: str) -> Optional['MClient']:
"""Get The Client Object model
Args:
uidornickname (str): UID or Nickname
Returns:
UserModel|None: The UserModel Object | None
"""
for record in self.CLIENT_DB:
if record.uid == uidornickname:
return record
elif record.nickname == uidornickname:
return record
return None
def get_uid(self, uidornickname:str) -> Optional[str]:
"""Get the UID of the user starting from the UID or the Nickname
Args:
uidornickname (str): UID or Nickname
Returns:
str|None: Return the UID
"""
client_obj = self.get_client(uidornickname=uidornickname)
if client_obj is None:
return None
return client_obj.uid
def get_nickname(self, uidornickname:str) -> Union[str, None]:
"""Get the Nickname starting from UID or the nickname
Args:
uidornickname (str): UID or Nickname of the user
Returns:
str|None: the nickname
"""
client_obj = self.get_client(uidornickname=uidornickname)
if client_obj is None:
return None
return client_obj.nickname
def is_exist(self, uidornickname: str) -> bool:
"""Check if the UID or the nickname exist in the USER DB
Args:
uidornickname (str): The UID or the NICKNAME
Returns:
bool: True if exist
"""
user_obj = self.get_client(uidornickname=uidornickname)
if user_obj is None:
return False
return True
async def db_is_account_exist(self, account: str) -> bool:
"""Check if the account exist in the database
Args:
account (str): The account to check
Returns:
bool: True if exist
"""
table_client = self._ctx.Base.Config.TABLE_CLIENT
account_to_check = {'account': account.lower()}
account_to_check_query = await self._ctx.Base.db_execute_query(f"""
SELECT id FROM {table_client} WHERE LOWER(account) = :account
""", account_to_check)
account_to_check_result = account_to_check_query.fetchone()
if account_to_check_result:
self._ctx.Logs.error(f"Account ({account}) already exist")
return True
return False
def clean_uid(self, uid: str) -> Union[str, None]:
"""Clean UID by removing @ / % / + / ~ / * / :
Args:
uid (str): The UID to clean
Returns:
str: Clean UID without any sign
"""
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_uid = sub(pattern, '', uid)
if not parsed_uid:
return None
return parsed_uid

View File

@@ -4,7 +4,7 @@ import sys
import threading import threading
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import core.module as module_mod import core.module as module_mod
from core.classes.modules import user, admin, client, channel, reputation, sasl from core.classes.modules import user, admin, channel, reputation, sasl
from core.utils import tr from core.utils import tr
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -20,7 +20,6 @@ REHASH_MODULES = [
'core.classes.modules.commands', 'core.classes.modules.commands',
'core.classes.modules.user', 'core.classes.modules.user',
'core.classes.modules.admin', 'core.classes.modules.admin',
'core.classes.modules.client',
'core.classes.modules.channel', 'core.classes.modules.channel',
'core.classes.modules.reputation', 'core.classes.modules.reputation',
'core.classes.modules.sasl', 'core.classes.modules.sasl',
@@ -63,7 +62,6 @@ async def restart_service(uplink: 'Loader', reason: str = "Restarting with no re
uplink.ModuleUtils.model_clear() # Clear loaded modules. uplink.ModuleUtils.model_clear() # Clear loaded modules.
uplink.User.UID_DB.clear() # Clear User Object uplink.User.UID_DB.clear() # Clear User Object
uplink.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object uplink.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
uplink.Client.CLIENT_DB.clear() # Clear Client object
uplink.Irc.Protocol.Handler.DB_IRCDCOMMS.clear() uplink.Irc.Protocol.Handler.DB_IRCDCOMMS.clear()
# Reload Service modules # Reload Service modules
@@ -77,7 +75,6 @@ async def rehash_service(uplink: 'Loader', nickname: str) -> None:
need_a_restart = ["SERVEUR_ID"] need_a_restart = ["SERVEUR_ID"]
uplink.Settings.set_cache('commands', uplink.Commands.DB_COMMANDS) uplink.Settings.set_cache('commands', uplink.Commands.DB_COMMANDS)
uplink.Settings.set_cache('users', uplink.User.UID_DB) uplink.Settings.set_cache('users', uplink.User.UID_DB)
uplink.Settings.set_cache('clients', uplink.Client.CLIENT_DB)
uplink.Settings.set_cache('admins', uplink.Admin.UID_ADMIN_DB) uplink.Settings.set_cache('admins', uplink.Admin.UID_ADMIN_DB)
uplink.Settings.set_cache('reputations', uplink.Reputation.UID_REPUTATION_DB) uplink.Settings.set_cache('reputations', uplink.Reputation.UID_REPUTATION_DB)
uplink.Settings.set_cache('channels', uplink.Channel.UID_CHANNEL_DB) uplink.Settings.set_cache('channels', uplink.Channel.UID_CHANNEL_DB)
@@ -132,7 +129,6 @@ async def rehash_service(uplink: 'Loader', nickname: str) -> None:
uplink.Base = uplink.BaseModule.Base(uplink) uplink.Base = uplink.BaseModule.Base(uplink)
uplink.User = user.User(uplink) uplink.User = user.User(uplink)
uplink.Client = client.Client(uplink)
uplink.Admin = admin.Admin(uplink) uplink.Admin = admin.Admin(uplink)
uplink.Channel = channel.Channel(uplink) uplink.Channel = channel.Channel(uplink)
uplink.Reputation = reputation.Reputation(uplink) uplink.Reputation = reputation.Reputation(uplink)
@@ -141,7 +137,6 @@ async def rehash_service(uplink: 'Loader', nickname: str) -> None:
# Backup data # Backup data
uplink.User.UID_DB = uplink.Settings.get_cache('users') uplink.User.UID_DB = uplink.Settings.get_cache('users')
uplink.Client.CLIENT_DB = uplink.Settings.get_cache('clients')
uplink.Admin.UID_ADMIN_DB = uplink.Settings.get_cache('admins') uplink.Admin.UID_ADMIN_DB = uplink.Settings.get_cache('admins')
uplink.Channel.UID_CHANNEL_DB = uplink.Settings.get_cache('channels') uplink.Channel.UID_CHANNEL_DB = uplink.Settings.get_cache('channels')
uplink.Reputation.UID_REPUTATION_DB = uplink.Settings.get_cache('reputations') uplink.Reputation.UID_REPUTATION_DB = uplink.Settings.get_cache('reputations')

View File

@@ -8,7 +8,7 @@ from core.classes.interfaces.iprotocol import IProtocol
from core.utils import tr from core.utils import tr
if TYPE_CHECKING: if TYPE_CHECKING:
from core.definition import MSasl, MClient, MUser, MChannel from core.definition import MSasl, MUser, MChannel
class Inspircd(IProtocol): class Inspircd(IProtocol):
@@ -562,7 +562,6 @@ class Inspircd(IProtocol):
uid = str(scopy[0]).replace(':','') uid = str(scopy[0]).replace(':','')
newnickname = scopy[2] newnickname = scopy[2]
self._ctx.User.update_nickname(uid, newnickname) self._ctx.User.update_nickname(uid, newnickname)
self._ctx.Client.update_nickname(uid, newnickname)
self._ctx.Admin.update_nickname(uid, newnickname) self._ctx.Admin.update_nickname(uid, newnickname)
return None return None
@@ -1334,15 +1333,12 @@ class Inspircd(IProtocol):
client_uid (str): Client UID client_uid (str): Client UID
user_account (str): The account of the user user_account (str): The account of the user
""" """
... pass
async def send_svslogout(self, client_obj: 'MClient') -> None: async def send_svslogout(self) -> None:
"""Logout a client from his account """Logout a client from his account
Args:
client_obj (MClient): The Client Object Model
""" """
... pass
async def send_svsmode(self, nickname: str, user_mode: str) -> None: async def send_svsmode(self, nickname: str, user_mode: str) -> None:
"""_summary_ """_summary_

View File

@@ -7,7 +7,7 @@ from core.classes.interfaces.iprotocol import IProtocol
from core.utils import is_coroutinefunction, tr from core.utils import is_coroutinefunction, tr
if TYPE_CHECKING: if TYPE_CHECKING:
from core.definition import MClient, MSasl, MUser, MChannel from core.definition import MSasl, MUser, MChannel
class Unrealircd6(IProtocol): class Unrealircd6(IProtocol):
@@ -469,18 +469,13 @@ class Unrealircd6(IProtocol):
except Exception as err: except Exception as err:
self._ctx.Logs.error(f'General Error: {err}') self._ctx.Logs.error(f'General Error: {err}')
async def send_svslogout(self, client_obj: 'MClient') -> None: async def send_svslogout(self) -> None:
"""Logout a client from his account """Logout a client from his account
Args:
client_obj (MClient): The Client object
""" """
try: try:
c_uid = client_obj.uid # await self.send2socket(f":{self._ctx.Config.SERVEUR_LINK} SVSLOGIN {self._ctx.Settings.MAIN_SERVER_HOSTNAME} {c_uid} 0")
c_nickname = client_obj.nickname # await self.send_svs2mode(c_nickname, '-r')
await self.send2socket(f":{self._ctx.Config.SERVEUR_LINK} SVSLOGIN {self._ctx.Settings.MAIN_SERVER_HOSTNAME} {c_uid} 0") pass
await self.send_svs2mode(c_nickname, '-r')
except Exception as err: except Exception as err:
self._ctx.Logs.error(f'General Error: {err}') self._ctx.Logs.error(f'General Error: {err}')
@@ -795,7 +790,6 @@ class Unrealircd6(IProtocol):
self._ctx.Channel.delete_user_from_all_channel(uid_who_quit) self._ctx.Channel.delete_user_from_all_channel(uid_who_quit)
self._ctx.User.delete(uid_who_quit) self._ctx.User.delete(uid_who_quit)
self._ctx.Client.delete(uid_who_quit)
self._ctx.Reputation.delete(uid_who_quit) self._ctx.Reputation.delete(uid_who_quit)
self._ctx.Admin.delete(uid_who_quit) self._ctx.Admin.delete(uid_who_quit)
@@ -879,7 +873,6 @@ class Unrealircd6(IProtocol):
uid = str(server_msg[1]).lstrip(':') uid = str(server_msg[1]).lstrip(':')
newnickname = server_msg[3] newnickname = server_msg[3]
self._ctx.User.update_nickname(uid, newnickname) self._ctx.User.update_nickname(uid, newnickname)
self._ctx.Client.update_nickname(uid, newnickname)
self._ctx.Admin.update_nickname(uid, newnickname) self._ctx.Admin.update_nickname(uid, newnickname)
self._ctx.Reputation.update(uid, newnickname) self._ctx.Reputation.update(uid, newnickname)

View File

@@ -28,26 +28,6 @@ class MainModel:
"""Return a list of attributes name""" """Return a list of attributes name"""
return [f.name for f in fields(self)] return [f.name for f in fields(self)]
@dataclass
class MClient(MainModel):
"""Model Client for registred nickname"""
uid: str = None
account: str = None
nickname: str = None
username: str = None
realname: str = None
hostname: str = None
umodes: str = None
vhost: str = None
fingerprint: str = None
tls_cipher: str = None
isWebirc: bool = False
isWebsocket: bool = False
remote_ip: str = None
score_connexion: int = 0
geoip: str = None
connexion_datetime: datetime = field(default=datetime.now())
@dataclass @dataclass
class MUser(MainModel): class MUser(MainModel):
"""Model User""" """Model User"""
@@ -283,9 +263,6 @@ class MConfig(MainModel):
LOGGING_NAME: str = "defender" LOGGING_NAME: str = "defender"
"""The name of the Logging instance""" """The name of the Logging instance"""
TABLE_CLIENT: str = "core_client"
"""Core Client table"""
TABLE_ADMIN: str = "core_admin" TABLE_ADMIN: str = "core_admin"
"""Core Admin table""" """Core Admin table"""

View File

@@ -2,14 +2,12 @@ import asyncio
import re import re
import ssl import ssl
import threading import threading
from datetime import datetime, timedelta from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Any, Optional, Union
from core.classes.modules import rehash from core.classes.modules import rehash
from core.classes.interfaces.iprotocol import IProtocol from core.classes.interfaces.iprotocol import IProtocol
from core.utils import tr from core.utils import tr
if TYPE_CHECKING: if TYPE_CHECKING:
from core.definition import MSasl
from core.loader import Loader from core.loader import Loader
class Irc: class Irc:
@@ -66,18 +64,11 @@ class Irc:
self.ctx.Commands.build_command(0, 'core', 'copyright', 'Give some information about the IRC Service') self.ctx.Commands.build_command(0, 'core', 'copyright', 'Give some information about the IRC Service')
self.ctx.Commands.build_command(0, 'core', 'uptime', 'Give you since when the service is connected') self.ctx.Commands.build_command(0, 'core', 'uptime', 'Give you since when the service is connected')
self.ctx.Commands.build_command(0, 'core', 'firstauth', 'First authentication of the Service') self.ctx.Commands.build_command(0, 'core', 'firstauth', 'First authentication of the Service')
self.ctx.Commands.build_command(0, 'core', 'register', f'Register your nickname /msg {self.ctx.Config.SERVICE_NICKNAME} REGISTER <password> <email>')
self.ctx.Commands.build_command(0, 'core', 'identify', f'Identify yourself with your password /msg {self.ctx.Config.SERVICE_NICKNAME} IDENTIFY <account> <password>')
self.ctx.Commands.build_command(0, 'core', 'logout', 'Reverse the effect of the identify command')
self.ctx.Commands.build_command(1, 'core', 'load', 'Load an existing module')
self.ctx.Commands.build_command(1, 'core', 'unload', 'Unload a module')
self.ctx.Commands.build_command(1, 'core', 'reload', 'Reload a module')
self.ctx.Commands.build_command(1, 'core', 'deauth', 'Deauth from the irc service') self.ctx.Commands.build_command(1, 'core', 'deauth', 'Deauth from the irc service')
self.ctx.Commands.build_command(1, 'core', 'checkversion', 'Check the version of the irc service') self.ctx.Commands.build_command(1, 'core', 'checkversion', 'Check the version of the irc service')
self.ctx.Commands.build_command(2, 'core', 'show_modules', 'Display a list of loaded modules') self.ctx.Commands.build_command(2, 'core', 'show_modules', 'Display a list of loaded modules')
self.ctx.Commands.build_command(2, 'core', 'show_channels', 'Display a list of active channels') self.ctx.Commands.build_command(2, 'core', 'show_channels', 'Display a list of active channels')
self.ctx.Commands.build_command(2, 'core', 'show_users', 'Display a list of connected users') self.ctx.Commands.build_command(2, 'core', 'show_users', 'Display a list of connected users')
self.ctx.Commands.build_command(2, 'core', 'show_clients', 'Display a list of connected clients')
self.ctx.Commands.build_command(2, 'core', 'show_admins', 'Display a list of administrators') self.ctx.Commands.build_command(2, 'core', 'show_admins', 'Display a list of administrators')
self.ctx.Commands.build_command(2, 'core', 'show_configuration', 'Display the current configuration settings') self.ctx.Commands.build_command(2, 'core', 'show_configuration', 'Display the current configuration settings')
self.ctx.Commands.build_command(2, 'core', 'show_cache', 'Display the current cache') self.ctx.Commands.build_command(2, 'core', 'show_cache', 'Display the current cache')
@@ -96,6 +87,9 @@ class Irc:
self.ctx.Commands.build_command(4, 'core', 'show_asyncio', 'Display active asyncio') self.ctx.Commands.build_command(4, 'core', 'show_asyncio', 'Display active asyncio')
self.ctx.Commands.build_command(4, 'core', 'start_rpc', 'Start defender jsonrpc server') self.ctx.Commands.build_command(4, 'core', 'start_rpc', 'Start defender jsonrpc server')
self.ctx.Commands.build_command(4, 'core', 'stop_rpc', 'Stop defender jsonrpc server') self.ctx.Commands.build_command(4, 'core', 'stop_rpc', 'Stop defender jsonrpc server')
self.ctx.Commands.build_command(4, 'core', 'load', 'Load an existing module')
self.ctx.Commands.build_command(4, 'core', 'unload', 'Unload a module')
self.ctx.Commands.build_command(4, 'core', 'reload', 'Reload a module')
############################################## ##############################################
# CONNEXION IRC # # CONNEXION IRC #
@@ -133,14 +127,6 @@ class Irc:
run_once=True, thread_flag=True run_once=True, thread_flag=True
) )
) )
# print("start_heartbeat.................")
# await self.ctx.Base.create_thread_io(
# self.ctx.Utils.heartbeat,
# self.ctx, self.beat,
# run_once=True, thread_flag=True
# )
while self.signal: while self.signal:
data = await self.reader.readuntil(b'\r\n') data = await self.reader.readuntil(b'\r\n')
@@ -216,53 +202,6 @@ class Irc:
return None return None
# async def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> bool:
# s = sasl_model
# if sasl_model:
# async def db_get_admin_info(*, username: Optional[str] = None, password: Optional[str] = None, fingerprint: Optional[str] = None) -> Optional[dict[str, Any]]:
# if fingerprint:
# mes_donnees = {'fingerprint': fingerprint}
# query = f"SELECT user, level, language FROM {self.ctx.Config.TABLE_ADMIN} WHERE fingerprint = :fingerprint"
# else:
# mes_donnees = {'user': username, 'password': self.ctx.Utils.hash_password(password)}
# query = f"SELECT user, level, language FROM {self.ctx.Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
# result = await self.ctx.Base.db_execute_query(query, mes_donnees)
# user_from_db = result.fetchone()
# if user_from_db:
# return {'user': user_from_db[0], 'level': user_from_db[1], 'language': user_from_db[2]}
# else:
# return None
# if s.message_type == 'C' and s.mechanisme == 'PLAIN':
# # Connection via PLAIN
# admin_info = await db_get_admin_info(username=s.username, password=s.password)
# if admin_info is not None:
# s.auth_success = True
# s.level = admin_info.get('level', 0)
# s.language = admin_info.get('language', 'EN')
# await self.Protocol.send2socket(f":{self.ctx.Config.SERVEUR_LINK} SASL {self.ctx.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
# await self.Protocol.send2socket(f":{self.ctx.Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
# else:
# await self.Protocol.send2socket(f":{self.ctx.Config.SERVEUR_LINK} SASL {self.ctx.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
# await self.Protocol.send2socket(f":{self.ctx.Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
# elif s.message_type == 'S' and s.mechanisme == 'EXTERNAL':
# # Connection using fingerprints
# admin_info = await db_get_admin_info(fingerprint=s.fingerprint)
# if admin_info is not None:
# s.auth_success = True
# s.level = admin_info.get('level', 0)
# s.username = admin_info.get('user', None)
# s.language = admin_info.get('language', 'EN')
# await self.Protocol.send2socket(f":{self.ctx.Config.SERVEUR_LINK} SASL {self.ctx.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
# await self.Protocol.send2socket(f":{self.ctx.Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
# else:
# # "904 <nick> :SASL authentication failed"
# await self.Protocol.send2socket(f":{self.ctx.Config.SERVEUR_LINK} SASL {self.ctx.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
# await self.Protocol.send2socket(f":{self.ctx.Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
def insert_db_admin(self, uid: str, account: str, level: int, language: str) -> None: def insert_db_admin(self, uid: str, account: str, level: int, language: str) -> None:
user_obj = self.ctx.User.get_user(uid) user_obj = self.ctx.User.get_user(uid)
@@ -328,7 +267,7 @@ class Irc:
spassword = self.ctx.Utils.hash_password(password) spassword = self.ctx.Utils.hash_password(password)
# Check if the user already exist # Check if the user already exist
if not self.ctx.Admin.db_is_admin_exist(nickname): if not await self.ctx.Admin.db_is_admin_exist(nickname):
mes_donnees = {'datetime': self.ctx.Utils.get_sdatetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level, 'language': self.ctx.Config.LANG} mes_donnees = {'datetime': self.ctx.Utils.get_sdatetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level, 'language': self.ctx.Config.LANG}
await self.ctx.Base.db_execute_query(f'''INSERT INTO {self.ctx.Config.TABLE_ADMIN} await self.ctx.Base.db_execute_query(f'''INSERT INTO {self.ctx.Config.TABLE_ADMIN}
(createdOn, user, password, hostname, vhost, level, language) VALUES (createdOn, user, password, hostname, vhost, level, language) VALUES
@@ -345,7 +284,7 @@ class Irc:
async def thread_check_for_new_version(self, fromuser: str) -> None: async def thread_check_for_new_version(self, fromuser: str) -> None:
dnickname = self.ctx.Config.SERVICE_NICKNAME dnickname = self.ctx.Config.SERVICE_NICKNAME
response = self.ctx.Base.create_asynctask( response = await self.ctx.Base.create_asynctask(
self.ctx.Base.create_thread_io( self.ctx.Base.create_thread_io(
self.ctx.Base.check_for_new_version, True self.ctx.Base.check_for_new_version, True
) )
@@ -403,9 +342,6 @@ class Irc:
if u is None: if u is None:
return None return None
c = self.ctx.Client.get_client(u.uid)
"""The Client Object"""
fromuser = u.nickname fromuser = u.nickname
uid = u.uid uid = u.uid
self.ctx.Settings.current_admin = self.ctx.Admin.get_admin(user) # set Current admin if any. self.ctx.Settings.current_admin = self.ctx.Admin.get_admin(user) # set Current admin if any.
@@ -607,10 +543,19 @@ class Irc:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"level: from 1 to 4") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"level: from 1 to 4")
return None return None
if len(cmd) == 4:
user_new_level = int(cmd[3])
if user_new_level > 5:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Maximum authorized level is 5")
return None
if user_new_level == 0:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You must provide a valid level [1 to 4]")
return None
user_to_edit = cmd[1] user_to_edit = cmd[1]
user_password = self.ctx.Utils.hash_password(cmd[2]) user_password = self.ctx.Utils.hash_password(cmd[2])
get_admin = self.ctx.Admin.get_admin(fromuser) get_admin = self.ctx.Admin.get_admin(fromuser)
if get_admin is None: if get_admin is None:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no Admin access") await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no Admin access")
return None return None
@@ -619,15 +564,9 @@ class Irc:
current_uid = uid current_uid = uid
current_user_level = get_admin.level current_user_level = get_admin.level
user_new_level = int(cmd[3]) if len(cmd) == 4 else get_admin.level if current_user == user_to_edit:
if current_user == fromuser:
user_new_level = get_admin.level user_new_level = get_admin.level
if user_new_level > 5:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Maximum authorized level is 5")
return None
# Rechercher le user dans la base de données. # Rechercher le user dans la base de données.
mes_donnees = {'user': user_to_edit} mes_donnees = {'user': user_to_edit}
query = f"SELECT user, level FROM {self.ctx.Config.TABLE_ADMIN} WHERE user = :user" query = f"SELECT user, level FROM {self.ctx.Config.TABLE_ADMIN} WHERE user = :user"
@@ -662,6 +601,10 @@ class Irc:
case 'delaccess': case 'delaccess':
# .delaccess [USER] [CONFIRMUSER] # .delaccess [USER] [CONFIRMUSER]
if len(cmd) < 3:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Right command : /msg {dnickname} delaccess [nickname] [CONFIRMNICKNAME]")
return None
user_to_del = cmd[1] user_to_del = cmd[1]
user_confirmation = cmd[2] user_confirmation = cmd[2]
@@ -670,10 +613,6 @@ class Irc:
self.ctx.Logs.warning(f':{dnickname} NOTICE {fromuser} : Les user ne sont pas les mêmes, tu dois confirmer le user que tu veux supprimer') self.ctx.Logs.warning(f':{dnickname} NOTICE {fromuser} : Les user ne sont pas les mêmes, tu dois confirmer le user que tu veux supprimer')
return None return None
if len(cmd) < 3:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{self.ctx.Config.SERVICE_PREFIX}delaccess [USER] [CONFIRMUSER]")
return None
get_admin = self.ctx.Admin.get_admin(fromuser) get_admin = self.ctx.Admin.get_admin(fromuser)
if get_admin is None: if get_admin is None:
@@ -745,147 +684,6 @@ class Irc:
except Exception as e: except Exception as e:
self.ctx.Logs.error(e) self.ctx.Logs.error(e)
case 'register':
# Syntax. Register PASSWORD EMAIL
try:
if len(cmd) < 3:
await self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f'/msg {self.ctx.Config.SERVICE_NICKNAME} {command.upper()} <PASSWORD> <EMAIL>'
)
return None
password = cmd[1]
email = cmd[2]
if not self.ctx.Base.is_valid_email(email_to_control=email):
await self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg='The email is not valid. You must provide a valid email address (first.name@email.extension)'
)
return None
user_obj = u
if user_obj is None:
self.ctx.Logs.error(f"Nickname ({fromuser}) doesn't exist, it is impossible to register this nickname")
return None
# If the account already exist.
if self.ctx.Client.db_is_account_exist(fromuser):
await self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"Your account already exist, please try to login instead /msg {self.ctx.Config.SERVICE_NICKNAME} IDENTIFY <account> <password>"
)
return None
# If the account doesn't exist then insert into database
data_to_record = {
'createdOn': self.ctx.Utils.get_sdatetime(), 'account': fromuser,
'nickname': user_obj.nickname, 'hostname': user_obj.hostname, 'vhost': user_obj.vhost, 'realname': user_obj.realname, 'email': email,
'password': self.ctx.Utils.hash_password(password=password), 'level': 0
}
insert_to_db = await self.ctx.Base.db_execute_query(f"""
INSERT INTO {self.ctx.Config.TABLE_CLIENT}
(createdOn, account, nickname, hostname, vhost, realname, email, password, level)
VALUES
(:createdOn, :account, :nickname, :hostname, :vhost, :realname, :email, :password, :level)
""", data_to_record)
if insert_to_db.rowcount > 0:
await self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"You have register your nickname successfully"
)
return None
except ValueError as ve:
self.ctx.Logs.error(f"Value Error : {ve}")
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" {self.ctx.Config.SERVICE_PREFIX}{command.upper()} <PASSWORD> <EMAIL>")
case 'identify':
# Identify ACCOUNT PASSWORD
try:
if len(cmd) < 3:
await self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f'/msg {self.ctx.Config.SERVICE_NICKNAME} {command.upper()} <ACCOUNT> <PASSWORD>'
)
return None
account = str(cmd[1]) # account
encrypted_password = self.ctx.Utils.hash_password(cmd[2])
user_obj = u
client_obj = c
if client_obj is not None:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are already logged in")
return None
db_query = f"SELECT account FROM {self.ctx.Config.TABLE_CLIENT} WHERE account = :account AND password = :password"
db_param = {'account': account, 'password': encrypted_password}
exec_query = await self.ctx.Base.db_execute_query(db_query, db_param)
result_query = exec_query.fetchone()
if result_query:
account = result_query[0]
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are now logged in")
client = self.ctx.Definition.MClient(**user_obj.to_dict(), account=account)
self.ctx.Client.insert(client)
await self.Protocol.send_svslogin(user_obj.uid, account)
await self.Protocol.send_svs2mode(nickname=fromuser, user_mode='+r')
else:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Wrong password or account")
return None
except ValueError as ve:
self.ctx.Logs.error(f"Value Error: {ve}")
await self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f'/msg {self.ctx.Config.SERVICE_NICKNAME} {command.upper()} <ACCOUNT> <PASSWORD>'
)
except Exception as err:
self.ctx.Logs.error(f"General Error: {err}")
case 'logout':
try:
# LOGOUT <account>
if len(cmd) < 2:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} <account>")
return None
user_obj = u
if user_obj is None:
self.ctx.Logs.error(f"The User [{fromuser}] is not available in the database")
return None
client_obj = c
if client_obj is None:
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Nothing to logout. please login first")
return None
await self.Protocol.send_svslogout(client_obj)
self.ctx.Client.delete(user_obj.uid)
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You have been logged out successfully")
except ValueError as ve:
self.ctx.Logs.error(f"Value Error: {ve}")
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} <account>")
except Exception as err:
self.ctx.Logs.error(f"General Error: {err}")
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} <account>")
case 'help': case 'help':
# Syntax. !help [module_name] # Syntax. !help [module_name]
module_name = str(cmd[1]) if len(cmd) == 2 else None module_name = str(cmd[1]) if len(cmd) == 2 else None
@@ -951,9 +749,6 @@ class Irc:
for chan_name in self.ctx.Channel.UID_CHANNEL_DB: for chan_name in self.ctx.Channel.UID_CHANNEL_DB:
await self.Protocol.send_set_mode('-l', channel_name=chan_name.name) await self.Protocol.send_set_mode('-l', channel_name=chan_name.name)
for client in self.ctx.Client.CLIENT_DB:
await self.Protocol.send_svslogout(client)
await self.Protocol.send_notice( await self.Protocol.send_notice(
nick_from=dnickname, nick_from=dnickname,
nick_to=fromuser, nick_to=fromuser,
@@ -1087,17 +882,6 @@ class Irc:
) )
return None return None
case 'show_clients':
count_users = len(self.ctx.Client.CLIENT_DB)
await self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Total Connected Clients: {count_users}")
for db_client in self.ctx.Client.CLIENT_DB:
await self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"UID : {db_client.uid} - isWebirc: {db_client.isWebirc} - isWebSocket: {db_client.isWebsocket} - Nickname: {db_client.nickname} - Account: {db_client.account} - Connection: {db_client.connexion_datetime}"
)
return None
case 'show_admins': case 'show_admins':
for db_admin in self.ctx.Admin.UID_ADMIN_DB: for db_admin in self.ctx.Admin.UID_ADMIN_DB:
await self.Protocol.send_notice( await self.Protocol.send_notice(
@@ -1186,4 +970,3 @@ class Irc:
case _: case _:
pass pass

View File

@@ -1,6 +1,6 @@
from logging import Logger from logging import Logger
from core.classes.modules.settings import global_settings from core.classes.modules.settings import global_settings
from core.classes.modules import translation, user, admin, client, channel, reputation, settings, sasl from core.classes.modules import translation, user, admin, channel, reputation, settings, sasl
import core.logs as logs import core.logs as logs
import core.definition as df import core.definition as df
import core.utils as utils import core.utils as utils
@@ -63,8 +63,6 @@ class Loader:
self.Settings.global_user = self.User self.Settings.global_user = self.User
self.Client: client.Client = client.Client(self)
self.Admin: admin.Admin = admin.Admin(self) self.Admin: admin.Admin = admin.Admin(self)
self.Channel: channel.Channel = channel.Channel(self) self.Channel: channel.Channel = channel.Channel(self)

View File

@@ -70,9 +70,7 @@ class Command(IModule):
self.ctx.Irc.Protocol.known_protocol.add(c) self.ctx.Irc.Protocol.known_protocol.add(c)
self.ctx.Commands.build_command(2, self.module_name, 'join', 'Join a channel') self.ctx.Commands.build_command(2, self.module_name, 'join', 'Join a channel')
self.ctx.Commands.build_command(2, self.module_name, 'assign', 'Assign a user to a role or task')
self.ctx.Commands.build_command(2, self.module_name, 'part', 'Leave a channel') self.ctx.Commands.build_command(2, self.module_name, 'part', 'Leave a channel')
self.ctx.Commands.build_command(2, self.module_name, 'unassign', 'Remove a user from a role or task')
self.ctx.Commands.build_command(2, self.module_name, 'owner', 'Give channel ownership to a user') self.ctx.Commands.build_command(2, self.module_name, 'owner', 'Give channel ownership to a user')
self.ctx.Commands.build_command(2, self.module_name, 'deowner', 'Remove channel ownership from a user') self.ctx.Commands.build_command(2, self.module_name, 'deowner', 'Remove channel ownership from a user')
self.ctx.Commands.build_command(2, self.module_name, 'protect', 'Protect a user from being kicked') self.ctx.Commands.build_command(2, self.module_name, 'protect', 'Protect a user from being kicked')
@@ -218,16 +216,12 @@ class Command(IModule):
user_uid = self.ctx.User.clean_uid(cmd[5]) user_uid = self.ctx.User.clean_uid(cmd[5])
userObj: MUser = self.ctx.User.get_user(user_uid) userObj: MUser = self.ctx.User.get_user(user_uid)
channel_name = cmd[4] if self.ctx.Channel.is_valid_channel(cmd[4]) else None channel_name = cmd[4] if self.ctx.Channel.is_valid_channel(cmd[4]) else None
client_obj = self.ctx.Client.get_client(user_uid)
nickname = userObj.nickname if userObj is not None else None nickname = userObj.nickname if userObj is not None else None
if client_obj is not None:
nickname = client_obj.account
if userObj is None: if userObj is None:
return None return None
if 'r' not in userObj.umodes and 'o' not in userObj.umodes and not self.ctx.Client.is_exist(userObj.uid): if 'r' not in userObj.umodes and 'o' not in userObj.umodes:
return None return None
db_data: dict[str, str] = {"nickname": nickname.lower(), "channel": channel_name.lower()} db_data: dict[str, str] = {"nickname": nickname.lower(), "channel": channel_name.lower()}
@@ -416,7 +410,7 @@ class Command(IModule):
except Exception as err: except Exception as err:
self.ctx.Logs.warning(f'Unknown Error: {str(err)}') self.ctx.Logs.warning(f'Unknown Error: {str(err)}')
case 'join' | 'assign': case 'join':
try: try:
await self.mod_utils.set_assign_channel_to_service(self, cmd, fromuser) await self.mod_utils.set_assign_channel_to_service(self, cmd, fromuser)
except IndexError as ie: except IndexError as ie:
@@ -425,7 +419,7 @@ class Command(IModule):
except Exception as err: except Exception as err:
self.ctx.Logs.warning(f'Unknown Error: {str(err)}') self.ctx.Logs.warning(f'Unknown Error: {str(err)}')
case 'part' | 'unassign': case 'part':
try: try:
# Syntax. !part #channel # Syntax. !part #channel
await self.mod_utils.set_unassign_channel_to_service(self, cmd, fromuser) await self.mod_utils.set_unassign_channel_to_service(self, cmd, fromuser)

View File

@@ -115,9 +115,11 @@ async def set_mode_to_all(uplink: 'Command', channel_name: str, action: Literal[
users:str = '' users:str = ''
uids_split = [chan_info.uids[i:i + 6] for i in range(0, len(chan_info.uids), 6)] uids_split = [chan_info.uids[i:i + 6] for i in range(0, len(chan_info.uids), 6)]
await uplink.ctx.Irc.Protocol.send2socket(f":{service_id} MODE {channel_name} {action}{set_mode} {dnickname}") # await uplink.ctx.Irc.Protocol.send2socket(f":{service_id} MODE {channel_name} {action}{set_mode} {dnickname}")
for uid in uids_split: for uid in uids_split:
for i in range(0, len(uid)): for i in range(0, len(uid)):
if uplink.ctx.Utils.clean_uid(uid[i]) == uplink.ctx.Config.SERVICE_ID:
continue
mode += set_mode mode += set_mode
users += f'{uplink.ctx.User.get_nickname(uplink.ctx.Utils.clean_uid(uid[i]))} ' users += f'{uplink.ctx.User.get_nickname(uplink.ctx.Utils.clean_uid(uid[i]))} '
if i == len(uid) - 1: if i == len(uid) - 1:

View File

@@ -288,10 +288,19 @@ class Defender(IModule):
jail_chan = self.ctx.Config.SALON_JAIL # Salon pot de miel jail_chan = self.ctx.Config.SALON_JAIL # Salon pot de miel
jail_chan_mode = self.ctx.Config.SALON_JAIL_MODES # Mode du salon "pot de miel" jail_chan_mode = self.ctx.Config.SALON_JAIL_MODES # Mode du salon "pot de miel"
color_green = self.ctx.Config.COLORS.green
color_red = self.ctx.Config.COLORS.red
color_black = self.ctx.Config.COLORS.black
color_nogc = self.ctx.Config.COLORS.nogc
match command: match command:
case 'show_reputation': case 'show_reputation':
if self.mod_config.reputation == 0:
await self.ctx.Irc.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Reputation system if off!")
return None
if not self.ctx.Reputation.UID_REPUTATION_DB: if not self.ctx.Reputation.UID_REPUTATION_DB:
await self.ctx.Irc.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="No one is suspected") await self.ctx.Irc.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="No one is suspected")
@@ -664,9 +673,6 @@ class Defender(IModule):
# .proxy_scan set psutil_scan on/off --> Active les informations de connexion a la machine locale # .proxy_scan set psutil_scan on/off --> Active les informations de connexion a la machine locale
# .proxy_scan set abuseipdb_scan on/off --> Active le scan via l'api abuseipdb # .proxy_scan set abuseipdb_scan on/off --> Active le scan via l'api abuseipdb
len_cmd = len(cmd) len_cmd = len(cmd)
color_green = self.ctx.Config.COLORS.green
color_red = self.ctx.Config.COLORS.red
color_black = self.ctx.Config.COLORS.black
if len_cmd == 4: if len_cmd == 4:
set_key = str(cmd[1]).lower() set_key = str(cmd[1]).lower()
@@ -943,6 +949,7 @@ class Defender(IModule):
if chan.name not in channel_to_dont_quit: if chan.name not in channel_to_dont_quit:
await self.ctx.Irc.Protocol.send_join_chan(uidornickname=dnickname, channel=chan.name) await self.ctx.Irc.Protocol.send_join_chan(uidornickname=dnickname, channel=chan.name)
await self.ctx.Irc.Protocol.send_priv_msg(dnickname, f"Sentinel mode activated on {channel}", channel=chan.name) await self.ctx.Irc.Protocol.send_priv_msg(dnickname, f"Sentinel mode activated on {channel}", channel=chan.name)
await self.ctx.Irc.Protocol.send_priv_msg(dnickname, f"[ {color_green}SENTINEL{color_nogc} ] Activated by {fromuser}", channel=self.ctx.Config.SERVICE_CHANLOG)
return None return None
if activation == 'off': if activation == 'off':
@@ -957,7 +964,7 @@ class Defender(IModule):
await self.ctx.Irc.Protocol.send_priv_msg(dnickname, f"Sentinel mode deactivated on {channel}", channel=chan.name) await self.ctx.Irc.Protocol.send_priv_msg(dnickname, f"Sentinel mode deactivated on {channel}", channel=chan.name)
await self.join_saved_channels() await self.join_saved_channels()
await self.ctx.Irc.Protocol.send_priv_msg(dnickname, f"[ {color_red}SENTINEL{color_nogc} ] Deactivated by {fromuser}", channel=self.ctx.Config.SERVICE_CHANLOG)
return None return None
case _: case _: