Merge pull request #92 from adator85/v6.3.0

V6.3.0
This commit is contained in:
adator
2025-10-18 20:54:35 +02:00
committed by GitHub
34 changed files with 2186 additions and 588 deletions

2
.gitignore vendored
View File

@@ -1,10 +1,12 @@
.pyenv/
.vscode/
.venv/
.idea/
db/
logs/
__pycache__/
configuration.json
configuration_inspircd.json
configuration_unreal6.json
*.log
test.py

View File

@@ -600,7 +600,9 @@ class Base:
def db_patch(self, table_name: str, column_name: str, column_type: str) -> bool:
if not self.db_is_column_exist(table_name, column_name):
patch = f"ALTER TABLE {self.Config.TABLE_ADMIN} ADD COLUMN {column_name} {column_type}"
update_row = f"UPDATE {self.Config.TABLE_ADMIN} SET language = 'EN' WHERE language is null"
self.db_execute_query(patch)
self.db_execute_query(update_row)
self.logs.debug(f"The patch has been applied")
self.logs.debug(f"Table name: {table_name}, Column name: {column_name}, Column type: {column_type}")
return True
@@ -681,14 +683,13 @@ class Base:
return False
def decode_ip(self, ip_b64encoded: str) -> Optional[str]:
binary_ip = b64decode(ip_b64encoded)
try:
binary_ip = b64decode(ip_b64encoded)
decoded_ip = ipaddress.ip_address(binary_ip)
return decoded_ip.exploded
except ValueError as ve:
self.logs.critical(f'This remote ip is not valid : {ve}')
self.logs.critical(f'This remote ip ({ip_b64encoded}) is not valid : {ve}')
return None
def encode_ip(self, remote_ip_address: str) -> Optional[str]:

View File

@@ -9,6 +9,11 @@ class Admin:
UID_ADMIN_DB: list[MAdmin] = []
def __init__(self, loader: 'Loader') -> None:
"""
Args:
loader (Loader): The Loader Instance.
"""
self.Logs = loader.Logs
self.Base = loader.Base
self.Setting = loader.Settings
@@ -201,3 +206,22 @@ class Admin:
return True
return False
def db_is_admin_exist(self, admin_nickname: str) -> bool:
"""Verify if the admin exist in the database!
Args:
admin_nickname (str): The nickname admin to check.
Returns:
bool: True if the admin exist otherwise False.
"""
mes_donnees = {'admin': admin_nickname}
query_search_user = f"SELECT id FROM {self.Config.TABLE_ADMIN} WHERE user = :admin"
r = self.Base.db_execute_query(query_search_user, mes_donnees)
exist_user = r.fetchone()
if exist_user:
return True
else:
return False

View File

@@ -11,14 +11,16 @@ class Channel:
"""List that contains all the Channels objects (ChannelModel)
"""
def __init__(self, loader: 'Loader') -> None:
def __init__(self, loader: 'Loader'):
"""
Args:
loader (Loader): The Loader Instance
"""
self.Logs = loader.Logs
self.Base = loader.Base
self.Utils = loader.Utils
return None
def insert(self, new_channel: 'MChannel') -> bool:
"""This method will insert a new channel and if the channel exist it will update the user list (uids)
@@ -110,6 +112,7 @@ class Channel:
return result
except ValueError as ve:
self.Logs.error(f'{ve}')
return False
def delete_user_from_all_channel(self, uid:str) -> bool:
"""Delete a client from all channels
@@ -134,6 +137,7 @@ class Channel:
return result
except ValueError as ve:
self.Logs.error(f'{ve}')
return False
def add_user_to_a_channel(self, channel_name: str, uid: str) -> bool:
"""Add a client to a channel
@@ -226,16 +230,18 @@ class Channel:
return False
pattern = fr'^#'
isChannel = findall(pattern, channel_to_check)
is_channel = findall(pattern, channel_to_check)
if not isChannel:
if not is_channel:
return False
else:
return True
except TypeError as te:
self.Logs.error(f'TypeError: [{channel_to_check}] - {te}')
return False
except Exception as err:
self.Logs.error(f'Error Not defined: {err}')
return False
def db_query_channel(self, action: Literal['add','del'], module_name: str, channel_name: str) -> bool:
"""You can add a channel or delete a channel.
@@ -282,8 +288,7 @@ class Channel:
else:
return False
case _:
return False
except Exception as err:
self.Logs.error(err)
return False

View File

@@ -10,7 +10,11 @@ class Client:
CLIENT_DB: list['MClient'] = []
def __init__(self, loader: 'Loader'):
"""
Args:
loader (Loader): The Loader instance.
"""
self.Logs = loader.Logs
self.Base = loader.Base
@@ -34,12 +38,12 @@ class Client:
return True
def update_nickname(self, uid: str, newNickname: str) -> bool:
def update_nickname(self, uid: str, new_nickname: str) -> bool:
"""Update the nickname starting from the UID
Args:
uid (str): UID of the user
newNickname (str): New nickname
new_nickname (str): New nickname
Returns:
bool: True if updated
@@ -49,7 +53,7 @@ class Client:
if user_obj is None:
return False
user_obj.nickname = newNickname
user_obj.nickname = new_nickname
return True
@@ -181,7 +185,7 @@ class Client:
return client_obj.to_dict()
def is_exist(self, uidornikname: str) -> bool:
def is_exist(self, uidornickname: str) -> bool:
"""Check if the UID or the nickname exist in the USER DB
Args:
@@ -190,7 +194,7 @@ class Client:
Returns:
bool: True if exist
"""
user_obj = self.get_Client(uidornickname=uidornikname)
user_obj = self.get_Client(uidornickname=uidornickname)
if user_obj is None:
return False
@@ -231,9 +235,9 @@ class Client:
"""
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = sub(pattern, '', uid)
parsed_uid = sub(pattern, '', uid)
if not parsed_UID:
if not parsed_uid:
return None
return parsed_UID
return parsed_uid

View File

@@ -9,6 +9,10 @@ class Command:
DB_COMMANDS: list['MCommand'] = []
def __init__(self, loader: 'Loader'):
"""
Args:
loader (Loader): The Loader instance.
"""
self.Loader = loader
self.Base = loader.Base
self.Logs = loader.Logs

View File

@@ -1,19 +0,0 @@
from typing import Literal, TYPE_CHECKING
from .protocols.unreal6 import Unrealircd6
from .protocols.inspircd import Inspircd
if TYPE_CHECKING:
from core.irc import Irc
class Protocol:
def __init__(self, protocol: Literal['unreal6','inspircd'], ircInstance: 'Irc'):
self.Protocol = None
match protocol:
case 'unreal6':
self.Protocol: Unrealircd6 = Unrealircd6(ircInstance)
case 'inspircd':
self.Protocol: Inspircd = Inspircd(ircInstance)
case _:
self.Protocol: Unrealircd6 = Unrealircd6(ircInstance)

View File

@@ -0,0 +1,54 @@
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from core.definition import MIrcdCommand
from core.loader import Loader
class CommandHandler:
DB_IRCDCOMMS: list['MIrcdCommand'] = []
DB_SUBSCRIBE: list = []
def __init__(self, loader: 'Loader'):
"""Init method
Args:
loader (Loader): The loader Object
"""
self.__Logs = loader.Logs
def register(self, ircd_command_model: 'MIrcdCommand') -> None:
"""Register a new command in the Handler
Args:
ircd_command_model (MIrcdCommand): The IRCD Command Object
"""
ircd_command = self.get_registred_ircd_command(ircd_command_model.command_name)
if ircd_command is None:
self.__Logs.debug(f'[IRCD COMMAND HANDLER] New IRCD command ({ircd_command_model.command_name}) added to the handler.')
self.DB_IRCDCOMMS.append(ircd_command_model)
return None
else:
self.__Logs.debug(f'[IRCD COMMAND HANDLER] This IRCD command ({ircd_command.command_name}) already exist in the handler.')
return None
def get_registred_ircd_command(self, command_name: str) -> Optional['MIrcdCommand']:
"""Get the registred IRCD command model
Returns:
MIrcdCommand: The IRCD Command object
"""
com = command_name.upper()
for ircd_com in self.DB_IRCDCOMMS:
if com == ircd_com.command_name.upper():
return ircd_com
return None
def get_ircd_commands(self) -> list['MIrcdCommand']:
"""Get the list of IRCD Commands
Returns:
list[MIrcdCommand]: a list of all registred commands
"""
return self.DB_IRCDCOMMS.copy()

View File

@@ -0,0 +1,33 @@
from typing import TYPE_CHECKING, Optional
from .unreal6 import Unrealircd6
from .inspircd import Inspircd
from .interface import IProtocol
if TYPE_CHECKING:
from core.irc import Irc
class ProtocolFactorty:
def __init__(self, uplink: 'Irc'):
"""ProtocolFactory init.
Args:
uplink (Irc): The Irc object
"""
self.__Config = uplink.Config
self.__uplink = uplink
def get(self) -> Optional[IProtocol]:
protocol = self.__Config.SERVEUR_PROTOCOL
match protocol:
case 'unreal6':
self.__uplink.Logs.debug(f"[PROTOCOL] {protocol} has been loaded")
return Unrealircd6(self.__uplink)
case 'inspircd':
self.__uplink.Logs.debug(f"[PROTOCOL] {protocol} has been loaded")
return Inspircd(self.__uplink)
case _:
self.__uplink.Logs.critical(f"[PROTOCOL ERROR] This protocol name ({protocol} is not valid!)")
raise Exception("Unknown protocol!")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,538 @@
from abc import ABC, abstractmethod
from typing import Optional, TYPE_CHECKING
from core.classes.protocols.command_handler import CommandHandler
if TYPE_CHECKING:
from core.definition import MClient, MSasl
class IProtocol(ABC):
Handler: Optional[CommandHandler] = None
@abstractmethod
def get_ircd_protocol_poisition(self, cmd: list[str], log: bool = False) -> tuple[int, Optional[str]]:
"""Get the position of known commands
Args:
cmd (list[str]): The server response
log (bool): If true it will log in the logger
Returns:
tuple[int, Optional[str]]: The position and the command.
"""
@abstractmethod
def register_command(self):
"""Register all commands that you need to handle
"""
@abstractmethod
def send2socket(self, message: str, print_log: bool = True) -> None:
"""Envoit les commandes à envoyer au serveur.
Args:
message (str): contient la commande à envoyer au serveur.
print_log (bool): If True then print logs
"""
@abstractmethod
def send_priv_msg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None):
"""Sending PRIVMSG to a channel or to a nickname by batches
could be either channel or nickname not both together
Args:
msg (str): The message to send
nick_from (str): The sender nickname
channel (str, optional): The receiver channel. Defaults to None.
nick_to (str, optional): The reciever nickname. Defaults to None.
"""
@abstractmethod
def send_notice(self, nick_from: str, nick_to: str, msg: str) -> None:
"""Sending NOTICE by batches
Args:
msg (str): The message to send to the server
nick_from (str): The sender Nickname
nick_to (str): The reciever nickname
"""
@abstractmethod
def send_link(self) -> None:
"""Créer le link et envoyer les informations nécessaires pour la
connexion au serveur.
"""
@abstractmethod
def send_gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
"""Send a gline command to the server
Args:
nickname (str): The nickname of the client.
hostname (str): The hostname of the client.
set_by (str): The nickname who send the gline
expire_timestamp (int): Expire timestamp
set_at_timestamp (int): Set at timestamp
reason (str): The reason of the gline.
"""
@abstractmethod
def send_set_nick(self, newnickname: str) -> None:
"""Change nickname of the server
\n This method will also update the User object
Args:
newnickname (str): New nickname of the server
"""
@abstractmethod
def send_set_mode(self, modes: str, *, nickname: Optional[str] = None, channel_name: Optional[str] = None, params: Optional[str] = None) -> None:
"""Set a mode to channel or to a nickname or for a user in a channel
Args:
modes (str): The selected mode
nickname (Optional[str]): The nickname
channel_name (Optional[str]): The channel name
params (Optional[str]): Parameters like password.
"""
@abstractmethod
def send_squit(self, server_id: str, server_link: str, reason: str) -> None:
"""_summary_
Args:
server_id (str): _description_
server_link (str): _description_
reason (str): _description_
"""
@abstractmethod
def send_ungline(self, nickname:str, hostname: str) -> None:
"""_summary_
Args:
nickname (str): _description_
hostname (str): _description_
"""
@abstractmethod
def send_kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
"""_summary_
Args:
nickname (str): _description_
hostname (str): _description_
set_by (str): _description_
expire_timestamp (int): _description_
set_at_timestamp (int): _description_
reason (str): _description_
"""
@abstractmethod
def send_unkline(self, nickname:str, hostname: str) -> None:
"""_summary_
Args:
nickname (str): _description_
hostname (str): _description_
"""
@abstractmethod
def send_sjoin(self, channel: str) -> None:
"""Server will join a channel with pre defined umodes
Args:
channel (str): Channel to join
"""
@abstractmethod
def send_sapart(self, nick_to_sapart: str, channel_name: str) -> None:
"""_summary_
Args:
nick_to_sapart (str): _description_
channel_name (str): _description_
"""
@abstractmethod
def send_sajoin(self, nick_to_sajoin: str, channel_name: str) -> None:
"""_summary_
Args:
nick_to_sajoin (str): _description_
channel_name (str): _description_
"""
@abstractmethod
def send_svspart(self, nick_to_part: str, channels: list[str], reason: str) -> None:
"""_summary_
Args:
nick_to_part (str): _description_
channels (list[str]): _description_
reason (str): _description_
"""
@abstractmethod
def send_svsjoin(self, nick_to_part: str, channels: list[str], keys: list[str]) -> None:
"""_summary_
Args:
nick_to_part (str): _description_
channels (list[str]): _description_
keys (list[str]): _description_
"""
@abstractmethod
def send_svsmode(self, nickname: str, user_mode: str) -> None:
"""_summary_
Args:
nickname (str): _description_
user_mode (str): _description_
"""
@abstractmethod
def send_svs2mode(self, nickname: str, user_mode: str) -> None:
"""_summary_
Args:
nickname (str): _description_
user_mode (str): _description_
"""
@abstractmethod
def send_svslogin(self, client_uid: str, user_account: str) -> None:
"""Log a client into his account.
Args:
client_uid (str): Client UID
user_account (str): The account of the user
"""
@abstractmethod
def send_svslogout(self, client_obj: 'MClient') -> None:
"""Logout a client from his account
Args:
client_obj (MClient): The Client UID
"""
@abstractmethod
def send_quit(self, uid: str, reason: str, print_log: bool = True) -> None:
"""Send quit message
- Delete uid from User object
- Delete uid from Reputation object
Args:
uid (str): The UID or the Nickname
reason (str): The reason for the quit
print_log (bool): If True then print logs
"""
@abstractmethod
def send_uid(self, nickname:str, username: str, hostname: str, uid:str, umodes: str, vhost: str, remote_ip: str, realname: str, print_log: bool = True) -> None:
"""Send UID to the server
- Insert User to User Object
Args:
nickname (str): Nickname of the client
username (str): Username of the client
hostname (str): Hostname of the client you want to create
uid (str): UID of the client you want to create
umodes (str): umodes of the client you want to create
vhost (str): vhost of the client you want to create
remote_ip (str): remote_ip of the client you want to create
realname (str): realname of the client you want to create
print_log (bool, optional): print logs if true. Defaults to True.
"""
@abstractmethod
def send_join_chan(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
"""Joining a channel
Args:
uidornickname (str): UID or nickname that need to join
channel (str): channel to join
password (str, optional): The password of the channel to join. Default to None
print_log (bool, optional): Write logs. Defaults to True.
"""
@abstractmethod
def send_part_chan(self, uidornickname:str, channel: str, print_log: bool = True) -> None:
"""Part from a channel
Args:
uidornickname (str): UID or nickname that need to join
channel (str): channel to join
print_log (bool, optional): Write logs. Defaults to True.
"""
@abstractmethod
def send_mode_chan(self, channel_name: str, channel_mode: str) -> None:
"""_summary_
Args:
channel_name (str): _description_
channel_mode (str): _description_
"""
@abstractmethod
def send_raw(self, raw_command: str) -> None:
"""Send raw message to the server
Args:
raw_command (str): The raw command you want to send.
"""
# ------------------------------------------------------------------------
# COMMON IRC PARSER
# ------------------------------------------------------------------------
@abstractmethod
def parse_uid(self, server_msg: list[str]) -> dict[str, str]:
"""Parse UID and return dictionary.
Args:
server_msg (list[str]): The UID IRCD message
Returns:
dict[str, str]: The response as dictionary.
"""
@abstractmethod
def parse_quit(self, server_msg: list[str]) -> dict[str, str]:
"""Parse quit and return dictionary.
>>> [':97KAAAAAB', 'QUIT', ':Quit:', 'this', 'is', 'my', 'reason', 'to', 'quit']
Args:
server_msg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
"""
@abstractmethod
def parse_nick(self, server_msg: list[str]) -> dict[str, str]:
"""Parse nick changes and return dictionary.
>>> [':97KAAAAAC', 'NICK', 'testinspir', '1757360740']
Args:
server_msg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
"""
@abstractmethod
def parse_privmsg(self, server_msg: list[str]) -> dict[str, str]:
"""Parse PRIVMSG message.
>>> [':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message']
Args:
server_msg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
```python
response = {
"uid": '97KAAAAAE',
"channel": '#welcome',
"message": 'This is my public message'
}
```
"""
# ------------------------------------------------------------------------
# EVENT HANDLER
# ------------------------------------------------------------------------
@abstractmethod
def on_svs2mode(self, server_msg: list[str]) -> None:
"""Handle svs2mode coming from a server
>>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_mode(self, server_msg: list[str]) -> None:
"""Handle mode coming from a server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_umode2(self, server_msg: list[str]) -> None:
"""Handle umode2 coming from a server
>>> [':adator_', 'UMODE2', '-i']
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_quit(self, server_msg: list[str]) -> None:
"""Handle quit coming from a server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_squit(self, server_msg: list[str]) -> None:
"""Handle squit coming from a server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_protoctl(self, server_msg: list[str]) -> None:
"""Handle protoctl coming from a server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_nick(self, server_msg: list[str]) -> None:
"""Handle nick coming from a server
new nickname
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_sjoin(self, server_msg: list[str]) -> None:
"""Handle sjoin coming from a server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_part(self, server_msg: list[str]) -> None:
"""Handle part coming from a server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_eos(self, server_msg: list[str]) -> None:
"""Handle EOS coming from a server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_reputation(self, server_msg: list[str]) -> None:
"""Handle REPUTATION coming from a server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_uid(self, server_msg: list[str]) -> None:
"""Handle uid message coming from the server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_privmsg(self, server_msg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_server_ping(self, server_msg: list[str]) -> None:
"""Send a PONG message to the server
Args:
server_msg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_server(self, server_msg: list[str]) -> None:
"""_summary_
Args:
server_msg (list[str]): _description_
"""
@abstractmethod
def on_version(self, server_msg: list[str]) -> None:
"""Sending Server Version to the server
Args:
server_msg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_time(self, server_msg: list[str]) -> None:
"""Sending TIME answer to a requestor
Args:
server_msg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_ping(self, server_msg: list[str]) -> None:
"""Sending a PING answer to requestor
Args:
server_msg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_version_msg(self, server_msg: list[str]) -> None:
"""Handle version coming from the server
\n ex. /version Defender
Args:
server_msg (list[str]): Original message from the server
"""
@abstractmethod
def on_smod(self, server_msg: list[str]) -> None:
"""Handle SMOD message coming from the server
Args:
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_sasl(self, server_msg: list[str]) -> Optional['MSasl']:
"""Handle SASL coming from a server
Args:
server_msg (list[str]): Original server message
Returns:
"""
@abstractmethod
def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> bool:
"""Finalize sasl authentication
Args:
sasl_model (MSasl): The sasl dataclass model
Returns:
bool: True if success
"""
@abstractmethod
def on_md(self, server_msg: list[str]) -> None:
"""Handle MD responses
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']
Args:
server_msg (list[str]): The server reply
"""

View File

@@ -1,9 +1,11 @@
from base64 import b64decode
from re import match, findall, search
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Any, Optional
from ssl import SSLEOFError, SSLError
from core.classes.protocols.command_handler import CommandHandler
from core.classes.protocols.interface import IProtocol
from core.utils import tr
if TYPE_CHECKING:
@@ -11,7 +13,7 @@ if TYPE_CHECKING:
from core.classes.sasl import Sasl
from core.definition import MClient, MSasl
class Unrealircd6:
class Unrealircd6(IProtocol):
def __init__(self, ircInstance: 'Irc'):
self.name = 'UnrealIRCD-6'
@@ -31,13 +33,16 @@ class Unrealircd6:
'PROTOCTL', 'SERVER', 'SMOD', 'TKL', 'NETINFO',
'006', '007', '018'}
self.__Logs.info(f"** Loading protocol [{__name__}]")
self.Handler = CommandHandler(ircInstance.Loader)
def get_ircd_protocol_poisition(self, cmd: list[str]) -> tuple[int, Optional[str]]:
self.__Logs.info(f"[PROTOCOL] Protocol [{__name__}] loaded!")
def get_ircd_protocol_poisition(self, cmd: list[str], log: bool = False) -> tuple[int, Optional[str]]:
"""Get the position of known commands
Args:
cmd (list[str]): The server response
log (bool): If true it will log in the logger
Returns:
tuple[int, Optional[str]]: The position and the command.
@@ -46,10 +51,34 @@ class Unrealircd6:
if token.upper() in self.known_protocol:
return index, token.upper()
self.__Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}")
if log:
self.__Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}")
return (-1, None)
def register_command(self) -> None:
m = self.__Irc.Loader.Definition.MIrcdCommand
self.Handler.register(m(command_name="PING", func=self.on_server_ping))
self.Handler.register(m(command_name="UID", func=self.on_uid))
self.Handler.register(m(command_name="QUIT", func=self.on_quit))
self.Handler.register(m(command_name="SERVER", func=self.on_server))
self.Handler.register(m(command_name="SJOIN", func=self.on_sjoin))
self.Handler.register(m(command_name="EOS", func=self.on_eos))
self.Handler.register(m(command_name="PROTOCTL", func=self.on_protoctl))
self.Handler.register(m(command_name="SVS2MODE", func=self.on_svs2mode))
self.Handler.register(m(command_name="SQUIT", func=self.on_squit))
self.Handler.register(m(command_name="PART", func=self.on_part))
self.Handler.register(m(command_name="VERSION", func=self.on_version_msg))
self.Handler.register(m(command_name="UMODE2", func=self.on_umode2))
self.Handler.register(m(command_name="NICK", func=self.on_nick))
self.Handler.register(m(command_name="REPUTATION", func=self.on_reputation))
self.Handler.register(m(command_name="SMOD", func=self.on_smod))
self.Handler.register(m(command_name="SASL", func=self.on_sasl))
self.Handler.register(m(command_name="MD", func=self.on_md))
self.Handler.register(m(command_name="PRIVMSG", func=self.on_privmsg))
return None
def parse_server_msg(self, server_msg: list[str]) -> Optional[str]:
"""Parse the server message and return the command
@@ -173,42 +202,52 @@ class Unrealircd6:
"""Créer le link et envoyer les informations nécessaires pour la
connexion au serveur.
"""
nickname = self.__Config.SERVICE_NICKNAME
username = self.__Config.SERVICE_USERNAME
realname = self.__Config.SERVICE_REALNAME
chan = self.__Config.SERVICE_CHANLOG
info = self.__Config.SERVICE_INFO
smodes = self.__Config.SERVICE_SMODES
cmodes = self.__Config.SERVICE_CMODES
umodes = self.__Config.SERVICE_UMODES
host = self.__Config.SERVICE_HOST
service_id = self.__Config.SERVICE_ID
service_nickname = self.__Config.SERVICE_NICKNAME
service_username = self.__Config.SERVICE_USERNAME
service_realname = self.__Config.SERVICE_REALNAME
service_channel_log = self.__Config.SERVICE_CHANLOG
service_info = self.__Config.SERVICE_INFO
service_smodes = self.__Config.SERVICE_SMODES
service_cmodes = self.__Config.SERVICE_CMODES
service_umodes = self.__Config.SERVICE_UMODES
service_hostname = self.__Config.SERVICE_HOST
service_name = self.__Config.SERVICE_NAME
protocolversion = self.protocol_version
password = self.__Config.SERVEUR_PASSWORD
link = self.__Config.SERVEUR_LINK
server_password = self.__Config.SERVEUR_PASSWORD
server_link = self.__Config.SERVEUR_LINK
server_id = self.__Config.SERVEUR_ID
service_id = self.__Config.SERVICE_ID
version = self.__Config.CURRENT_VERSION
unixtime = self.__Utils.get_unixtime()
self.send2socket(f":{server_id} PASS :{password}", print_log=False)
self.send2socket(f":{server_id} PASS :{server_password}", print_log=False)
self.send2socket(f":{server_id} PROTOCTL SID NOQUIT NICKv2 SJOIN SJ3 NICKIP TKLEXT2 NEXTBANS CLK EXTSWHOIS MLOCK MTAGS")
self.send2socket(f":{server_id} PROTOCTL EAUTH={link},{protocolversion},,{service_name}-v{version}")
self.send2socket(f":{server_id} PROTOCTL EAUTH={server_link},{protocolversion},,{service_name}-v{version}")
self.send2socket(f":{server_id} PROTOCTL SID={server_id}")
self.send2socket(f":{server_id} PROTOCTL BOOTED={unixtime}")
self.send2socket(f":{server_id} SERVER {link} 1 :{info}")
self.send2socket(f":{server_id} {nickname} :Reserved for services")
self.send2socket(f":{server_id} UID {nickname} 1 {unixtime} {username} {host} {service_id} * {smodes} * * fwAAAQ== :{realname}")
self.send_sjoin(chan)
self.send2socket(f":{server_id} TKL + Q * {nickname} {host} 0 {unixtime} :Reserved for services")
self.send2socket(f":{service_id} MODE {chan} {cmodes}")
self.send2socket(f":{server_id} SERVER {server_link} 1 :{service_info}")
self.send2socket("EOS")
self.send2socket(f":{server_id} {service_nickname} :Reserved for services")
self.send2socket(f":{server_id} UID {service_nickname} 1 {unixtime} {service_username} {service_hostname} {service_id} * {service_smodes} * * fwAAAQ== :{service_realname}")
self.send_sjoin(service_channel_log)
self.send2socket(f":{server_id} TKL + Q * {service_nickname} {service_hostname} 0 {unixtime} :Reserved for services")
self.send2socket(f":{service_id} MODE {service_channel_log} {service_cmodes}")
self.__Logs.debug(f'>> {__name__} Link information sent to the server')
def send_gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
"""Send a gline command to the server
Args:
nickname (str): The nickname of the client.
hostname (str): The hostname of the client.
set_by (str): The nickname who send the gline
expire_timestamp (int): Expire timestamp
set_at_timestamp (int): Set at timestamp
reason (str): The reason of the gline.
"""
# TKL + G user host set_by expire_timestamp set_at_timestamp :reason
self.send2socket(f":{self.__Config.SERVEUR_ID} TKL + G {nickname} {hostname} {set_by} {expire_timestamp} {set_at_timestamp} :{reason}")
@@ -227,6 +266,42 @@ class Unrealircd6:
self.__Irc.User.update_nickname(userObj.uid, newnickname)
return None
def send_set_mode(self, modes: str, *, nickname: Optional[str] = None, channel_name: Optional[str] = None, params: Optional[str] = None) -> None:
"""Set a mode to channel or to a nickname or for a user in a channel
Args:
modes (str): The selected mode
nickname (Optional[str]): The nickname
channel_name (Optional[str]): The channel name
params (Optional[str]): Parameters like password.
"""
service_id = self.__Config.SERVICE_ID
if modes[0] not in ['+', '-']:
self.__Logs.error(f"[MODE ERROR] The mode you have provided is missing the sign: {modes}")
return None
if nickname and channel_name:
# :98KAAAAAB MODE #services +o defenderdev
if not self.__Irc.Channel.is_valid_channel(channel_name):
self.__Logs.error(f"[MODE ERROR] The channel is not valid: {channel_name}")
return None
self.send2socket(f":{service_id} MODE {channel_name} {modes} {nickname}")
return None
if nickname and channel_name is None:
self.send2socket(f":{service_id} MODE {nickname} {modes}")
return None
if nickname is None and channel_name:
if not self.__Irc.Channel.is_valid_channel(channel_name):
self.__Logs.error(f"[MODE ERROR] The channel is not valid: {channel_name}")
return None
self.send2socket(f":{service_id} MODE {channel_name} {modes} {params}")
return None
return None
def send_squit(self, server_id: str, server_link: str, reason: str) -> None:
if not reason:
@@ -429,7 +504,7 @@ class Unrealircd6:
reason (str): The reason for the quit
"""
user_obj = self.__Irc.User.get_user(uidornickname=uid)
reputationObj = self.__Irc.Reputation.get_Reputation(uidornickname=uid)
reputationObj = self.__Irc.Reputation.get_reputation(uidornickname=uid)
if not user_obj is None:
self.send2socket(f":{user_obj.uid} QUIT :{reason}", print_log=print_log)
@@ -563,6 +638,103 @@ class Unrealircd6:
return None
# ------------------------------------------------------------------------
# COMMON IRC PARSER
# ------------------------------------------------------------------------
def parse_uid(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse UID and return dictionary.
>>> ['@s2s-md/geoip=cc=GBtag...', ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'hostname...', '001HB8G04', '0', '+iwxz', 'hostname-vhost', 'hostname-vhost', 'MyZBwg==', ':...']
Args:
serverMsg (list[str]): The UID ircd response
"""
umodes = str(serverMsg[10])
remote_ip = self.__Base.decode_ip(str(serverMsg[13])) if 'S' not in umodes else '127.0.0.1'
# Extract Geoip information
pattern = r'^.*geoip=cc=(\S{2}).*$'
geoip_match = match(pattern, serverMsg[0])
geoip = geoip_match.group(1) if geoip_match else None
response = {
'uid': str(serverMsg[8]),
'nickname': str(serverMsg[3]),
'username': str(serverMsg[6]),
'hostname': str(serverMsg[7]),
'umodes': umodes,
'vhost': str(serverMsg[11]),
'ip': remote_ip,
'realname': ' '.join(serverMsg[12:]).lstrip(':'),
'geoip': geoip,
'reputation_score': 0,
'iswebirc': True if 'webirc' in serverMsg[0] else False,
'iswebsocket': True if 'websocket' in serverMsg[0] else False
}
return response
def parse_quit(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse quit and return dictionary.
>>> # ['@unrealtag...', ':001JKNY0N', 'QUIT', ':Quit:', '....']
Args:
serverMsg (list[str]): The server message to parse
Returns:
dict[str, str]: The dictionary.
"""
scopy = serverMsg.copy()
if scopy[0].startswith('@'):
scopy.pop(0)
response = {
"uid": scopy[0].replace(':', ''),
"reason": " ".join(scopy[3:])
}
return response
def parse_nick(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse nick changes and return dictionary.
>>> ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844']
Args:
serverMsg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
"""
scopy = serverMsg.copy()
if scopy[0].startswith('@'):
scopy.pop(0)
response = {
"uid": scopy[0].replace(':', ''),
"newnickname": scopy[2],
"timestamp": scopy[3]
}
return response
def parse_privmsg(self, serverMsg: list[str]) -> dict[str, str]:
"""Parse PRIVMSG message.
>>> ['@....', ':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message']
>>> [':97KAAAAAF', 'PRIVMSG', '98KAAAAAB', ':sasa']
Args:
serverMsg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
"""
scopy = serverMsg.copy()
if scopy[0].startswith('@'):
scopy.pop(0)
response = {
"uid_sender": scopy[0].replace(':', ''),
"uid_reciever": self.__Irc.User.get_uid(scopy[2]),
"channel": scopy[2] if self.__Irc.Channel.is_valid_channel(scopy[2]) else None,
"message": " ".join(scopy[3:])
}
return response
#####################
# HANDLE EVENTS #
#####################
@@ -734,6 +906,7 @@ class Unrealircd6:
self.__Irc.User.update_nickname(uid, newnickname)
self.__Irc.Client.update_nickname(uid, newnickname)
self.__Irc.Admin.update_nickname(uid, newnickname)
self.__Irc.Reputation.update(uid, newnickname)
return None
@@ -854,6 +1027,8 @@ class Unrealircd6:
self.__Logs.info(f"# VERSION : {version} ")
self.__Logs.info(f"################################################")
self.send_sjoin(self.__Config.SERVICE_CHANLOG)
if self.__Base.check_for_new_version(False):
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
@@ -873,6 +1048,10 @@ class Unrealircd6:
for module in self.__Irc.ModuleUtils.model_get_loaded_modules().copy():
module.class_instance.cmd(server_msg_copy)
# Join saved channels & load existing modules
self.__Irc.join_saved_channels()
self.__Irc.ModuleUtils.db_load_all_existing_modules(self.__Irc)
return None
except IndexError as ie:
self.__Logs.error(f"{__name__} - Key Error: {ie}")
@@ -986,14 +1165,42 @@ class Unrealircd6:
dnickname = self.__Config.SERVICE_NICKNAME
dchanlog = self.__Config.SERVICE_CHANLOG
GREEN = self.__Config.COLORS.green
RED = self.__Config.COLORS.red
NOGC = self.__Config.COLORS.nogc
for module in self.__Irc.ModuleUtils.model_get_loaded_modules().copy():
module.class_instance.cmd(serverMsg)
# SASL authentication
# ['@s2s-md/..', ':001', 'UID', 'adator__', '0', '1755987444', '...', 'desktop-h1qck20.mshome.net', '001XLTT0U', '0', '+iwxz', '*', 'Clk-EC2256B2.mshome.net', 'rBKAAQ==', ':...']
uid = serverMsg[8]
nickname = serverMsg[3]
sasl_obj = self.__Irc.Sasl.get_sasl_obj(uid)
if sasl_obj:
if sasl_obj.auth_success:
self.__Irc.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language)
self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, sasl_obj.username, dnickname),
channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
else:
self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", RED, NOGC, nickname, sasl_obj.username),
channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!"))
# Delete sasl object!
self.__Irc.Sasl.delete_sasl_client(uid)
return None
# If no sasl authentication then auto connect via fingerprint
if self.__Irc.Admin.db_auth_admin_via_fingerprint(fingerprint, uid):
admin = self.__Irc.Admin.get_admin(uid)
account = admin.account if admin else ''
self.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTO AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, account, dnickname),
channel=dchanlog)
msg=tr("[ %sSASL AUTO AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, account, dnickname),
channel=dchanlog)
self.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
return None
@@ -1035,15 +1242,6 @@ class Unrealircd6:
)
return None
# if not arg[0].lower() in self.__Irc.module_commands_list:
# self.__Logs.debug(f"This command {arg[0]} is not available")
# self.send_notice(
# nick_from=self.__Config.SERVICE_NICKNAME,
# nick_to=user_trigger,
# msg=f"This command [{self.__Config.COLORS.bold}{arg[0]}{self.__Config.COLORS.bold}] is not available"
# )
# return None
cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)
@@ -1097,7 +1295,7 @@ class Unrealircd6:
except AttributeError as ae:
self.__Logs.error(f"Attribute Error: {ae}")
except Exception as err:
self.__Logs.error(f"General Error: {err} - {srv_msg}")
self.__Logs.error(f"General Error: {err} - {srv_msg}" , exc_info=True)
def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server
@@ -1106,7 +1304,6 @@ class Unrealircd6:
serverMsg (list[str]): List of str coming from the server
"""
try:
#
pong = str(serverMsg[1]).replace(':','')
self.send2socket(f"PONG :{pong}", print_log=False)
@@ -1115,6 +1312,11 @@ class Unrealircd6:
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_server(self, serverMsg: list[str]) -> None:
"""_summary_
Args:
serverMsg (list[str]): _description_
"""
try:
# ['SERVER', 'irc.local.org', '1', ':U6100-Fhn6OoE-001', 'Local', 'Server']
sCopy = serverMsg.copy()
@@ -1177,7 +1379,7 @@ class Unrealircd6:
Args:
serverMsg (list[str]): List of str coming from the server
"""
# ['@unrealircd.org/userhost=StatServ@stats.deb.biz.st;draft/bot;bot;msgid=ehfAq3m2yjMjhgWEfi1UCS;time=2024-10-26T13:49:06.299Z', ':001INC60B', 'PRIVMSG', '12ZAAAAAB', ':\x01PING', '762382207\x01']
# ['@unrealircd.org/...', ':001INC60B', 'PRIVMSG', '12ZAAAAAB', ':\x01PING', '762382207\x01']
# Réponse a un CTCP VERSION
try:
@@ -1186,6 +1388,7 @@ class Unrealircd6:
arg = serverMsg[4].replace(':', '')
if nickname is None:
self.__Logs.debug(serverMsg)
return None
if arg == '\x01PING':
@@ -1199,6 +1402,7 @@ class Unrealircd6:
nick_to=nickname,
msg=f"\x01PING {ping_response} secs\x01"
)
self.__Logs.debug(serverMsg)
return None
except Exception as err:
@@ -1255,7 +1459,7 @@ class Unrealircd6:
except Exception as err:
self.__Logs.error(f'General Error: {err}')
def on_sasl(self, serverMsg: list[str], psasl: 'Sasl') -> Optional['MSasl']:
def on_sasl(self, serverMsg: list[str]) -> Optional['MSasl']:
"""Handle SASL coming from a server
Args:
@@ -1268,7 +1472,7 @@ class Unrealircd6:
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '0014ZZH1F', 'S', 'EXTERNAL', 'zzzzzzzkey']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'C', 'sasakey==']
# [':irc.local.org', 'SASL', 'defender-dev.deb.biz.st', '00157Z26U', 'D', 'A']
psasl = self.__Irc.Sasl
sasl_enabled = False
for smod in self.__Settings.SMOD_MODULES:
if smod.name == 'sasl':
@@ -1308,6 +1512,7 @@ class Unrealircd6:
sasl_obj.fingerprint = str(sCopy[6])
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {sasl_obj.client_uid} C +")
self.on_sasl_authentication_process(sasl_obj)
return sasl_obj
case 'C':
@@ -1320,14 +1525,64 @@ class Unrealircd6:
sasl_obj.username = username
sasl_obj.password = password
self.on_sasl_authentication_process(sasl_obj)
return sasl_obj
elif sasl_obj.mechanisme == "EXTERNAL":
sasl_obj.message_type = sasl_message_type
self.on_sasl_authentication_process(sasl_obj)
return sasl_obj
except Exception as err:
self.__Logs.error(f'General Error: {err}', exc_info=True)
def on_sasl_authentication_process(self, sasl_model: 'MSasl') -> bool:
s = sasl_model
if sasl_model:
def db_get_admin_info(*, username: Optional[str] = None, password: Optional[str] = None, fingerprint: Optional[str] = None) -> Optional[dict[str, Any]]:
if fingerprint:
mes_donnees = {'fingerprint': fingerprint}
query = f"SELECT user, level, language FROM {self.__Config.TABLE_ADMIN} WHERE fingerprint = :fingerprint"
else:
mes_donnees = {'user': username, 'password': self.__Utils.hash_password(password)}
query = f"SELECT user, level, language FROM {self.__Config.TABLE_ADMIN} WHERE user = :user AND password = :password"
result = self.__Base.db_execute_query(query, mes_donnees)
user_from_db = result.fetchone()
if user_from_db:
return {'user': user_from_db[0], 'level': user_from_db[1], 'language': user_from_db[2]}
else:
return None
if s.message_type == 'C' and s.mechanisme == 'PLAIN':
# Connection via PLAIN
admin_info = db_get_admin_info(username=s.username, password=s.password)
if admin_info is not None:
s.auth_success = True
s.level = admin_info.get('level', 0)
s.language = admin_info.get('language', 'EN')
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
self.send2socket(f":{self.__Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
else:
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
self.send2socket(f":{self.__Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
elif s.message_type == 'S' and s.mechanisme == 'EXTERNAL':
# Connection using fingerprints
admin_info = db_get_admin_info(fingerprint=s.fingerprint)
if admin_info is not None:
s.auth_success = True
s.level = admin_info.get('level', 0)
s.username = admin_info.get('user', None)
s.language = admin_info.get('language', 'EN')
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D S")
self.send2socket(f":{self.__Config.SERVEUR_LINK} 903 {s.username} :SASL authentication successful")
else:
# "904 <nick> :SASL authentication failed"
self.send2socket(f":{self.__Config.SERVEUR_LINK} SASL {self.__Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
self.send2socket(f":{self.__Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
def on_md(self, serverMsg: list[str]) -> None:
"""Handle MD responses
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']

View File

@@ -3,7 +3,6 @@ import sys
import time
from typing import TYPE_CHECKING
import socket
from core.classes.protocol import Protocol
if TYPE_CHECKING:
from core.irc import Irc
@@ -15,14 +14,20 @@ REHASH_MODULES = [
'core.classes.config',
'core.base',
'core.classes.commands',
'core.classes.protocols.interface',
'core.classes.protocols.factory',
'core.classes.protocols.unreal6',
'core.classes.protocols.inspircd',
'core.classes.protocol'
'core.classes.protocols.inspircd'
]
def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -> None:
"""
Args:
uplink (Irc): The Irc instance
reason (str): The reason of the restart.
"""
# reload modules.
for module in uplink.ModuleUtils.model_get_loaded_modules().copy():
uplink.ModuleUtils.unload_one_module(uplink, module.module_name)
@@ -33,14 +38,8 @@ def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -
uplink.Client.CLIENT_DB.clear() # Clear Client object
uplink.Base.garbage_collector_thread()
# Reload configuration
uplink.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).get_config_model()
uplink.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
uplink.Protocol = Protocol(uplink.Config.SERVEUR_PROTOCOL, uplink.ircObject).Protocol
uplink.Logs.debug(f'[{uplink.Config.SERVICE_NICKNAME} RESTART]: Reloading configuration!')
uplink.Protocol.send_squit(server_id=uplink.Config.SERVEUR_ID, server_link=uplink.Config.SERVEUR_LINK, reason="Defender Power off")
uplink.Protocol.send_squit(server_id=uplink.Config.SERVEUR_ID, server_link=uplink.Config.SERVEUR_LINK, reason=reason)
uplink.Logs.debug('Restarting Defender ...')
uplink.IrcSocket.shutdown(socket.SHUT_RDWR)
uplink.IrcSocket.close()
@@ -49,11 +48,19 @@ def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -
time.sleep(0.5)
uplink.Logs.warning('-- Waiting for socket to close ...')
# Reload configuration
uplink.Loader.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).get_config_model()
uplink.Loader.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
for mod in REHASH_MODULES:
importlib.reload(sys.modules[mod])
uplink.Protocol = uplink.Loader.PFactory.get()
uplink.Protocol.register_command()
uplink.init_service_user()
uplink.Utils.create_socket(uplink)
uplink.Protocol.send_link()
uplink.join_saved_channels()
uplink.ModuleUtils.db_load_all_existing_modules(uplink)
uplink.Config.DEFENDER_RESTART = 0
def rehash_service(uplink: 'Irc', nickname: str) -> None:
@@ -105,8 +112,9 @@ def rehash_service(uplink: 'Irc', nickname: str) -> None:
uplink.Commands = uplink.Loader.CommandModule.Command(uplink.Loader)
uplink.Commands.DB_COMMANDS = uplink.Settings.get_cache('db_commands')
uplink.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
uplink.Protocol = Protocol(uplink.Config.SERVEUR_PROTOCOL, uplink.ircObject).Protocol
uplink.Loader.Base = uplink.Loader.BaseModule.Base(uplink.Loader)
uplink.Protocol = uplink.Loader.PFactory.get()
uplink.Protocol.register_command()
# Reload Service modules
for module in uplink.ModuleUtils.model_get_loaded_modules().copy():

View File

@@ -9,9 +9,14 @@ class Reputation:
UID_REPUTATION_DB: list[MReputation] = []
def __init__(self, loader: 'Loader'):
"""
Args:
loader (Loader): The Loader instance.
"""
self.Logs = loader.Logs
self.MReputation: MReputation = MReputation
self.MReputation: Optional[MReputation] = None
def insert(self, new_reputation_user: MReputation) -> bool:
"""Insert a new Reputation User object
@@ -47,13 +52,13 @@ class Reputation:
Args:
uid (str): UID of the user
newNickname (str): New nickname
new_nickname (str): New nickname
Returns:
bool: True if updated
"""
reputation_obj = self.get_Reputation(uid)
reputation_obj = self.get_reputation(uid)
if reputation_obj is None:
return False
@@ -89,7 +94,7 @@ class Reputation:
return result
def get_Reputation(self, uidornickname: str) -> Optional[MReputation]:
def get_reputation(self, uidornickname: str) -> Optional[MReputation]:
"""Get The User Object model
Args:
@@ -116,7 +121,7 @@ class Reputation:
str|None: Return the UID
"""
reputation_obj = self.get_Reputation(uidornickname)
reputation_obj = self.get_reputation(uidornickname)
if reputation_obj is None:
return None
@@ -132,7 +137,7 @@ class Reputation:
Returns:
str|None: the nickname
"""
reputation_obj = self.get_Reputation(uidornickname)
reputation_obj = self.get_reputation(uidornickname)
if reputation_obj is None:
return None
@@ -149,7 +154,7 @@ class Reputation:
bool: True if exist
"""
reputation_obj = self.get_Reputation(uidornickname)
reputation_obj = self.get_reputation(uidornickname)
if isinstance(reputation_obj, MReputation):
return True

View File

@@ -1,4 +1,4 @@
from typing import Optional, Union, TYPE_CHECKING
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from core.definition import MSasl
@@ -9,13 +9,18 @@ class Sasl:
DB_SASL: list['MSasl'] = []
def __init__(self, loader: 'Loader'):
"""
Args:
loader (Loader): The Loader instance.
"""
self.Logs = loader.Logs # logger
def insert_sasl_client(self, psasl: 'MSasl') -> bool:
"""Insert a new Sasl authentication
Args:
new_user (UserModel): New userModel object
psasl (MSasl): New userModel object
Returns:
bool: True if inserted
@@ -38,7 +43,7 @@ class Sasl:
"""Delete the User starting from the UID
Args:
uid (str): UID of the user
client_uid (str): UID of the user
Returns:
bool: True if deleted

View File

@@ -1,5 +1,5 @@
'''This class should never be reloaded.
'''
"""This class should never be reloaded.
"""
from logging import Logger
from threading import Timer, Thread, RLock
from socket import socket
@@ -8,7 +8,6 @@ from core.definition import MSModule, MAdmin
if TYPE_CHECKING:
from core.classes.user import User
from core.classes.admin import Admin
class Settings:
"""This Class will never be reloaded.
@@ -19,13 +18,17 @@ class Settings:
RUNNING_TIMERS: list[Timer] = []
RUNNING_THREADS: list[Thread] = []
RUNNING_SOCKETS: list[socket] = []
PERIODIC_FUNC: dict[object] = {}
PERIODIC_FUNC: dict[str, Any] = {}
LOCK: RLock = RLock()
CONSOLE: bool = False
MAIN_SERVER_HOSTNAME: str = None
MAIN_SERVER_ID: str = None
PROTOCTL_PREFIX_MODES_SIGNES : dict[str, str] = {}
PROTOCTL_PREFIX_SIGNES_MODES : dict[str, str] = {}
PROTOCTL_USER_MODES: list[str] = []
PROTOCTL_CHANNEL_MODES: list[str] = []
PROTOCTL_PREFIX: list[str] = []
SMOD_MODULES: list[MSModule] = []
@@ -42,7 +45,7 @@ class Settings:
__INSTANCE_OF_USER_UTILS: Optional['User'] = None
"""Instance of the User Utils class"""
__CURRENT_ADMIN: Optional['MAdmin'] = None
__CURRENT_ADMIN: Optional['MAdmin'] = None
"""The Current Admin Object Model"""
__LOGGER: Optional[Logger] = None
@@ -79,6 +82,7 @@ class Settings:
@property
def global_translation(self) -> dict[str, list[list[str]]]:
"""Get/set global translation variable"""
return self.__TRANSLATION
@global_translation.setter
@@ -87,6 +91,7 @@ class Settings:
@property
def global_lang(self) -> str:
"""Global default language."""
return self.__LANG
@global_lang.setter
@@ -103,6 +108,7 @@ class Settings:
@property
def current_admin(self) -> MAdmin:
"""Current admin data model."""
return self.__CURRENT_ADMIN
@current_admin.setter
@@ -111,6 +117,7 @@ class Settings:
@property
def global_logger(self) -> Logger:
"""Global logger Instance"""
return self.__LOGGER
@global_logger.setter

View File

@@ -11,9 +11,13 @@ if TYPE_CHECKING:
class Translation:
def __init__(self, loader: 'Loader') -> None:
"""
Args:
loader (Loader): The Loader instance.
"""
self.Logs = loader.Logs
self.Settings = loader.Settings
return None
def get_translation(self) -> dict[str, list[list[str]]]:
try:

View File

@@ -55,7 +55,7 @@ class User:
return False
user_obj.nickname = new_nickname
self.Logs.debug(f"UID ({uid}) has benn update with new nickname ({new_nickname}).")
return True
def update_mode(self, uidornickname: str, modes: str) -> bool:

View File

@@ -369,4 +369,14 @@ class MSasl(MainModel):
fingerprint: Optional[str] = None
language: str = "EN"
auth_success: bool = False
level: int = 0
level: int = 0
@dataclass
class MRegister:
command_name: str
func: Any
@dataclass
class MIrcdCommand:
command_name: str
func: Any

View File

@@ -7,12 +7,12 @@ from ssl import SSLSocket
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Optional, Union
from core.classes import rehash
from core.loader import Loader
from core.classes.protocol import Protocol
from core.classes.protocols.interface import IProtocol
from core.utils import tr
if TYPE_CHECKING:
from core.definition import MSasl
from core.loader import Loader
class Irc:
_instance = None
@@ -24,7 +24,7 @@ class Irc:
return cls._instance
def __init__(self, loader: Loader) -> 'Irc':
def __init__(self, loader: 'Loader'):
# Loader class
self.Loader = loader
@@ -135,7 +135,7 @@ class Irc:
##############################################
# CONNEXION IRC #
##############################################
def init_irc(self, ircInstance: 'Irc') -> None:
def init_irc(self) -> None:
"""Create a socket and connect to irc server
Args:
@@ -143,8 +143,8 @@ class Irc:
"""
try:
self.init_service_user()
self.Utils.create_socket(ircInstance)
self.__connect_to_irc(ircInstance)
self.Utils.create_socket(self)
self.__connect_to_irc()
except AssertionError as ae:
self.Logs.critical(f'Assertion error: {ae}')
@@ -161,23 +161,20 @@ class Irc:
))
return None
def __connect_to_irc(self, ircInstance: 'Irc') -> None:
def __connect_to_irc(self) -> None:
try:
self.init_service_user()
self.ircObject = ircInstance # créer une copie de l'instance Irc
self.Protocol = Protocol(
protocol=self.Config.SERVEUR_PROTOCOL,
ircInstance=self.ircObject
).Protocol
self.Protocol: 'IProtocol' = self.Loader.PFactory.get()
self.Protocol.register_command()
self.Protocol.send_link() # Etablir le link en fonction du protocol choisi
self.signal = True # Une variable pour initier la boucle infinie
self.join_saved_channels() # Join existing channels
self.ModuleUtils.db_load_all_existing_modules(self)
# self.join_saved_channels() # Join existing channels
# self.ModuleUtils.db_load_all_existing_modules(self)
while self.signal:
try:
if self.Config.DEFENDER_RESTART == 1:
rehash.restart_service(self.ircObject)
rehash.restart_service(self)
# 4072 max what the socket can grab
buffer_size = self.IrcSocket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
@@ -202,9 +199,11 @@ class Irc:
self.Logs.error(f"SSLEOFError __connect_to_irc: {soe} - {data}")
except ssl.SSLError as se:
self.Logs.error(f"SSLError __connect_to_irc: {se} - {data}")
sys.exit(1)
sys.exit(-1)
except OSError as oe:
self.Logs.error(f"SSLError __connect_to_irc: {oe} - {data}")
self.Logs.error(f"SSLError __connect_to_irc: {oe} {oe.errno}")
if oe.errno == 10053:
sys.exit(-1)
except (socket.error, ConnectionResetError):
self.Logs.debug("Connexion reset")
@@ -220,7 +219,7 @@ class Irc:
except ssl.SSLEOFError as soe:
self.Logs.error(f"SSLEOFError: {soe}")
except AttributeError as atte:
self.Logs.critical(f"AttributeError: {atte}")
self.Logs.critical(f"AttributeError: {atte}", exc_info=True)
except Exception as e:
self.Logs.critical(f"General Error: {e}", exc_info=True)
@@ -261,9 +260,9 @@ class Irc:
# This is only to reference the method
return None
##############################################
# --------------------------------------------
# FIN CONNEXION IRC #
##############################################
# --------------------------------------------
def build_command(self, level: int, module_name: str, command_name: str, command_description: str) -> None:
"""This method build the commands variable
@@ -409,57 +408,58 @@ class Irc:
return None
def create_defender_user(self, nickname: str, level: int, password: str) -> str:
def create_defender_user(self, sender: str, new_admin: str, new_level: int, new_password: str) -> bool:
"""Create a new admin user for defender
Args:
sender (str): The current admin sending the request
new_admin (str): The new admin to create
new_level (int): The level of the admin
new_password (str): The clear password
Returns:
bool: True if created.
"""
# > addaccess [nickname] [level] [password]
dnick = self.Config.SERVICE_NICKNAME
p = self.Protocol
get_user = self.User.get_user(nickname)
level = self.Base.convert_to_int(level)
password = password
get_user = self.User.get_user(new_admin)
level = self.Base.convert_to_int(new_level)
password = new_password
if get_user is None:
response = f'This nickname {nickname} does not exist, it is not possible to create this user'
self.Logs.warning(response)
return response
response = tr("The nickname (%s) is not currently connected! please create a new admin when the nickname is connected to the network!", new_admin)
p.send_notice(dnick, sender, response)
self.Logs.debug(f"New admin {new_admin} sent by {sender} is not connected")
return False
if level is None:
response = f'The level [{level}] must be a number from 1 to 4'
self.Logs.warning(response)
return response
if level > 4:
response = "Impossible d'ajouter un niveau > 4"
self.Logs.warning(response)
return response
if level is None or level > 4 or level == 0:
p.send_notice(dnick, sender, tr("The level (%s) must be a number from 1 to 4", level))
self.Logs.debug(f"Level must a number between 1 to 4 (sent by {sender})")
return False
nickname = get_user.nickname
response = ''
hostname = get_user.hostname
vhost = get_user.vhost
spassword = self.Loader.Utils.hash_password(password)
mes_donnees = {'admin': nickname}
query_search_user = f"SELECT id FROM {self.Config.TABLE_ADMIN} WHERE user=:admin"
r = self.Base.db_execute_query(query_search_user, mes_donnees)
exist_user = r.fetchone()
# On verifie si le user exist dans la base
if not exist_user:
mes_donnees = {'datetime': self.Utils.get_sdatetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level}
# Check if the user already exist
if not self.Admin.db_is_admin_exist(nickname):
mes_donnees = {'datetime': self.Utils.get_sdatetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level, 'language': 'EN'}
self.Base.db_execute_query(f'''INSERT INTO {self.Config.TABLE_ADMIN}
(createdOn, user, password, hostname, vhost, level) VALUES
(:datetime, :user, :password, :hostname, :vhost, :level)
(createdOn, user, password, hostname, vhost, level, language) VALUES
(:datetime, :user, :password, :hostname, :vhost, :level, :language)
''', mes_donnees)
response = f"{nickname} ajouté en tant qu'administrateur de niveau {level}"
self.Protocol.send_notice(nick_from=self.Config.SERVICE_NICKNAME, nick_to=nickname, msg=response)
self.Logs.info(response)
return response
p.send_notice(dnick, sender, tr("New admin (%s) has been added with level %s", nickname, level))
self.Logs.info(f"A new admin ({nickname}) has been created by {sender}!")
return True
else:
response = f'{nickname} Existe déjà dans les users enregistrés'
self.Protocol.send_notice(nick_from=self.Config.SERVICE_NICKNAME, nick_to=nickname, msg=response)
self.Logs.info(response)
return response
p.send_notice(dnick, sender, tr("The nickname (%s) Already exist!", nickname))
self.Logs.info(f"The nickname {nickname} already exist! (sent by {sender})")
return False
def thread_check_for_new_version(self, fromuser: str) -> None:
dnickname = self.Config.SERVICE_NICKNAME
@@ -489,116 +489,12 @@ class Irc:
return None
self.Logs.debug(f">> {self.Utils.hide_sensitive_data(original_response)}")
# parsed_protocol = self.Protocol.parse_server_msg(original_response.copy())
pos, parsed_protocol = self.Protocol.get_ircd_protocol_poisition(cmd=original_response)
match parsed_protocol:
case 'PING':
self.Protocol.on_server_ping(serverMsg=original_response)
pos, parsed_protocol = self.Protocol.get_ircd_protocol_poisition(cmd=original_response, log=True)
case 'SERVER':
self.Protocol.on_server(serverMsg=original_response)
case 'SJOIN':
self.Protocol.on_sjoin(serverMsg=original_response)
case 'EOS':
self.Protocol.on_eos(serverMsg=original_response)
case 'UID':
try:
self.Protocol.on_uid(serverMsg=original_response)
for module in self.ModuleUtils.model_get_loaded_modules().copy():
module.class_instance.cmd(original_response)
# SASL authentication
# ['@s2s-md/..', ':001', 'UID', 'adator__', '0', '1755987444', '...', 'desktop-h1qck20.mshome.net', '001XLTT0U', '0', '+iwxz', '*', 'Clk-EC2256B2.mshome.net', 'rBKAAQ==', ':...']
dnickname = self.Config.SERVICE_NICKNAME
dchanlog = self.Config.SERVICE_CHANLOG
uid = original_response[8]
nickname = original_response[3]
sasl_obj = self.Sasl.get_sasl_obj(uid)
if sasl_obj:
if sasl_obj.auth_success:
self.insert_db_admin(sasl_obj.client_uid, sasl_obj.username, sasl_obj.level, sasl_obj.language)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s (%s) is now connected successfuly to %s", GREEN, NOGC, nickname, sasl_obj.username, dnickname),
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Successfuly connected to %s", dnickname))
else:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=tr("[ %sSASL AUTH%s ] - %s provided a wrong password for this username %s", RED, NOGC, nickname, sasl_obj.username),
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=tr("Wrong password!"))
# Delete sasl object!
self.Sasl.delete_sasl_client(uid)
return None
except Exception as err:
self.Logs.error(f'General Error: {err}')
case 'QUIT':
self.Protocol.on_quit(serverMsg=original_response)
case 'PROTOCTL':
self.Protocol.on_protoctl(serverMsg=original_response)
case 'SVS2MODE':
self.Protocol.on_svs2mode(serverMsg=original_response)
case 'SQUIT':
self.Protocol.on_squit(serverMsg=original_response)
case 'PART':
self.Protocol.on_part(serverMsg=original_response)
case 'VERSION':
self.Protocol.on_version_msg(serverMsg=original_response)
case 'UMODE2':
self.Protocol.on_umode2(serverMsg=original_response)
case 'NICK':
self.Protocol.on_nick(serverMsg=original_response)
case 'REPUTATION':
self.Protocol.on_reputation(serverMsg=original_response)
case 'SMOD':
self.Protocol.on_smod(original_response)
case 'SASL':
sasl_response = self.Protocol.on_sasl(original_response, self.Sasl)
self.on_sasl_authentication_process(sasl_response)
case 'MD':
self.Protocol.on_md(serverMsg=original_response)
case 'PRIVMSG':
self.Protocol.on_privmsg(serverMsg=original_response)
case 'SLOG': # TODO
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case 'PONG': # TODO
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case 'MODE': # TODO
#['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6...', ':001', 'MODE', '#a', '+nt', '1723207536']
#['@unrealircd.org/userhost=adator@localhost;...', ':001LQ0L0C', 'MODE', '#services', '-l']
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case '320': # TODO
#:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case '318': # TODO
#:irc.deb.biz.st 318 PyDefender IRCParis93 :End of /WHOIS list.
self.Logs.debug(f"[!] TO HANDLE: {parsed_protocol}")
case None:
self.Logs.debug(f"[!] TO HANDLE: {original_response}")
for parsed in self.Protocol.Handler.get_ircd_commands():
if parsed.command_name.upper() == parsed_protocol:
parsed.func(original_response)
if len(original_response) > 2:
if original_response[2] != 'UID':
@@ -681,6 +577,9 @@ class Irc:
channel=dchanlog
)
self.Protocol.send_notice(dnickname, fromuser, tr("You have been successfully disconnected from %s", dnickname))
return None
case 'firstauth':
# firstauth OWNER_NICKNAME OWNER_PASSWORD
current_nickname = self.User.get_nickname(fromuser)
@@ -786,8 +685,9 @@ class Irc:
if admin_obj:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - You are already connected to {dnickname}",
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - {fromuser} is already connected to {dnickname}",
channel=dchanlog)
self.Protocol.send_notice(dnickname, fromuser, tr("You are already connected to %s", dnickname))
return None
mes_donnees = {'user': user_to_log, 'password': self.Loader.Utils.hash_password(password)}
@@ -818,15 +718,14 @@ class Irc:
if len(cmd) < 4:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Right command : /msg {dnickname} addaccess [nickname] [level] [password]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"level: from 1 to 4")
return None
newnickname = cmd[1]
newlevel = self.Base.int_if_possible(cmd[2])
password = cmd[3]
new_admin = str(cmd[1])
level = self.Base.int_if_possible(cmd[2])
password = str(cmd[3])
response = self.create_defender_user(newnickname, newlevel, password)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{response}")
self.Logs.info(response)
self.create_defender_user(fromuser, new_admin, level, password)
return None
except IndexError as ie:
self.Logs.error(f'_hcmd addaccess: {ie}')
@@ -1115,7 +1014,7 @@ class Irc:
except KeyError as ke:
self.Logs.error(f"Key Error: {ke} - list recieved: {cmd}")
except Exception as err:
self.Logs.error(f"General Error: {ke} - list recieved: {cmd}")
self.Logs.error(f"General Error: {err} - list recieved: {cmd}", exc_info=True)
case 'unload':
# unload mod_defender
@@ -1151,7 +1050,8 @@ class Irc:
self.Base.execute_periodic_action()
for chan_name in self.Channel.UID_CHANNEL_DB:
self.Protocol.send_mode_chan(chan_name.name, '-l')
# self.Protocol.send_mode_chan(chan_name.name, '-l')
self.Protocol.send_set_mode('-l', channel_name=chan_name.name)
for client in self.Client.CLIENT_DB:
self.Protocol.send_svslogout(client)
@@ -1176,7 +1076,7 @@ class Irc:
self.Config.DEFENDER_INIT = 1 # set init to 1 saying that the service will be re initiated
case 'rehash':
rehash.rehash_service(self.ircObject, fromuser)
rehash.rehash_service(self, fromuser)
return None
case 'show_modules':
@@ -1204,7 +1104,7 @@ class Irc:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=tr('%s - %sNot Loaded%s', module, GREEN, NOGC)
msg=tr('%s - %sNot Loaded%s', module, RED, NOGC)
)
case 'show_timers':

View File

@@ -8,6 +8,8 @@ import core.base as base_mod
import core.module as module_mod
import core.classes.commands as commands_mod
import core.classes.config as conf_mod
import core.irc as irc
import core.classes.protocols.factory as factory
class Loader:
@@ -63,4 +65,8 @@ class Loader:
self.Sasl: sasl.Sasl = sasl.Sasl(self)
self.Irc: irc.Irc = irc.Irc(self)
self.PFactory: factory.ProtocolFactorty = factory.ProtocolFactorty(self.Irc)
self.Logs.debug(self.Utils.tr("Loader %s success", __name__))

View File

@@ -15,7 +15,7 @@ class ServiceLogging:
self.SERVER_PREFIX = None
self.LOGGING_CONSOLE = True
self.LOG_FILTERS: list[str] = ['PING', f":{self.SERVER_PREFIX}auth", "['PASS'"]
self.LOG_FILTERS: list[str] = ["PING", f":{self.SERVER_PREFIX}auth", "['PASS'"]
self.file_handler = None
self.stdout_handler = None

View File

@@ -206,6 +206,7 @@ class Module:
module = self.model_get_module(module_name)
if module is None:
self.__Logs.debug(f"[ UNLOAD MODULE ERROR ] This module {module_name} is not loaded!")
self.db_delete_module(module_name)
uplink.Protocol.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f"[ {red}UNLOAD MODULE ERROR{nogc} ] This module {module_name} is not loaded!",

View File

@@ -36,7 +36,7 @@ def tr(message: str, *args) -> str:
is_args_available = True if args else False
g = global_settings
try:
# Access to user object ==> global_instance.get_user_option
# Access to admin object
client_language = g.current_admin.language if g.current_admin else g.global_lang
if count_args != count_placeholder:
@@ -56,7 +56,7 @@ def tr(message: str, *args) -> str:
return message % args if is_args_available else message
except KeyError as ke:
g.global_logger.error(f"Key Error: {ke}")
g.global_logger.error(f"KeyError: {ke}")
return message % args if is_args_available else message
except Exception as err:
@@ -143,6 +143,8 @@ def create_socket(uplink: 'Irc') -> None:
uplink.Logs.critical(f"[OS Error]: {oe}")
if 'connection refused' in str(oe).lower():
sys.exit(oe)
if oe.errno == 10053:
sys.exit(oe)
except AttributeError as ae:
uplink.Logs.critical(f"AttributeError: {ae}")
@@ -241,4 +243,4 @@ def hide_sensitive_data(srvmsg: list[str]) -> list[str]:
return srv_msg
except ValueError:
return srvmsg
return srvmsg

View File

@@ -1,7 +1,7 @@
from core import installation
#############################################
# @Version : 6.2 #
# @Version : 6.3 #
# Requierements : #
# Python3.10 or higher #
# SQLAlchemy, requests, psutil #
@@ -11,14 +11,17 @@ from core import installation
try:
installation.Install()
# installation.Install()
from core.loader import Loader
from core.irc import Irc
ircInstance = Irc(Loader())
ircInstance.init_irc(ircInstance)
loader = Loader()
loader.Irc.init_irc()
# from core.irc import Irc
# ircInstance = Irc(Loader())
# ircInstance.init_irc(ircInstance)
except AssertionError as ae:
print(f'Assertion Error -> {ae}')
except KeyboardInterrupt as k:
ircInstance.Base.execute_periodic_action()
# ircInstance.Base.execute_periodic_action()
...

View File

@@ -78,11 +78,11 @@ class Clone:
self.__load_module_configuration()
self.Channel.db_query_channel(action='add', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_join_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL)
self.Protocol.send_sjoin(self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+o', nickname=self.Config.SERVICE_NICKNAME, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+nts', channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+k', channel_name=self.Config.CLONE_CHANNEL, params=self.Config.CLONE_CHANNEL_PASSWORD)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.CLONE_CHANNEL} +o {self.Config.SERVICE_NICKNAME}")
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} +nts")
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} +k {self.Config.CLONE_CHANNEL_PASSWORD}")
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
@@ -127,8 +127,8 @@ class Clone:
self.Settings.set_cache('UID_CLONE_DB', self.Clone.UID_CLONE_DB)
self.Channel.db_query_channel(action='del', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -nts")
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -k {self.Config.CLONE_CHANNEL_PASSWORD}")
self.Protocol.send_set_mode('-nts', channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('-k', channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_part_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL)
self.Irc.Commands.drop_command_by_module(self.module_name)
@@ -148,7 +148,8 @@ class Clone:
match command:
case 'PRIVMSG':
return self.Utils.handle_on_privmsg(self, cmd)
self.Utils.handle_on_privmsg(self, cmd)
return None
case 'QUIT':
return None

View File

@@ -174,17 +174,17 @@ def create_new_clone(uplink: 'Clone', faker_instance: 'Faker', group: str = 'Def
return True
def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]):
def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]) -> None:
uid_sender = uplink.Irc.Utils.clean_uid(srvmsg[1])
parser = uplink.Protocol.parse_privmsg(srvmsg)
uid_sender = uplink.Irc.Utils.clean_uid(parser.get('uid_sender', None))
senderObj = uplink.User.get_user(uid_sender)
if senderObj.hostname in uplink.Config.CLONE_LOG_HOST_EXEMPT:
return
if not senderObj is None:
senderMsg = ' '.join(srvmsg[4:])
clone_obj = uplink.Clone.get_clone(srvmsg[3])
if senderObj is not None:
if senderObj.hostname in uplink.Config.CLONE_LOG_HOST_EXEMPT:
return
senderMsg = parser.get('message', None)
clone_obj = uplink.Clone.get_clone(parser.get('uid_reciever', None))
if clone_obj is None:
return
@@ -196,3 +196,5 @@ def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]):
msg=final_message,
channel=uplink.Config.CLONE_CHANNEL
)
return None

View File

@@ -53,16 +53,16 @@ class Command:
# Module Utils
self.mod_utils = utils
self.Irc.build_command(1, self.module_name, 'join', 'Join a channel')
self.Irc.build_command(1, self.module_name, 'assign', 'Assign a user to a role or task')
self.Irc.build_command(1, self.module_name, 'part', 'Leave a channel')
self.Irc.build_command(1, self.module_name, 'unassign', 'Remove a user from a role or task')
self.Irc.build_command(1, self.module_name, 'owner', 'Give channel ownership to a user')
self.Irc.build_command(1, self.module_name, 'deowner', 'Remove channel ownership from a user')
self.Irc.build_command(1, self.module_name, 'protect', 'Protect a user from being kicked')
self.Irc.build_command(1, self.module_name, 'deprotect', 'Remove protection from a user')
self.Irc.build_command(1, self.module_name, 'op', 'Grant operator privileges to a user')
self.Irc.build_command(1, self.module_name, 'deop', 'Remove operator privileges from a user')
self.Irc.build_command(2, self.module_name, 'join', 'Join a channel')
self.Irc.build_command(2, self.module_name, 'assign', 'Assign a user to a role or task')
self.Irc.build_command(2, self.module_name, 'part', 'Leave a channel')
self.Irc.build_command(2, self.module_name, 'unassign', 'Remove a user from a role or task')
self.Irc.build_command(2, self.module_name, 'owner', 'Give channel ownership to a user')
self.Irc.build_command(2, self.module_name, 'deowner', 'Remove channel ownership from a user')
self.Irc.build_command(2, self.module_name, 'protect', 'Protect a user from being kicked')
self.Irc.build_command(2, self.module_name, 'deprotect', 'Remove protection from a user')
self.Irc.build_command(2, self.module_name, 'op', 'Grant operator privileges to a user')
self.Irc.build_command(2, self.module_name, 'deop', 'Remove operator privileges from a user')
self.Irc.build_command(1, self.module_name, 'halfop', 'Grant half-operator privileges to a user')
self.Irc.build_command(1, self.module_name, 'dehalfop', 'Remove half-operator privileges from a user')
self.Irc.build_command(1, self.module_name, 'voice', 'Grant voice privileges to a user')

View File

@@ -134,17 +134,20 @@ def set_operation(uplink: 'Command', cmd: list[str], channel_name: Optional[str]
return False
if len(cmd) == 1:
uplink.Protocol.send2socket(f":{dnickname} MODE {channel_name} {mode} {client}")
# uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {client}")
uplink.Protocol.send_set_mode(mode, nickname=client, channel_name=channel_name)
return None
# deop nickname
if len(cmd) == 2:
nickname = cmd[1]
uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}")
# uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}")
uplink.Protocol.send_set_mode(mode, nickname=nickname, channel_name=channel_name)
return None
nickname = cmd[2]
uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}")
# uplink.Protocol.send2socket(f":{service_id} MODE {channel_name} {mode} {nickname}")
uplink.Protocol.send_set_mode(mode, nickname=nickname, channel_name=channel_name)
return None
def set_ban(uplink: 'Command', cmd: list[str], action: Literal['+', '-'], client: str) -> None:

View File

@@ -365,7 +365,7 @@ class Defender:
release_code = cmd[1]
jailed_nickname = self.User.get_nickname(fromuser)
jailed_UID = self.User.get_uid(fromuser)
get_reputation = self.Reputation.get_Reputation(jailed_UID)
get_reputation = self.Reputation.get_reputation(jailed_UID)
if get_reputation is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" No code is requested ...")
@@ -551,7 +551,7 @@ class Defender:
msg=f"This nickname ({str(cmd[2])}) is not connected to the network!")
return None
client_to_release = self.Reputation.get_Reputation(client_obj.uid)
client_to_release = self.Reputation.get_reputation(client_obj.uid)
if client_to_release is None:
p.send_notice(nick_from=dnickname,

View File

@@ -87,7 +87,7 @@ def handle_on_sjoin(uplink: 'Defender', srvmsg: list[str]):
return
if confmodel.reputation == 1:
get_reputation = irc.Reputation.get_Reputation(parsed_UID)
get_reputation = irc.Reputation.get_reputation(parsed_UID)
if parsed_chan != gconfig.SALON_JAIL:
p.send2socket(f":{gconfig.SERVICE_ID} MODE {parsed_chan} +b ~security-group:unknown-users")
@@ -138,18 +138,20 @@ def handle_on_slog(uplink: 'Defender', srvmsg: list[str]):
return None
def handle_on_nick(uplink: 'Defender', srvmsg: list[str]):
"""_summary_
"""Handle nickname changes.
>>> srvmsg = ['@unrealircd.org...', ':001MZQ0RB', 'NICK', 'newnickname', '1754663712']
>>> [':97KAAAAAC', 'NICK', 'testinspir', '1757360740']
Args:
irc_instance (Irc): The Irc instance
srvmsg (list[str]): The Server MSG
confmodel (ModConfModel): The Module Configuration
"""
uid = uplink.Loader.Utils.clean_uid(str(srvmsg[1]))
p = uplink.Protocol
parser = p.parse_nick(srvmsg)
uid = uplink.Loader.Utils.clean_uid(parser.get('uid', None))
confmodel = uplink.ModConfig
get_reputation = uplink.Reputation.get_Reputation(uid)
get_reputation = uplink.Reputation.get_reputation(uid)
jail_salon = uplink.Config.SALON_JAIL
service_id = uplink.Config.SERVICE_ID
@@ -159,7 +161,7 @@ def handle_on_nick(uplink: 'Defender', srvmsg: list[str]):
# Update the new nickname
oldnick = get_reputation.nickname
newnickname = srvmsg[3]
newnickname = parser.get('newnickname', None)
get_reputation.nickname = newnickname
# If ban in all channel is ON then unban old nickname an ban the new nickname
@@ -170,20 +172,21 @@ def handle_on_nick(uplink: 'Defender', srvmsg: list[str]):
p.send2socket(f":{service_id} MODE {chan.name} +b {newnickname}!*@*")
def handle_on_quit(uplink: 'Defender', srvmsg: list[str]):
"""_summary_
"""Handle on quit message
>>> srvmsg = ['@unrealircd.org...', ':001MZQ0RB', 'QUIT', ':Quit:', 'quit message']
Args:
uplink (Irc): The Defender Module instance
srvmsg (list[str]): The Server MSG
"""
p = uplink.Protocol
parser = p.parse_quit(srvmsg)
confmodel = uplink.ModConfig
ban_all_chan = uplink.Base.int_if_possible(confmodel.reputation_ban_all_chan)
final_UID = uplink.Loader.Utils.clean_uid(str(srvmsg[1]))
final_UID = uplink.Loader.Utils.clean_uid(str(parser.get('uid', None)))
jail_salon = uplink.Config.SALON_JAIL
service_id = uplink.Config.SERVICE_ID
get_user_reputation = uplink.Reputation.get_Reputation(final_UID)
get_user_reputation = uplink.Reputation.get_reputation(final_UID)
if get_user_reputation is not None:
final_nickname = get_user_reputation.nickname
@@ -204,6 +207,7 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
uplink (Defender): The Defender instance
srvmsg (list[str]): The Server MSG
"""
parser_uid = uplink.Protocol.parse_uid(srvmsg)
gconfig = uplink.Config
irc = uplink.Irc
confmodel = uplink.ModConfig
@@ -213,10 +217,10 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
return None
# Get User information
_User = irc.User.get_user(str(srvmsg[8]))
_User = irc.User.get_user(parser_uid.get('uid', None))
if _User is None:
irc.Logs.warning(f'This UID: [{srvmsg[8]}] is not available please check why')
irc.Logs.warning(f'This UID: [{parser_uid.get("uid", None)}] is not available please check why')
return
# If user is not service or IrcOp then scan them
@@ -249,7 +253,8 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
####################
# ACTION FUNCTIONS #
####################
# [:<sid>] UID <uid> <ts> <nick> <real-host> <displayed-host> <real-user> <ip> <signon> <modes> [<mode-parameters>]+ :<real>
# [:<sid>] UID nickname hopcount timestamp username hostname uid servicestamp umodes virthost cloakedhost ip :gecos
def action_on_flood(uplink: 'Defender', srvmsg: list[str]):
confmodel = uplink.ModConfig
@@ -318,7 +323,7 @@ def action_add_reputation_sanctions(uplink: 'Defender', jailed_uid: str ):
p = uplink.Protocol
confmodel = uplink.ModConfig
get_reputation = irc.Reputation.get_Reputation(jailed_uid)
get_reputation = irc.Reputation.get_reputation(jailed_uid)
if get_reputation is None:
irc.Logs.warning(f'UID {jailed_uid} has not been found')
@@ -404,7 +409,7 @@ def action_apply_reputation_santions(uplink: 'Defender') -> None:
# Suppression des éléments dans {UID_DB} et {REPUTATION_DB}
for chan in irc.Channel.UID_CHANNEL_DB:
if chan.name != salon_jail and ban_all_chan == 1:
get_user_reputation = irc.Reputation.get_Reputation(uid)
get_user_reputation = irc.Reputation.get_reputation(uid)
p.send2socket(f":{service_id} MODE {chan.name} -b {get_user_reputation.nickname}!*@*")
# Lorsqu'un utilisateur quitte, il doit être supprimé de {UID_DB}.

View File

@@ -1,13 +1,13 @@
import logging
import asyncio
from unrealircd_rpc_py.objects.Definition import LiveRPCResult
import mods.jsonrpc.utils as utils
import mods.jsonrpc.threads as thds
from time import sleep
from types import SimpleNamespace
from typing import TYPE_CHECKING
from dataclasses import dataclass
from unrealircd_rpc_py.Live import LiveWebsocket, LiveUnixSocket
from unrealircd_rpc_py.Loader import Loader
from unrealircd_rpc_py.ConnectionFactory import ConnectionFactory
from unrealircd_rpc_py.LiveConnectionFactory import LiveConnectionFactory
if TYPE_CHECKING:
from core.irc import Irc
@@ -85,43 +85,24 @@ class Jsonrpc():
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
self.UnrealIrcdRpcLive: LiveWebsocket = LiveWebsocket(
url=self.Config.JSONRPC_URL,
username=self.Config.JSONRPC_USER,
password=self.Config.JSONRPC_PASSWORD,
callback_object_instance=self,
callback_method_or_function_name='callback_sent_to_irc'
)
if self.UnrealIrcdRpcLive.get_error.code != 0:
self.Logs.error(f"{self.UnrealIrcdRpcLive.get_error.message} ({self.UnrealIrcdRpcLive.get_error.code})")
try:
self.Rpc = ConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
self.LiveRpc = LiveConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
self.Rpc.setup({'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD})
self.LiveRpc.setup({'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'})
if self.ModConfig.jsonrpc == 1:
self.Base.create_thread(func=self.Threads.thread_subscribe, func_args=(self, ), run_once=True)
return None
except Exception as err:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}ERROR{self.Config.COLORS.nogc}] {self.UnrealIrcdRpcLive.get_error.message}",
msg=f"[{self.Config.COLORS.red}JSONRPC ERROR{self.Config.COLORS.nogc}] {err.__str__()}",
channel=self.Config.SERVICE_CHANLOG
)
raise Exception(f"[LIVE-JSONRPC ERROR] {self.UnrealIrcdRpcLive.get_error.message}")
self.Rpc: Loader = Loader(
req_method=self.Config.JSONRPC_METHOD,
url=self.Config.JSONRPC_URL,
username=self.Config.JSONRPC_USER,
password=self.Config.JSONRPC_PASSWORD
)
if self.Rpc.get_error.code != 0:
self.Logs.error(f"{self.Rpc.get_error.message} ({self.Rpc.get_error.code})")
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}JSONRPC ERROR{self.Config.COLORS.nogc}] {self.Rpc.get_error.message}",
channel=self.Config.SERVICE_CHANLOG
)
raise Exception(f"[JSONRPC ERROR] {self.Rpc.get_error.message}")
if self.ModConfig.jsonrpc == 1:
self.Base.create_thread(func=self.Threads.thread_subscribe, func_args=(self, ), run_once=True)
return None
)
self.Logs.error(f"JSONRPC ERROR: {err.__str__()}")
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
@@ -143,7 +124,7 @@ class Jsonrpc():
self.Base.db_execute_query(table_logs)
return None
def callback_sent_to_irc(self, response: SimpleNamespace) -> None:
def callback_sent_to_irc(self, response: LiveRPCResult) -> None:
dnickname = self.Config.SERVICE_NICKNAME
dchanlog = self.Config.SERVICE_CHANLOG
@@ -152,29 +133,19 @@ class Jsonrpc():
bold = self.Config.COLORS.bold
red = self.Config.COLORS.red
if self.UnrealIrcdRpcLive.get_error.code != 0:
if response.error.code != 0:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[{bold}{red}JSONRPC ERROR{nogc}{bold}] {self.UnrealIrcdRpcLive.get_error.message}",
msg=f"[{bold}{red}JSONRPC ERROR{nogc}{bold}] {response.error.message} ({response.error.code})",
channel=dchanlog)
return None
if hasattr(response, 'error'):
if response.error.code != 0:
if isinstance(response.result, bool):
if response.result:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{bold}{red}JSONRPC{nogc}{bold}] JSONRPC Event activated on {self.Config.JSONRPC_URL}",
channel=dchanlog)
return None
if hasattr(response, 'result'):
if isinstance(response.result, bool):
if response.result:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{bold}{green}JSONRPC{nogc}{bold}] JSONRPC Event activated on {self.Config.JSONRPC_URL}",
channel=dchanlog)
return None
return None
level = response.result.level if hasattr(response.result, 'level') else ''
subsystem = response.result.subsystem if hasattr(response.result, 'subsystem') else ''
@@ -278,18 +249,13 @@ class Jsonrpc():
match option:
case 'get':
nickname = str(cmd[2])
uid_to_get = self.User.get_uid(nickname)
if uid_to_get is None:
return None
rpc = self.Rpc
UserInfo = rpc.User.get(uid_to_get)
if rpc.get_error.code != 0:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'{rpc.get_error.message}')
UserInfo = rpc.User.get(nickname)
if UserInfo.error.code != 0:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'{UserInfo.error.message}')
return None
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'UID : {UserInfo.id}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'NICKNAME : {UserInfo.name}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'USERNAME : {UserInfo.user.username}')
@@ -321,9 +287,8 @@ class Jsonrpc():
case 'jrinstances':
try:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"GC Collect: {self.MainUtils.run_python_garbage_collector()}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance LiveWebsock: {self.MainUtils.get_number_gc_objects(LiveWebsocket)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance LiveUnixSocket: {self.MainUtils.get_number_gc_objects(LiveUnixSocket)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance Loader: {self.MainUtils.get_number_gc_objects(Loader)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance LiveWebsock: {self.MainUtils.get_number_gc_objects(LiveConnectionFactory)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance ConnectionFactory: {self.MainUtils.get_number_gc_objects(ConnectionFactory)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre de toute les instances: {self.MainUtils.get_number_gc_objects()}")
except Exception as err:
self.Logs.error(f"Unknown Error: {err}")

View File

@@ -5,24 +5,20 @@ if TYPE_CHECKING:
from mods.jsonrpc.mod_jsonrpc import Jsonrpc
def thread_subscribe(uplink: 'Jsonrpc') -> None:
response: dict[str, dict] = {}
snickname = uplink.Config.SERVICE_NICKNAME
schannel = uplink.Config.SERVICE_CHANLOG
uplink.is_streaming = True
response = asyncio.run(uplink.LiveRpc.subscribe(["all"]))
if uplink.UnrealIrcdRpcLive.get_error.code == 0:
uplink.is_streaming = True
response = asyncio.run(uplink.UnrealIrcdRpcLive.subscribe(["all"]))
else:
if response.error.code != 0:
uplink.Protocol.send_priv_msg(nick_from=snickname,
msg=f"[{uplink.Config.COLORS.red}JSONRPC ERROR{uplink.Config.COLORS.nogc}] {uplink.UnrealIrcdRpcLive.get_error.message}",
msg=f"[{uplink.Config.COLORS.red}JSONRPC ERROR{uplink.Config.COLORS.nogc}] {response.error.message}",
channel=schannel
)
if response is None:
return
code = response.get('error', {}).get('code', 0)
message = response.get('error', {}).get('message', None)
code = response.error.code
message = response.error.message
if code == 0:
uplink.Protocol.send_priv_msg(
@@ -39,18 +35,15 @@ def thread_subscribe(uplink: 'Jsonrpc') -> None:
def thread_unsubscribe(uplink: 'Jsonrpc') -> None:
response: dict[str, dict] = asyncio.run(uplink.UnrealIrcdRpcLive.unsubscribe())
response = asyncio.run(uplink.LiveRpc.unsubscribe())
uplink.Logs.debug("[JSONRPC UNLOAD] Unsubscribe from the stream!")
uplink.is_streaming = False
uplink.update_configuration('jsonrpc', 0)
snickname = uplink.Config.SERVICE_NICKNAME
schannel = uplink.Config.SERVICE_CHANLOG
if response is None:
return None
code = response.get('error', {}).get('code', 0)
message = response.get('error', {}).get('message', None)
code = response.error.code
message = response.error.message
if code != 0:
uplink.Protocol.send_priv_msg(

View File

@@ -1,9 +1,9 @@
{
"version": "6.2.5",
"version": "6.3.0",
"requests": "2.32.3",
"psutil": "6.0.0",
"unrealircd_rpc_py": "2.0.5",
"unrealircd_rpc_py": "3.0.0",
"sqlalchemy": "2.0.35",
"faker": "30.1.0",
"pyyaml": "6.0.2"