Connectecting to inspircd

This commit is contained in:
adator
2025-09-03 22:01:52 +02:00
parent e79c15188e
commit 6b7fd16a44
13 changed files with 585 additions and 82 deletions

View File

@@ -0,0 +1,28 @@
from typing import TYPE_CHECKING, Optional
from .unreal6 import Unrealircd6
from .inspircd import Inspircd
from .interface import IProtocol
if TYPE_CHECKING:
from core.irc import Irc
class ProtocolFactorty:
def __init__(self, uplink: 'Irc'):
self.__Config = uplink.Config
self.__uplink = uplink
def get(self) -> Optional[IProtocol]:
protocol = self.__Config.SERVEUR_PROTOCOL
match protocol:
case 'unreal6':
self.__uplink.Logs.debug(f"[PROTOCOL] {protocol} has been loaded")
return Unrealircd6(self.__uplink)
case 'inspircd':
self.__uplink.Logs.debug(f"[PROTOCOL] {protocol} has been loaded")
return Inspircd(self.__uplink)
case _:
self.__uplink.Logs.critical(f"[PROTOCOL ERROR] This protocol name ({protocol} is not valid!)")
raise Exception("Unknown protocol!")

View File

@@ -1,6 +1,7 @@
from re import match, findall
from datetime import datetime
from typing import TYPE_CHECKING
import sys
from typing import TYPE_CHECKING, Optional
from ssl import SSLEOFError, SSLError
if TYPE_CHECKING:
@@ -17,8 +18,32 @@ class Inspircd:
self.__Utils = ircInstance.Loader.Utils
self.__Logs = ircInstance.Loader.Logs
self.known_protocol: set[str] = {'SJOIN', 'UID', 'MD', 'QUIT', 'SQUIT',
'EOS', 'PRIVMSG', 'MODE', 'UMODE2',
'VERSION', 'REPUTATION', 'SVS2MODE',
'SLOG', 'NICK', 'PART', 'PONG', 'SASL', 'PING',
'PROTOCTL', 'SERVER', 'SMOD', 'TKL', 'NETINFO',
'006', '007', '018'}
self.__Logs.info(f"** Loading protocol [{__name__}]")
def get_ircd_protocol_poisition(self, cmd: list[str]) -> tuple[int, Optional[str]]:
"""Get the position of known commands
Args:
cmd (list[str]): The server response
Returns:
tuple[int, Optional[str]]: The position and the command.
"""
for index, token in enumerate(cmd):
if token.upper() in self.known_protocol:
return index, token.upper()
self.__Logs.debug(f"[IRCD LOGS] You need to handle this response: {cmd}")
return (-1, None)
def send2socket(self, message: str, print_log: bool = True) -> None:
"""Envoit les commandes à envoyer au serveur.
@@ -45,6 +70,8 @@ class Inspircd:
self.__Logs.error(f"SSLError: {se} - {message}")
except OSError as oe:
self.__Logs.error(f"OSError: {oe} - {message}")
if oe.errno == 10053:
sys.exit(oe)
except AttributeError as ae:
self.__Logs.critical(f"Attribute Error: {ae}")
@@ -175,7 +202,8 @@ class Inspircd:
self.__Logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Utils.get_unixtime()} {channel} + :{self.__Config.SERVICE_ID}")
# self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Utils.get_unixtime()} {channel} + :{self.__Config.SERVICE_ID}")
self.send2socket(f":{self.__Config.SERVICE_ID} FJOIN {channel} 68")
# Add defender to the channel uids list
self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[self.__Config.SERVICE_ID]))
@@ -481,32 +509,29 @@ class Inspircd:
def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server
[:<sid>] UID <uid> <ts> <nick> <real-host> <displayed-host> <real-user> <displayed-user> <ip> <signon> <modes> [<mode-parameters>]+ :<real>
[':97K', 'UID', '97KAAAAAB', '1756928055', 'adator_', '172.18.128.1', '172.18.128.1', '...', '...', '172.18.128.1', '1756928055', '+', ':...']
Args:
serverMsg (list[str]): Original server message
"""
# ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601',
# ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net',
# '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...']
try:
isWebirc = True if 'webirc' in serverMsg[0] else False
isWebsocket = True if 'websocket' in serverMsg[0] else False
uid = str(serverMsg[8])
nickname = str(serverMsg[3])
username = str(serverMsg[6])
hostname = str(serverMsg[7])
umodes = str(serverMsg[10])
vhost = str(serverMsg[11])
uid = str(serverMsg[2])
nickname = str(serverMsg[4])
username = str(serverMsg[7])
hostname = str(serverMsg[5])
umodes = str(serverMsg[11])
vhost = str(serverMsg[6])
if not 'S' in umodes:
remote_ip = self.__Base.decode_ip(str(serverMsg[13]))
remote_ip = self.__Base.decode_ip(str(serverMsg[9]))
else:
remote_ip = '127.0.0.1'
# extract realname
realname = ' '.join(serverMsg[14:]).lstrip(':')
realname = ' '.join(serverMsg[12:]).lstrip(':')
# Extract Geoip information
pattern = r'^.*geoip=cc=(\S{2}).*$'
@@ -540,7 +565,7 @@ class Inspircd:
except IndexError as ie:
self.__Logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Logs.error(f"{__name__} - General Error: {err}")
self.__Logs.error(f"{__name__} - General Error: {err}", exc_info=True)
def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server
@@ -560,6 +585,17 @@ class Inspircd:
except Exception as err:
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_server(self, serverMsg: list[str]) -> None:
"""_summary_
Args:
serverMsg (list[str]): _description_
"""
try:
...
except Exception as err:
self.__Logs.error(f'General Error: {err}')
def on_version(self, serverMsg: list[str]) -> None:
"""Sending Server Version to the server

View File

@@ -0,0 +1,447 @@
from abc import ABC, abstractmethod
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from core.classes.sasl import Sasl
from core.definition import MClient, MSasl
class IProtocol(ABC):
@abstractmethod
def get_ircd_protocol_poisition(self, cmd: list[str]) -> tuple[int, Optional[str]]:
"""Get the position of known commands
Args:
cmd (list[str]): The server response
Returns:
tuple[int, Optional[str]]: The position and the command.
"""
@abstractmethod
def send2socket(self, message: str, print_log: bool = True) -> None:
"""Envoit les commandes à envoyer au serveur.
Args:
string (Str): contient la commande à envoyer au serveur.
"""
@abstractmethod
def send_priv_msg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None):
"""Sending PRIVMSG to a channel or to a nickname by batches
could be either channel or nickname not both together
Args:
msg (str): The message to send
nick_from (str): The sender nickname
channel (str, optional): The receiver channel. Defaults to None.
nick_to (str, optional): The reciever nickname. Defaults to None.
"""
@abstractmethod
def send_notice(self, nick_from: str, nick_to: str, msg: str) -> None:
"""Sending NOTICE by batches
Args:
msg (str): The message to send to the server
nick_from (str): The sender Nickname
nick_to (str): The reciever nickname
"""
@abstractmethod
def send_link(self) -> None:
"""Créer le link et envoyer les informations nécessaires pour la
connexion au serveur.
"""
@abstractmethod
def send_gline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
"""_summary_
Args:
nickname (str): _description_
hostname (str): _description_
set_by (str): _description_
expire_timestamp (int): _description_
set_at_timestamp (int): _description_
reason (str): _description_
"""
@abstractmethod
def send_set_nick(self, newnickname: str) -> None:
"""Change nickname of the server
\n This method will also update the User object
Args:
newnickname (str): New nickname of the server
"""
@abstractmethod
def send_squit(self, server_id: str, server_link: str, reason: str) -> None:
"""_summary_
Args:
server_id (str): _description_
server_link (str): _description_
reason (str): _description_
"""
@abstractmethod
def send_ungline(self, nickname:str, hostname: str) -> None:
"""_summary_
Args:
nickname (str): _description_
hostname (str): _description_
"""
@abstractmethod
def send_kline(self, nickname: str, hostname: str, set_by: str, expire_timestamp: int, set_at_timestamp: int, reason: str) -> None:
"""_summary_
Args:
nickname (str): _description_
hostname (str): _description_
set_by (str): _description_
expire_timestamp (int): _description_
set_at_timestamp (int): _description_
reason (str): _description_
"""
@abstractmethod
def send_unkline(self, nickname:str, hostname: str) -> None:
"""_summary_
Args:
nickname (str): _description_
hostname (str): _description_
"""
@abstractmethod
def send_sjoin(self, channel: str) -> None:
"""Server will join a channel with pre defined umodes
Args:
channel (str): Channel to join
"""
@abstractmethod
def send_sapart(self, nick_to_sapart: str, channel_name: str) -> None:
"""_summary_
Args:
from_nick (str): _description_
nick_to (str): _description_
channel_name (str): _description_
"""
@abstractmethod
def send_sajoin(self, nick_to_sajoin: str, channel_name: str) -> None:
"""_summary_
Args:
nick_to_sajoin (str): _description_
channel_name (str): _description_
"""
@abstractmethod
def send_svspart(self, nick_to_part: str, channels: list[str], reason: str) -> None:
"""_summary_
Args:
nick_to_part (str): _description_
channels (list[str]): _description_
reason (str): _description_
"""
@abstractmethod
def send_svsjoin(self, nick_to_part: str, channels: list[str], keys: list[str]) -> None:
"""_summary_
Args:
nick_to_part (str): _description_
channels (list[str]): _description_
keys (list[str]): _description_
"""
@abstractmethod
def send_svsmode(self, nickname: str, user_mode: str) -> None:
"""_summary_
Args:
nickname (str): _description_
user_mode (str): _description_
"""
@abstractmethod
def send_svs2mode(self, nickname: str, user_mode: str) -> None:
"""_summary_
Args:
nickname (str): _description_
user_mode (str): _description_
"""
@abstractmethod
def send_svslogin(self, client_uid: str, user_account: str) -> None:
"""Log a client into his account.
Args:
client_uid (str): Client UID
user_account (str): The account of the user
"""
@abstractmethod
def send_svslogout(self, client_obj: 'MClient') -> None:
"""Logout a client from his account
Args:
client_uid (str): The Client UID
"""
@abstractmethod
def send_quit(self, uid: str, reason: str, print_log: True) -> None:
"""Send quit message
- Delete uid from User object
- Delete uid from Reputation object
Args:
uidornickname (str): The UID or the Nickname
reason (str): The reason for the quit
"""
@abstractmethod
def send_uid(self, nickname:str, username: str, hostname: str, uid:str, umodes: str, vhost: str, remote_ip: str, realname: str, print_log: bool = True) -> None:
"""Send UID to the server
- Insert User to User Object
Args:
nickname (str): Nickname of the client
username (str): Username of the client
hostname (str): Hostname of the client you want to create
uid (str): UID of the client you want to create
umodes (str): umodes of the client you want to create
vhost (str): vhost of the client you want to create
remote_ip (str): remote_ip of the client you want to create
realname (str): realname of the client you want to create
print_log (bool, optional): print logs if true. Defaults to True.
"""
@abstractmethod
def send_join_chan(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
"""Joining a channel
Args:
uidornickname (str): UID or nickname that need to join
channel (str): channel to join
password (str, optional): The password of the channel to join. Default to None
print_log (bool, optional): Write logs. Defaults to True.
"""
@abstractmethod
def send_part_chan(self, uidornickname:str, channel: str, print_log: bool = True) -> None:
"""Part from a channel
Args:
uidornickname (str): UID or nickname that need to join
channel (str): channel to join
print_log (bool, optional): Write logs. Defaults to True.
"""
@abstractmethod
def send_mode_chan(self, channel_name: str, channel_mode: str) -> None:
"""_summary_
Args:
channel_name (str): _description_
channel_mode (str): _description_
"""
@abstractmethod
def send_raw(self, raw_command: str) -> None:
"""_summary_
Args:
raw_command (str): _description_
"""
#####################
# HANDLE EVENTS #
#####################
@abstractmethod
def on_svs2mode(self, serverMsg: list[str]) -> None:
"""Handle svs2mode coming from a server
>>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_mode(self, serverMsg: list[str]) -> None:
"""Handle mode coming from a server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_umode2(self, serverMsg: list[str]) -> None:
"""Handle umode2 coming from a server
>>> [':adator_', 'UMODE2', '-i']
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_quit(self, serverMsg: list[str]) -> None:
"""Handle quit coming from a server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_squit(self, serverMsg: list[str]) -> None:
"""Handle squit coming from a server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_protoctl(self, serverMsg: list[str]) -> None:
"""Handle protoctl coming from a server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_nick(self, serverMsg: list[str]) -> None:
"""Handle nick coming from a server
new nickname
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_sjoin(self, serverMsg: list[str]) -> None:
"""Handle sjoin coming from a server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_part(self, serverMsg: list[str]) -> None:
"""Handle part coming from a server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_eos(self, serverMsg: list[str]) -> None:
"""Handle EOS coming from a server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_reputation(self, serverMsg: list[str]) -> None:
"""Handle REPUTATION coming from a server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_privmsg(self, serverMsg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server
Args:
serverMsg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_server(self, serverMsg: list[str]) -> None:
"""_summary_
Args:
serverMsg (list[str]): _description_
"""
@abstractmethod
def on_version(self, serverMsg: list[str]) -> None:
"""Sending Server Version to the server
Args:
serverMsg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_time(self, serverMsg: list[str]) -> None:
"""Sending TIME answer to a requestor
Args:
serverMsg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_ping(self, serverMsg: list[str]) -> None:
"""Sending a PING answer to requestor
Args:
serverMsg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_version_msg(self, serverMsg: list[str]) -> None:
"""Handle version coming from the server
\n ex. /version Defender
Args:
serverMsg (list[str]): Original message from the server
"""
@abstractmethod
def on_smod(self, serverMsg: list[str]) -> None:
"""Handle SMOD message coming from the server
Args:
serverMsg (list[str]): Original server message
"""
@abstractmethod
def on_sasl(self, serverMsg: list[str], psasl: 'Sasl') -> Optional['MSasl']:
"""Handle SASL coming from a server
Args:
serverMsg (list[str]): Original server message
psasl (Sasl): The SASL process object
"""
@abstractmethod
def on_md(self, serverMsg: list[str]) -> None:
"""Handle MD responses
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']
Args:
serverMsg (list[str]): The server reply
"""

View File

@@ -4,6 +4,7 @@ from datetime import datetime
from typing import TYPE_CHECKING, Optional
from ssl import SSLEOFError, SSLError
from core.classes.protocols.interface import IProtocol
from core.utils import tr
if TYPE_CHECKING:
@@ -11,7 +12,7 @@ if TYPE_CHECKING:
from core.classes.sasl import Sasl
from core.definition import MClient, MSasl
class Unrealircd6:
class Unrealircd6(IProtocol):
def __init__(self, ircInstance: 'Irc'):
self.name = 'UnrealIRCD-6'
@@ -200,6 +201,7 @@ class Unrealircd6:
self.send2socket(f":{server_id} PROTOCTL SID={server_id}")
self.send2socket(f":{server_id} PROTOCTL BOOTED={unixtime}")
self.send2socket(f":{server_id} SERVER {link} 1 :{info}")
self.send2socket("EOS")
self.send2socket(f":{server_id} {nickname} :Reserved for services")
self.send2socket(f":{server_id} UID {nickname} 1 {unixtime} {username} {host} {service_id} * {smodes} * * fwAAAQ== :{realname}")
self.send_sjoin(chan)
@@ -1035,15 +1037,6 @@ class Unrealircd6:
)
return None
# if not arg[0].lower() in self.__Irc.module_commands_list:
# self.__Logs.debug(f"This command {arg[0]} is not available")
# self.send_notice(
# nick_from=self.__Config.SERVICE_NICKNAME,
# nick_to=user_trigger,
# msg=f"This command [{self.__Config.COLORS.bold}{arg[0]}{self.__Config.COLORS.bold}] is not available"
# )
# return None
cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)
@@ -1106,7 +1099,6 @@ class Unrealircd6:
serverMsg (list[str]): List of str coming from the server
"""
try:
#
pong = str(serverMsg[1]).replace(':','')
self.send2socket(f"PONG :{pong}", print_log=False)
@@ -1115,6 +1107,11 @@ class Unrealircd6:
self.__Logs.error(f"{__name__} - General Error: {err}")
def on_server(self, serverMsg: list[str]) -> None:
"""_summary_
Args:
serverMsg (list[str]): _description_
"""
try:
# ['SERVER', 'irc.local.org', '1', ':U6100-Fhn6OoE-001', 'Local', 'Server']
sCopy = serverMsg.copy()
@@ -1177,7 +1174,7 @@ class Unrealircd6:
Args:
serverMsg (list[str]): List of str coming from the server
"""
# ['@unrealircd.org/userhost=StatServ@stats.deb.biz.st;draft/bot;bot;msgid=ehfAq3m2yjMjhgWEfi1UCS;time=2024-10-26T13:49:06.299Z', ':001INC60B', 'PRIVMSG', '12ZAAAAAB', ':\x01PING', '762382207\x01']
# ['@unrealircd.org/...', ':001INC60B', 'PRIVMSG', '12ZAAAAAB', ':\x01PING', '762382207\x01']
# Réponse a un CTCP VERSION
try:
@@ -1186,6 +1183,7 @@ class Unrealircd6:
arg = serverMsg[4].replace(':', '')
if nickname is None:
self.__Logs.debug(serverMsg)
return None
if arg == '\x01PING':
@@ -1199,6 +1197,7 @@ class Unrealircd6:
nick_to=nickname,
msg=f"\x01PING {ping_response} secs\x01"
)
self.__Logs.debug(serverMsg)
return None
except Exception as err: