many updates

This commit is contained in:
adator
2024-11-02 23:22:36 +01:00
parent 9d9ede0e80
commit cbae3dce96
12 changed files with 488 additions and 387 deletions

View File

@@ -81,6 +81,7 @@ class Channel:
def delete_user_from_channel(self, chan_name: str, uid:str) -> bool:
try:
result = False
chan_name = chan_name.lower()
for record in self.UID_CHANNEL_DB:
if record.name == chan_name:

View File

@@ -55,20 +55,15 @@ class Clone:
Returns:
bool: True if deleted
"""
result = False
for record in self.UID_CLONE_DB:
if record.nickname == uidornickname or record.uid == uidornickname:
# If the user exist then remove and return True and do not go further
self.UID_CLONE_DB.remove(record)
result = True
# self.Logs.debug(f'The clone ({record.nickname}) has been deleted')
return result
cloneObj = self.get_Clone(uidornickname=uidornickname)
if not result:
self.Logs.critical(f'The UID or Nickname {uidornickname} was not deleted')
if cloneObj is None:
return False
return result
self.UID_CLONE_DB.remove(cloneObj)
return True
def exists(self, nickname: str) -> bool:
"""Check if the nickname exist
@@ -113,11 +108,15 @@ class Clone:
Returns:
Union[MClone, None]: Return MClone object or None
"""
cloneObj = None
for clone in self.UID_CLONE_DB:
if clone.uid == uidornickname:
return clone
cloneObj = clone
if clone.nickname == uidornickname:
return clone
cloneObj = clone
return cloneObj
def get_uid(self, uidornickname: str) -> Union[str, None]:
"""Get the UID of the clone starting from the UID or the Nickname

View File

