mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 19:24:23 +00:00
Remove all related to nickserv scope
This commit is contained in:
@@ -3,7 +3,7 @@ from typing import Optional, TYPE_CHECKING
|
||||
from core.classes.protocols.command_handler import CommandHandler
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.definition import MClient, MSasl, MUser, MChannel
|
||||
from core.definition import MSasl, MUser, MChannel
|
||||
from core.loader import Loader
|
||||
|
||||
class IProtocol(ABC):
|
||||
@@ -224,11 +224,8 @@ class IProtocol(ABC):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def send_svslogout(self, client_obj: 'MClient') -> None:
|
||||
async def send_svslogout(self) -> None:
|
||||
"""Logout a client from his account
|
||||
|
||||
Args:
|
||||
client_obj (MClient): The Client UID
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
|
||||
@@ -1,226 +0,0 @@
|
||||
from re import sub
|
||||
from typing import Any, Optional, Union, TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
from core.definition import MClient
|
||||
|
||||
class Client:
|
||||
|
||||
CLIENT_DB: list['MClient'] = []
|
||||
|
||||
def __init__(self, loader: 'Loader'):
|
||||
"""
|
||||
|
||||
Args:
|
||||
loader (Loader): The Loader instance.
|
||||
"""
|
||||
self._ctx = loader
|
||||
|
||||
def insert(self, new_client: 'MClient') -> bool:
|
||||
"""Insert a new User object
|
||||
|
||||
Args:
|
||||
new_client (MClient): New Client object
|
||||
|
||||
Returns:
|
||||
bool: True if inserted
|
||||
"""
|
||||
|
||||
client_obj = self.get_client(new_client.uid)
|
||||
|
||||
if not client_obj is None:
|
||||
# User already created return False
|
||||
return False
|
||||
|
||||
self.CLIENT_DB.append(new_client)
|
||||
|
||||
return True
|
||||
|
||||
def update_nickname(self, uid: str, new_nickname: str) -> bool:
|
||||
"""Update the nickname starting from the UID
|
||||
|
||||
Args:
|
||||
uid (str): UID of the user
|
||||
new_nickname (str): New nickname
|
||||
|
||||
Returns:
|
||||
bool: True if updated
|
||||
"""
|
||||
user_obj = self.get_client(uidornickname=uid)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
user_obj.nickname = new_nickname
|
||||
|
||||
return True
|
||||
|
||||
def update_mode(self, uidornickname: str, modes: str) -> bool:
|
||||
"""Updating user mode
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or Nickname of the user
|
||||
modes (str): new modes to update
|
||||
|
||||
Returns:
|
||||
bool: True if user mode has been updaed
|
||||
"""
|
||||
response = True
|
||||
user_obj = self.get_client(uidornickname=uidornickname)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
action = modes[0]
|
||||
new_modes = modes[1:]
|
||||
|
||||
existing_umodes = user_obj.umodes
|
||||
umodes = user_obj.umodes
|
||||
|
||||
if action == '+':
|
||||
|
||||
for nm in new_modes:
|
||||
if nm not in existing_umodes:
|
||||
umodes += nm
|
||||
|
||||
elif action == '-':
|
||||
for nm in new_modes:
|
||||
if nm in existing_umodes:
|
||||
umodes = umodes.replace(nm, '')
|
||||
else:
|
||||
return False
|
||||
|
||||
liste_umodes = list(umodes)
|
||||
final_umodes_liste = [x for x in self._ctx.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes]
|
||||
final_umodes = ''.join(final_umodes_liste)
|
||||
|
||||
user_obj.umodes = f"+{final_umodes}"
|
||||
|
||||
return response
|
||||
|
||||
def delete(self, uid: str) -> bool:
|
||||
"""Delete the User starting from the UID
|
||||
|
||||
Args:
|
||||
uid (str): UID of the user
|
||||
|
||||
Returns:
|
||||
bool: True if deleted
|
||||
"""
|
||||
|
||||
user_obj = self.get_client(uidornickname=uid)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
self.CLIENT_DB.remove(user_obj)
|
||||
|
||||
return True
|
||||
|
||||
def get_client(self, uidornickname: str) -> Optional['MClient']:
|
||||
"""Get The Client Object model
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname
|
||||
|
||||
Returns:
|
||||
UserModel|None: The UserModel Object | None
|
||||
"""
|
||||
for record in self.CLIENT_DB:
|
||||
if record.uid == uidornickname:
|
||||
return record
|
||||
elif record.nickname == uidornickname:
|
||||
return record
|
||||
|
||||
return None
|
||||
|
||||
def get_uid(self, uidornickname:str) -> Optional[str]:
|
||||
"""Get the UID of the user starting from the UID or the Nickname
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname
|
||||
|
||||
Returns:
|
||||
str|None: Return the UID
|
||||
"""
|
||||
|
||||
client_obj = self.get_client(uidornickname=uidornickname)
|
||||
|
||||
if client_obj is None:
|
||||
return None
|
||||
|
||||
return client_obj.uid
|
||||
|
||||
def get_nickname(self, uidornickname:str) -> Union[str, None]:
|
||||
"""Get the Nickname starting from UID or the nickname
|
||||
|
||||
Args:
|
||||
uidornickname (str): UID or Nickname of the user
|
||||
|
||||
Returns:
|
||||
str|None: the nickname
|
||||
"""
|
||||
client_obj = self.get_client(uidornickname=uidornickname)
|
||||
|
||||
if client_obj is None:
|
||||
return None
|
||||
|
||||
return client_obj.nickname
|
||||
|
||||
def is_exist(self, uidornickname: str) -> bool:
|
||||
"""Check if the UID or the nickname exist in the USER DB
|
||||
|
||||
Args:
|
||||
uidornickname (str): The UID or the NICKNAME
|
||||
|
||||
Returns:
|
||||
bool: True if exist
|
||||
"""
|
||||
user_obj = self.get_client(uidornickname=uidornickname)
|
||||
|
||||
if user_obj is None:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def db_is_account_exist(self, account: str) -> bool:
|
||||
"""Check if the account exist in the database
|
||||
|
||||
Args:
|
||||
account (str): The account to check
|
||||
|
||||
Returns:
|
||||
bool: True if exist
|
||||
"""
|
||||
|
||||
table_client = self._ctx.Base.Config.TABLE_CLIENT
|
||||
account_to_check = {'account': account.lower()}
|
||||
account_to_check_query = await self._ctx.Base.db_execute_query(f"""
|
||||
SELECT id FROM {table_client} WHERE LOWER(account) = :account
|
||||
""", account_to_check)
|
||||
|
||||
account_to_check_result = account_to_check_query.fetchone()
|
||||
if account_to_check_result:
|
||||
self._ctx.Logs.error(f"Account ({account}) already exist")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def clean_uid(self, uid: str) -> Union[str, None]:
|
||||
"""Clean UID by removing @ / % / + / ~ / * / :
|
||||
|
||||
Args:
|
||||
uid (str): The UID to clean
|
||||
|
||||
Returns:
|
||||
str: Clean UID without any sign
|
||||
"""
|
||||
|
||||
pattern = fr'[:|@|%|\+|~|\*]*'
|
||||
parsed_uid = sub(pattern, '', uid)
|
||||
|
||||
if not parsed_uid:
|
||||
return None
|
||||
|
||||
return parsed_uid
|
||||
@@ -4,7 +4,7 @@ import sys
|
||||
import threading
|
||||
from typing import TYPE_CHECKING
|
||||
import core.module as module_mod
|
||||
from core.classes.modules import user, admin, client, channel, reputation, sasl
|
||||
from core.classes.modules import user, admin, channel, reputation, sasl
|
||||
from core.utils import tr
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -20,7 +20,6 @@ REHASH_MODULES = [
|
||||
'core.classes.modules.commands',
|
||||
'core.classes.modules.user',
|
||||
'core.classes.modules.admin',
|
||||
'core.classes.modules.client',
|
||||
'core.classes.modules.channel',
|
||||
'core.classes.modules.reputation',
|
||||
'core.classes.modules.sasl',
|
||||
@@ -63,7 +62,6 @@ async def restart_service(uplink: 'Loader', reason: str = "Restarting with no re
|
||||
uplink.ModuleUtils.model_clear() # Clear loaded modules.
|
||||
uplink.User.UID_DB.clear() # Clear User Object
|
||||
uplink.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
|
||||
uplink.Client.CLIENT_DB.clear() # Clear Client object
|
||||
uplink.Irc.Protocol.Handler.DB_IRCDCOMMS.clear()
|
||||
|
||||
# Reload Service modules
|
||||
@@ -77,7 +75,6 @@ async def rehash_service(uplink: 'Loader', nickname: str) -> None:
|
||||
need_a_restart = ["SERVEUR_ID"]
|
||||
uplink.Settings.set_cache('commands', uplink.Commands.DB_COMMANDS)
|
||||
uplink.Settings.set_cache('users', uplink.User.UID_DB)
|
||||
uplink.Settings.set_cache('clients', uplink.Client.CLIENT_DB)
|
||||
uplink.Settings.set_cache('admins', uplink.Admin.UID_ADMIN_DB)
|
||||
uplink.Settings.set_cache('reputations', uplink.Reputation.UID_REPUTATION_DB)
|
||||
uplink.Settings.set_cache('channels', uplink.Channel.UID_CHANNEL_DB)
|
||||
@@ -132,7 +129,6 @@ async def rehash_service(uplink: 'Loader', nickname: str) -> None:
|
||||
uplink.Base = uplink.BaseModule.Base(uplink)
|
||||
|
||||
uplink.User = user.User(uplink)
|
||||
uplink.Client = client.Client(uplink)
|
||||
uplink.Admin = admin.Admin(uplink)
|
||||
uplink.Channel = channel.Channel(uplink)
|
||||
uplink.Reputation = reputation.Reputation(uplink)
|
||||
@@ -141,7 +137,6 @@ async def rehash_service(uplink: 'Loader', nickname: str) -> None:
|
||||
|
||||
# Backup data
|
||||
uplink.User.UID_DB = uplink.Settings.get_cache('users')
|
||||
uplink.Client.CLIENT_DB = uplink.Settings.get_cache('clients')
|
||||
uplink.Admin.UID_ADMIN_DB = uplink.Settings.get_cache('admins')
|
||||
uplink.Channel.UID_CHANNEL_DB = uplink.Settings.get_cache('channels')
|
||||
uplink.Reputation.UID_REPUTATION_DB = uplink.Settings.get_cache('reputations')
|
||||
|
||||
@@ -8,7 +8,7 @@ from core.classes.interfaces.iprotocol import IProtocol
|
||||
from core.utils import tr
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.definition import MSasl, MClient, MUser, MChannel
|
||||
from core.definition import MSasl, MUser, MChannel
|
||||
|
||||
class Inspircd(IProtocol):
|
||||
|
||||
@@ -562,7 +562,6 @@ class Inspircd(IProtocol):
|
||||
uid = str(scopy[0]).replace(':','')
|
||||
newnickname = scopy[2]
|
||||
self._ctx.User.update_nickname(uid, newnickname)
|
||||
self._ctx.Client.update_nickname(uid, newnickname)
|
||||
self._ctx.Admin.update_nickname(uid, newnickname)
|
||||
|
||||
return None
|
||||
@@ -1334,15 +1333,12 @@ class Inspircd(IProtocol):
|
||||
client_uid (str): Client UID
|
||||
user_account (str): The account of the user
|
||||
"""
|
||||
...
|
||||
pass
|
||||
|
||||
async def send_svslogout(self, client_obj: 'MClient') -> None:
|
||||
async def send_svslogout(self) -> None:
|
||||
"""Logout a client from his account
|
||||
|
||||
Args:
|
||||
client_obj (MClient): The Client Object Model
|
||||
"""
|
||||
...
|
||||
pass
|
||||
|
||||
async def send_svsmode(self, nickname: str, user_mode: str) -> None:
|
||||
"""_summary_
|
||||
|
||||
@@ -7,7 +7,7 @@ from core.classes.interfaces.iprotocol import IProtocol
|
||||
from core.utils import is_coroutinefunction, tr
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.definition import MClient, MSasl, MUser, MChannel
|
||||
from core.definition import MSasl, MUser, MChannel
|
||||
|
||||
class Unrealircd6(IProtocol):
|
||||
|
||||
@@ -469,18 +469,13 @@ class Unrealircd6(IProtocol):
|
||||
except Exception as err:
|
||||
self._ctx.Logs.error(f'General Error: {err}')
|
||||
|
||||
async def send_svslogout(self, client_obj: 'MClient') -> None:
|
||||
async def send_svslogout(self) -> None:
|
||||
"""Logout a client from his account
|
||||
|
||||
Args:
|
||||
client_obj (MClient): The Client object
|
||||
"""
|
||||
try:
|
||||
c_uid = client_obj.uid
|
||||
c_nickname = client_obj.nickname
|
||||
await self.send2socket(f":{self._ctx.Config.SERVEUR_LINK} SVSLOGIN {self._ctx.Settings.MAIN_SERVER_HOSTNAME} {c_uid} 0")
|
||||
await self.send_svs2mode(c_nickname, '-r')
|
||||
|
||||
# await self.send2socket(f":{self._ctx.Config.SERVEUR_LINK} SVSLOGIN {self._ctx.Settings.MAIN_SERVER_HOSTNAME} {c_uid} 0")
|
||||
# await self.send_svs2mode(c_nickname, '-r')
|
||||
pass
|
||||
except Exception as err:
|
||||
self._ctx.Logs.error(f'General Error: {err}')
|
||||
|
||||
@@ -795,7 +790,6 @@ class Unrealircd6(IProtocol):
|
||||
|
||||
self._ctx.Channel.delete_user_from_all_channel(uid_who_quit)
|
||||
self._ctx.User.delete(uid_who_quit)
|
||||
self._ctx.Client.delete(uid_who_quit)
|
||||
self._ctx.Reputation.delete(uid_who_quit)
|
||||
self._ctx.Admin.delete(uid_who_quit)
|
||||
|
||||
@@ -879,7 +873,6 @@ class Unrealircd6(IProtocol):
|
||||
uid = str(server_msg[1]).lstrip(':')
|
||||
newnickname = server_msg[3]
|
||||
self._ctx.User.update_nickname(uid, newnickname)
|
||||
self._ctx.Client.update_nickname(uid, newnickname)
|
||||
self._ctx.Admin.update_nickname(uid, newnickname)
|
||||
self._ctx.Reputation.update(uid, newnickname)
|
||||
|
||||
|
||||
@@ -28,26 +28,6 @@ class MainModel:
|
||||
"""Return a list of attributes name"""
|
||||
return [f.name for f in fields(self)]
|
||||
|
||||
@dataclass
|
||||
class MClient(MainModel):
|
||||
"""Model Client for registred nickname"""
|
||||
uid: str = None
|
||||
account: str = None
|
||||
nickname: str = None
|
||||
username: str = None
|
||||
realname: str = None
|
||||
hostname: str = None
|
||||
umodes: str = None
|
||||
vhost: str = None
|
||||
fingerprint: str = None
|
||||
tls_cipher: str = None
|
||||
isWebirc: bool = False
|
||||
isWebsocket: bool = False
|
||||
remote_ip: str = None
|
||||
score_connexion: int = 0
|
||||
geoip: str = None
|
||||
connexion_datetime: datetime = field(default=datetime.now())
|
||||
|
||||
@dataclass
|
||||
class MUser(MainModel):
|
||||
"""Model User"""
|
||||
@@ -283,9 +263,6 @@ class MConfig(MainModel):
|
||||
LOGGING_NAME: str = "defender"
|
||||
"""The name of the Logging instance"""
|
||||
|
||||
TABLE_CLIENT: str = "core_client"
|
||||
"""Core Client table"""
|
||||
|
||||
TABLE_ADMIN: str = "core_admin"
|
||||
"""Core Admin table"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user