@@ -1,3 +1,5 @@
from re import match, findall
from datetime import datetime
from typing import TYPE_CHECKING
from ssl import SSLEOFError, SSLError
@@ -23,12 +25,12 @@ class Unrealircd6:
if print_log:
self.__Base.logs.debug(f'<< {message}')
except UnicodeDecodeError:
self.__Base.logs.error(f'Decode Error try iso-8859-1 - message: {message}')
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[0],'replace'))
except UnicodeEncodeError:
self.__Base.logs.error(f'Encode Error try iso-8859-1 - message: {message}')
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[0],'replace'))
except UnicodeDecodeError as ude:
self.__Base.logs.error(f'Decode Error try iso-8859-1 - {ude} - {message}')
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace'))
except UnicodeEncodeError as uee:
self.__Base.logs.error(f'Encode Error try iso-8859-1 - {uee} - {message}')
self.__Irc.IrcSocket.send(f"{message}\r\n".encode(self.__Config.SERVEUR_CHARSET[1],'replace'))
except AssertionError as ae:
self.__Base.logs.warning(f'Assertion Error {ae} - message: {message}')
except SSLEOFError as soe:
@@ -150,6 +152,9 @@ class Unrealircd6:
def squit(self, server_id: str, server_link: str, reason: str) -> None:
if not reason:
reason = 'Service Shutdown'
self.send2socket(f":{server_id} SQUIT {server_link} :{reason}")
return None
@@ -168,13 +173,81 @@ class Unrealircd6:
def sjoin(self, channel: str) -> None:
if not self.__Irc.Channel.Is_Channel(channel):
self.__Base.logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Base.get_unixtime()} {channel} + :{self.__Config.SERVICE_ID}")
# Add defender to the channel uids list
self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[self.__Config.SERVICE_ID]))
return None
def join(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
def sendQuit(self, uid: str, reason: str, print_log: True) -> None:
"""Send quit message
Args:
uidornickname (str): The UID or the Nickname
reason (str): The reason for the quit
"""
userObj = self.__Irc.User.get_User(uidornickname=uid)
cloneObj = self.__Irc.Clone.get_Clone(uidornickname=uid)
reputationObj = self.__Irc.Reputation.get_Reputation(uidornickname=uid)
if not userObj is None:
self.send2socket(f":{userObj.uid} QUIT :{reason}", print_log=print_log)
self.__Irc.User.delete(userObj.uid)
if not cloneObj is None:
self.__Irc.Clone.delete(cloneObj.uid)
if not reputationObj is None:
self.__Irc.Reputation.delete(reputationObj.uid)
if not self.__Irc.Channel.delete_user_from_all_channel(uid):
self.__Base.logs.error(f"The UID [{uid}] has not been deleted from all channels")
return None
def sendUID(self, nickname:str, username: str, hostname: str, uid:str, umodes: str, vhost: str, remote_ip: str, realname: str, print_log: bool = True) -> None:
"""Send UID to the server
Args:
nickname (str): Nickname of the client
username (str): Username of the client
hostname (str): Hostname of the client you want to create
uid (str): UID of the client you want to create
umodes (str): umodes of the client you want to create
vhost (str): vhost of the client you want to create
remote_ip (str): remote_ip of the client you want to create
realname (str): realname of the client you want to create
print_log (bool, optional): print logs if true. Defaults to True.
"""
# {self.Config.SERVEUR_ID} UID
# {clone.nickname} 1 {self.Base.get_unixtime()} {clone.username} {clone.hostname} {clone.uid} * {clone.umodes} {clone.vhost} * {self.Base.encode_ip(clone.remote_ip)} :{clone.realname}
try:
unixtime = self.__Base.get_unixtime()
encoded_ip = self.__Base.encode_ip(remote_ip)
# Create the user
self.__Irc.User.insert(
self.__Irc.Loader.Definition.MUser(
uid=uid, nickname=nickname, username=username,
realname=realname,hostname=hostname, umodes=umodes,
vhost=vhost, remote_ip=remote_ip
)
)
uid_msg = f":{self.__Config.SERVEUR_ID} UID {nickname} 1 {unixtime} {username} {hostname} {uid} * {umodes} {vhost} * {encoded_ip} :{realname}"
self.send2socket(uid_msg, print_log=print_log)
return None
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def sendChanJoin(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
"""Joining a channel
Args:
@@ -191,6 +264,7 @@ class Unrealircd6:
return None
if not self.__Irc.Channel.Is_Channel(channel):
self.__Base.logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{userObj.uid} JOIN {channel} {passwordChannel}", print_log=print_log)
@@ -199,7 +273,7 @@ class Unrealircd6:
self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[userObj.uid]))
return None
def part(self, uidornickname:str, channel: str, print_log: bool = True) -> None:
def sendChanPart(self, uidornickname:str, channel: str, print_log: bool = True) -> None:
"""Part from a channel
Args:
@@ -211,9 +285,11 @@ class Unrealircd6:
userObj = self.__Irc.User.get_User(uidornickname)
if userObj is None:
self.__Base.logs.error(f"The user [{uidornickname}] is not valid")
return None
if not self.__Irc.Channel.Is_Channel(channel):
self.__Base.logs.error(f"The channel [{channel}] is not valid")
return None
self.send2socket(f":{userObj.uid} PART {channel}", print_log=print_log)
@@ -228,6 +304,214 @@ class Unrealircd6:
return None
def on_umode2(self, serverMsg: list[str]) -> None:
"""Handle umode2 coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# [':adator_', 'UMODE2', '-iwx']
userObj = self.__Irc.User.get_User(str(serverMsg[0]).lstrip(':'))
userMode = serverMsg[2]
if userObj is None: # If user is not created
return None
# save previous user modes
old_umodes = userObj.umodes
# TODO : User object should be able to update user modes
if self.__Irc.User.update_mode(userObj.uid, userMode):
return None
# self.__Base.logs.debug(f"Updating user mode for [{userObj.nickname}] [{old_umodes}] => [{userObj.umodes}]")
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_quit(self, serverMsg: list[str]) -> None:
"""Handle quit coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# ['@unrealircd.org/userhost=...@192.168.1.10;unrealircd.org/userip=...@192.168.1.10;msgid=CssUrV08BzekYuq7BfvPHn;time=2024-11-02T15:03:33.182Z', ':001JKNY0N', 'QUIT', ':Quit:', '....']
uid_who_quit = str(serverMsg[1]).lstrip(':')
self.__Irc.Channel.delete_user_from_all_channel(uid_who_quit)
self.__Irc.User.delete(uid_who_quit)
self.__Irc.Reputation.delete(uid_who_quit)
self.__Irc.Clone.delete(uid_who_quit)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_nick(self, serverMsg: list[str]) -> None:
"""Handle nick coming from a server
new nickname
Args:
serverMsg (list[str]): Original server message
"""
try:
# ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844']
# Changement de nickname
uid = str(serverMsg[1]).lstrip(':')
newnickname = serverMsg[3]
self.__Irc.User.update(uid, newnickname)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_sjoin(self, serverMsg: list[str]) -> None:
"""Handle sjoin coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# ['@msgid=5sTwGdj349D82L96p749SY;time=2024-08-15T09:50:23.528Z', ':001', 'SJOIN', '1721564574', '#welcome', ':001JD94QH']
# ['@msgid=bvceb6HthbLJapgGLXn1b0;time=2024-08-15T09:50:11.464Z', ':001', 'SJOIN', '1721564574', '#welcome', '+lnrt', '13', ':001CIVLQF', '+11ZAAAAAB', '001QGR10C', '*@0014UE10B', '001NL1O07', '001SWZR05', '001HB8G04', '@00BAAAAAJ', '0019M7101']
# ['@msgid=SKUeuVzOrTShRDduq8VerX;time=2024-08-23T19:37:04.266Z', ':001', 'SJOIN', '1723993047', '#welcome', '+lnrt', '13',
# ':001T6VU3F', '001JGWB2K', '@11ZAAAAAB',
# '001F16WGR', '001X9YMGQ', '*+001DYPFGP', '@00BAAAAAJ', '001AAGOG9', '001FMFVG8', '001DAEEG7',
# '&~G:unknown-users', '"~G:websocket-users', '"~G:known-users', '"~G:webirc-users']
serverMsg.pop(0)
channel = str(serverMsg[3]).lower()
len_cmd = len(serverMsg)
list_users:list = []
occurence = 0
start_boucle = 0
# Trouver le premier user
for i in range(len_cmd):
s: list = findall(fr':', serverMsg[i])
if s:
occurence += 1
if occurence == 2:
start_boucle = i
# Boucle qui va ajouter l'ensemble des users (UID)
for i in range(start_boucle, len(serverMsg)):
parsed_UID = str(serverMsg[i])
clean_uid = self.__Irc.User.clean_uid(parsed_UID)
if not clean_uid is None and len(clean_uid) == 9:
list_users.append(parsed_UID)
if list_users:
self.__Irc.Channel.insert(
self.__Irc.Loader.Definition.MChannel(
name=channel,
uids=list_users
)
)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_part(self, serverMsg: list[str]) -> None:
"""Handle part coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# ['@unrealircd.org/geoip=FR;unrealircd.org/userhost=50d6492c@80.214.73.44;unrealircd.org/userip=50d6492c@80.214.73.44;msgid=YSIPB9q4PcRu0EVfC9ci7y-/mZT0+Gj5FLiDSZshH5NCw;time=2024-08-15T15:35:53.772Z',
# ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris']
uid = str(serverMsg[1]).lstrip(':')
channel = str(serverMsg[3]).lower()
self.__Irc.Channel.delete_user_from_channel(channel, uid)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server
Args:
serverMsg (list[str]): Original server message
"""
# ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601',
# ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net',
# '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...']
try:
isWebirc = True if 'webirc' in serverMsg[0] else False
isWebsocket = True if 'websocket' in serverMsg[0] else False
uid = str(serverMsg[8])
nickname = str(serverMsg[3])
username = str(serverMsg[6])
hostname = str(serverMsg[7])
umodes = str(serverMsg[10])
vhost = str(serverMsg[11])
if not 'S' in umodes:
remote_ip = self.__Base.decode_ip(str(serverMsg[13]))
else:
remote_ip = '127.0.0.1'
# extract realname
realname = ' '.join(serverMsg[14:]).lstrip(':')
# Extract Geoip information
pattern = r'^.*geoip=cc=(\S{2}).*$'
geoip_match = match(pattern, serverMsg[0])
if geoip_match:
geoip = geoip_match.group(1)
else:
geoip = None
score_connexion = 0
self.__Irc.User.insert(
self.__Irc.Loader.Definition.MUser(
uid=uid,
nickname=nickname,
username=username,
realname=realname,
hostname=hostname,
umodes=umodes,
vhost=vhost,
isWebirc=isWebirc,
isWebsocket=isWebsocket,
remote_ip=remote_ip,
geoip=geoip,
score_connexion=score_connexion,
connexion_datetime=datetime.now()
)
)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server

View File

@@ -52,20 +52,15 @@ class Reputation:
Returns:
bool: True if updated
"""
result = False
for record in self.UID_REPUTATION_DB:
if record.uid == uid:
# If the user exist then update and return True and do not go further
record.nickname = newNickname
result = True
self.Logs.debug(f'Reputation UID ({record.uid}) has been updated with new nickname {newNickname}')
return result
reputationObj = self.get_Reputation(uid)
if not result:
self.Logs.critical(f'Reputation new nickname {newNickname} was not updated, uid = {uid}')
if reputationObj is None:
return False
return result
reputationObj.nickname = newNickname
return True
def delete(self, uid: str) -> bool:
"""Delete the User starting from the UID
@@ -78,6 +73,9 @@ class Reputation:
"""
result = False
if not self.is_exist(uid):
return result
for record in self.UID_REPUTATION_DB:
if record.uid == uid:
# If the user exist then remove and return True and do not go further
@@ -107,9 +105,6 @@ class Reputation:
elif record.nickname == uidornickname:
User = record
if not User is None:
self.Logs.debug(f'Reputation found for {uidornickname} -> {User}')
return User
def get_uid(self, uidornickname:str) -> Union[str, None]:
@@ -121,17 +116,13 @@ class Reputation:
Returns:
str|None: Return the UID
"""
uid = None
for record in self.UID_REPUTATION_DB:
if record.uid == uidornickname:
uid = record.uid
if record.nickname == uidornickname:
uid = record.uid
if not uid is None:
self.Logs.debug(f'Reputation UID found for {uidornickname} -> {uid}')
reputationObj = self.get_Reputation(uidornickname)
return uid
if reputationObj is None:
return None
return reputationObj.uid
def get_nickname(self, uidornickname:str) -> Union[str, None]:
"""Get the Nickname starting from UID or the nickname
@@ -142,17 +133,12 @@ class Reputation:
Returns:
str|None: the nickname
"""
nickname = None
for record in self.UID_REPUTATION_DB:
if record.nickname == uidornickname:
nickname = record.nickname
if record.uid == uidornickname:
nickname = record.nickname
if not nickname is None:
self.Logs.debug(f'Reputation nickname found for {uidornickname} -> {nickname}')
reputationObj = self.get_Reputation(uidornickname)
return nickname
if reputationObj is None:
return None
return reputationObj.nickname
def is_exist(self, uidornickname: str) -> bool:
"""Check if the UID or the nickname exist in the reputation DB
@@ -164,12 +150,9 @@ class Reputation:
bool: True if exist
"""
found = False
reputationObj = self.get_Reputation(uidornickname)
for record in self.UID_REPUTATION_DB:
if record.uid == uidornickname:
found = True
if record.nickname == uidornickname:
found = True
return found
if reputationObj is None:
return False
else:
return True

View File

@@ -8,3 +8,5 @@ class Settings:
RUNNING_SOCKETS: list[socket] = []
PERIODIC_FUNC: dict[object] = {}
LOCK: RLock = RLock()
CONSOLE: bool = True

View File

@@ -1,4 +1,4 @@
import re
from re import sub
from typing import Union, TYPE_CHECKING
from dataclasses import asdict
@@ -26,25 +26,16 @@ class User:
Returns:
bool: True if inserted
"""
result = False
exist = False
for record in self.UID_DB:
if record.uid == newUser.uid:
# If the user exist then return False and do not go further
exist = True
self.Logs.debug(f'{record.uid} already exist')
return result
userObj = self.get_User(newUser.uid)
if not exist:
self.UID_DB.append(newUser)
result = True
# self.Logs.debug(f'New User Created: ({newUser})')
if not userObj is None:
# User already created return False
return False
if not result:
self.Logs.critical(f'The User Object was not inserted {newUser}')
self.UID_DB.append(newUser)
return result
return True
def update(self, uid: str, newNickname: str) -> bool:
"""Update the nickname starting from the UID
@@ -56,20 +47,53 @@ class User:
Returns:
bool: True if updated
"""
result = False
userObj = self.get_User(uidornickname=uid)
for record in self.UID_DB:
if record.uid == uid:
# If the user exist then update and return True and do not go further
record.nickname = newNickname
result = True
# self.Logs.debug(f'UID ({record.uid}) has been updated with new nickname {newNickname}')
return result
if userObj is None:
return False
if not result:
self.Logs.critical(f'The new nickname {newNickname} was not updated, uid = {uid}')
userObj.nickname = newNickname
return result
return True
def update_mode(self, uidornickname: str, modes: str) -> bool:
"""Updating user mode
Args:
uidornickname (str): The UID or Nickname of the user
modes (str): new modes to update
Returns:
bool: True if user mode has been updaed
"""
response = True
userObj = self.get_User(uidornickname=uidornickname)
if userObj is None:
return False
action = modes[0]
new_modes = modes[1:]
existing_umodes = userObj.umodes
final_umodes = userObj.umodes
if action == '+':
for nm in new_modes:
if nm not in existing_umodes:
final_umodes += nm
elif action == '-':
for nm in new_modes:
if nm in existing_umodes:
final_umodes = final_umodes.replace(nm, '')
else:
return False
userObj.umodes = final_umodes
return response
def delete(self, uid: str) -> bool:
"""Delete the User starting from the UID
@@ -80,20 +104,15 @@ class User:
Returns:
bool: True if deleted
"""
result = False
for record in self.UID_DB:
if record.uid == uid:
# If the user exist then remove and return True and do not go further
self.UID_DB.remove(record)
result = True
# self.Logs.debug(f'UID ({record.uid}) has been deleted')
return result
userObj = self.get_User(uidornickname=uid)
if not result:
self.Logs.critical(f'The UID {uid} was not deleted')
if userObj is None:
return False
return result
self.UID_DB.remove(userObj)
return True
def get_User(self, uidornickname: str) -> Union['MUser', None]:
"""Get The User Object model
@@ -111,8 +130,6 @@ class User:
elif record.nickname == uidornickname:
User = record
# self.Logs.debug(f'Search {uidornickname} -- result = {User}')
return User
def get_uid(self, uidornickname:str) -> Union[str, None]:
@@ -124,17 +141,13 @@ class User:
Returns:
str|None: Return the UID
"""
uid = None
for record in self.UID_DB:
if record.uid == uidornickname:
uid = record.uid
if record.nickname == uidornickname:
uid = record.uid
# if not uid is None:
# self.Logs.debug(f'The UID that you are looking for {uidornickname} has been found {uid}')
userObj = self.get_User(uidornickname=uidornickname)
return uid
if userObj is None:
return None
return userObj.uid
def get_nickname(self, uidornickname:str) -> Union[str, None]:
"""Get the Nickname starting from UID or the nickname
@@ -145,26 +158,46 @@ class User:
Returns:
str|None: the nickname
"""
nickname = None
for record in self.UID_DB:
if record.nickname == uidornickname:
nickname = record.nickname
if record.uid == uidornickname:
nickname = record.nickname
# self.Logs.debug(f'The value to check {uidornickname} -> {nickname}')
return nickname
def get_User_AsDict(self, uidornickname: str) -> Union[dict[str, any], None]:
userObj = self.get_User(uidornickname=uidornickname)
if not userObj is None:
user_as_dict = asdict(userObj)
return user_as_dict
else:
if userObj is None:
return None
def clean_uid(self, uid: str) -> str:
return userObj.nickname
def get_User_AsDict(self, uidornickname: str) -> Union[dict[str, any], None]:
"""Transform User Object to a dictionary
Args:
uidornickname (str): The UID or The nickname
Returns:
Union[dict[str, any], None]: User Object as a dictionary or None
"""
userObj = self.get_User(uidornickname=uidornickname)
if userObj is None:
return None
return asdict(userObj)
def is_exist(self, uidornikname: str) -> bool:
"""Check if the UID or the nickname exist in the USER DB
Args:
uidornickname (str): The UID or the NICKNAME
Returns:
bool: True if exist
"""
userObj = self.get_User(uidornickname=uidornikname)
if userObj is None:
return False
return True
def clean_uid(self, uid: str) -> Union[str, None]:
"""Clean UID by removing @ / % / + / ~ / * / :
Args:
@@ -175,6 +208,9 @@ class User:
"""
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = re.sub(pattern, '', uid)
parsed_UID = sub(pattern, '', uid)
if not parsed_UID:
return None
return parsed_UID