Merge pull request #69 from adator85/V6.X.X

V6.0.4
This commit is contained in:
adator
2024-11-18 23:50:25 +01:00
committed by GitHub
4 changed files with 227 additions and 178 deletions

View File

@@ -1,4 +1,4 @@
from re import match, findall from re import match, findall, search
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING, Union from typing import TYPE_CHECKING, Union
from ssl import SSLEOFError, SSLError from ssl import SSLEOFError, SSLError
@@ -694,6 +694,105 @@ class Unrealircd6:
except Exception as err: except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}") self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_eos(self, serverMsg: list[str]) -> None:
"""Handle EOS coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# [':001', 'EOS']
server_msg_copy = serverMsg.copy()
hsid = str(server_msg_copy[0]).replace(':','')
if hsid == self.__Config.HSID:
if self.__Config.DEFENDER_INIT == 1:
current_version = self.__Config.CURRENT_VERSION
latest_version = self.__Config.LATEST_VERSION
if self.__Base.check_for_new_version(False):
version = f'{current_version} >>> {latest_version}'
else:
version = f'{current_version}'
print(f"################### DEFENDER ###################")
print(f"# SERVICE CONNECTE ")
print(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
print(f"# PORT : {self.__Config.SERVEUR_PORT} ")
print(f"# SSL : {self.__Config.SERVEUR_SSL} ")
print(f"# SSL VER : {self.__Config.SSL_VERSION} ")
print(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
print(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
print(f"# VERSION : {version} ")
print(f"################################################")
self.__Base.logs.info(f"################### DEFENDER ###################")
self.__Base.logs.info(f"# SERVICE CONNECTE ")
self.__Base.logs.info(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
self.__Base.logs.info(f"# PORT : {self.__Config.SERVEUR_PORT} ")
self.__Base.logs.info(f"# SSL : {self.__Config.SERVEUR_SSL} ")
self.__Base.logs.info(f"# SSL VER : {self.__Config.SSL_VERSION} ")
self.__Base.logs.info(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
self.__Base.logs.info(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
self.__Base.logs.info(f"# VERSION : {version} ")
self.__Base.logs.info(f"################################################")
if self.__Base.check_for_new_version(False):
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f" New Version available {version}",
channel=self.__Config.SERVICE_CHANLOG
)
# Initialisation terminé aprés le premier PING
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f"[{self.__Config.COLORS.green}INFORMATION{self.__Config.COLORS.nogc}] >> Defender is ready",
channel=self.__Config.SERVICE_CHANLOG
)
self.__Config.DEFENDER_INIT = 0
# Send EOF to other modules
for classe_name, classe_object in self.__Irc.loaded_classes.items():
classe_object.cmd(server_msg_copy)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Key Error: {ie}")
except KeyError as ke:
self.__Base.logs.error(f"{__name__} - Key Error: {ke}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_reputation(self, serverMsg: list[str]) -> None:
"""Handle REPUTATION coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# :001 REPUTATION 127.0.0.1 118
server_msg_copy = serverMsg.copy()
self.__Irc.first_connexion_ip = server_msg_copy[2]
self.__Irc.first_score = 0
if str(server_msg_copy[3]).find('*') != -1:
# If * available, it means that an ircop changed the repurtation score
# means also that the user exist will try to update all users with same IP
self.__Irc.first_score = int(str(server_msg_copy[3]).replace('*',''))
for user in self.__Irc.User.UID_DB:
if user.remote_ip == self.__Irc.first_connexion_ip:
user.score_connexion = self.first_score
else:
self.__Irc.first_score = int(server_msg_copy[3])
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Logs.error(f'Index Error {__name__}: {ie}')
except ValueError as ve:
self.__Irc.first_score = 0
self.Logs.error(f'Value Error {__name__}: {ve}')
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_uid(self, serverMsg: list[str]) -> None: def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server """Handle uid message coming from the server
@@ -757,6 +856,102 @@ class Unrealircd6:
except Exception as err: except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}") self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_privmsg(self, serverMsg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server
Args:
serverMsg (list[str]): Original server message
"""
try:
srv_msg = serverMsg.copy()
# Supprimer la premiere valeur
if srv_msg[0].startswith('@'):
srv_msg.pop(0)
cmd = srv_msg
# Hide auth logs
if len(cmd) == 7:
if cmd[2] == 'PRIVMSG' and cmd[4] == ':auth':
data_copy = cmd.copy()
data_copy[6] = '**********'
self.__Base.logs.debug(f">> {data_copy}")
else:
self.__Base.logs.debug(f">> {cmd}")
else:
self.__Base.logs.debug(f">> {cmd}")
get_uid_or_nickname = str(cmd[0].replace(':',''))
user_trigger = self.__Irc.User.get_nickname(get_uid_or_nickname)
dnickname = self.__Config.SERVICE_NICKNAME
pattern = fr'(:\{self.__Config.SERVICE_PREFIX})(.*)$'
hcmds = search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le .
if hcmds: # Commande qui commencent par le point
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
arg.remove(f':{self.__Config.SERVICE_PREFIX}')
if not arg[0].lower() in self.__Irc.commands:
self.__Base.logs.debug(f"This command {arg[0]} is not available")
self.send_notice(
nick_from=self.__Config.SERVICE_NICKNAME,
nick_to=user_trigger,
msg=f"This command [{self.__Config.COLORS.bold}{arg[0]}{self.__Config.COLORS.bold}] is not available"
)
return None
cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = str(cmd[2]).lower() if self.__Irc.Channel.Is_Channel(cmd[2]) else None
self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd)
if cmd[2] == self.__Config.SERVICE_ID:
pattern = fr'^:.*?:(.*)$'
hcmds = search(pattern, ' '.join(cmd))
if hcmds: # par /msg defender [commande]
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
# Réponse a un CTCP VERSION
if arg[0] == '\x01VERSION\x01':
self.on_version(srv_msg)
return False
# Réponse a un TIME
if arg[0] == '\x01TIME\x01':
self.on_time(srv_msg)
return False
# Réponse a un PING
if arg[0] == '\x01PING':
self.on_ping(srv_msg)
return False
if not arg[0].lower() in self.__Irc.commands:
self.__Base.logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available")
return False
cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = None
if len(arg) >= 2:
fromchannel = str(arg[1]).lower() if self.__Irc.Channel.Is_Channel(arg[1]) else None
self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd)
return None
except KeyError as ke:
self.__Base.logs.error(f"Key Error: {ke}")
except Exception as err:
self.__Base.logs.error(f"General Error: {err}")
def on_server_ping(self, serverMsg: list[str]) -> None: def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server """Send a PONG message to the server

View File

@@ -11,6 +11,8 @@ import traceback
from ssl import SSLSocket from ssl import SSLSocket
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Union from typing import Union
from websockets import serve
from core.loader import Loader from core.loader import Loader
from core.classes.protocol import Protocol from core.classes.protocol import Protocol
@@ -744,6 +746,16 @@ class Irc:
self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}') self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}')
return False return False
if len(original_response) == 7:
if original_response[2] == 'PRIVMSG' and original_response[4] == ':auth':
data_copy = original_response.copy()
data_copy[6] = '**********'
self.Logs.debug(f">> {data_copy}")
else:
self.Logs.debug(f">> {original_response}")
else:
self.Logs.debug(f">> {original_response}")
parsed_protocol = self.Protocol.parse_server_msg(original_response.copy()) parsed_protocol = self.Protocol.parse_server_msg(original_response.copy())
match parsed_protocol: match parsed_protocol:
@@ -757,61 +769,9 @@ class Irc:
self.Protocol.on_sjoin(serverMsg=original_response) self.Protocol.on_sjoin(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
case 'EOS': # TODO case 'EOS':
hsid = str(original_response[0]).replace(':','') self.Protocol.on_eos(serverMsg=original_response)
if hsid == self.Config.HSID:
if self.Config.DEFENDER_INIT == 1:
current_version = self.Config.CURRENT_VERSION
latest_version = self.Config.LATEST_VERSION
if self.Base.check_for_new_version(False):
version = f'{current_version} >>> {latest_version}'
else:
version = f'{current_version}'
print(f"################### DEFENDER ###################")
print(f"# SERVICE CONNECTE ")
print(f"# SERVEUR : {self.Config.SERVEUR_IP} ")
print(f"# PORT : {self.Config.SERVEUR_PORT} ")
print(f"# SSL : {self.Config.SERVEUR_SSL} ")
print(f"# SSL VER : {self.Config.SSL_VERSION} ")
print(f"# NICKNAME : {self.Config.SERVICE_NICKNAME} ")
print(f"# CHANNEL : {self.Config.SERVICE_CHANLOG} ")
print(f"# VERSION : {version} ")
print(f"################################################")
self.Logs.info(f"################### DEFENDER ###################")
self.Logs.info(f"# SERVICE CONNECTE ")
self.Logs.info(f"# SERVEUR : {self.Config.SERVEUR_IP} ")
self.Logs.info(f"# PORT : {self.Config.SERVEUR_PORT} ")
self.Logs.info(f"# SSL : {self.Config.SERVEUR_SSL} ")
self.Logs.info(f"# SSL VER : {self.Config.SSL_VERSION} ")
self.Logs.info(f"# NICKNAME : {self.Config.SERVICE_NICKNAME} ")
self.Logs.info(f"# CHANNEL : {self.Config.SERVICE_CHANLOG} ")
self.Logs.info(f"# VERSION : {version} ")
self.Logs.info(f"################################################")
if self.Base.check_for_new_version(False):
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f" New Version available {version}",
channel=self.Config.SERVICE_CHANLOG
)
# Initialisation terminé aprés le premier PING
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.green}INFORMATION{self.Config.COLORS.nogc}] >> Defender is ready",
channel=self.Config.SERVICE_CHANLOG
)
self.Config.DEFENDER_INIT = 0
# Send EOF to other modules
for classe_name, classe_object in self.loaded_classes.items():
classe_object.cmd(original_response)
# Stop here When EOS
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
return None
case 'UID': case 'UID':
try: try:
@@ -859,29 +819,9 @@ class Irc:
self.Protocol.on_nick(serverMsg=original_response) self.Protocol.on_nick(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
case 'REPUTATION': # TODO case 'REPUTATION':
# :001 REPUTATION 127.0.0.1 118 self.Protocol.on_reputation(serverMsg=original_response)
try:
self.first_connexion_ip = original_response[2]
self.first_score = 0
if str(original_response[3]).find('*') != -1:
# If * available, it means that an ircop changed the repurtation score
# means also that the user exist will try to update all users with same IP
self.first_score = int(str(original_response[3]).replace('*',''))
for user in self.User.UID_DB:
if user.remote_ip == self.first_connexion_ip:
user.score_connexion = self.first_score
else:
self.first_score = int(original_response[3])
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Logs.error(f'{ie}')
except ValueError as ve:
self.first_score = 0
self.Logs.error(f'Impossible to convert first_score: {ve}')
case 'SLOG': # TODO case 'SLOG': # TODO
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
@@ -889,89 +829,9 @@ class Irc:
case 'MD': # TODO case 'MD': # TODO
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
case 'PRIVMSG': # TODO case 'PRIVMSG':
try: self.Protocol.on_privmsg(serverMsg=original_response)
# Supprimer la premiere valeur self.Logs.debug(f"** handle {parsed_protocol}")
cmd = interm_response.copy()
get_uid_or_nickname = str(cmd[0].replace(':',''))
user_trigger = self.User.get_nickname(get_uid_or_nickname)
dnickname = self.Config.SERVICE_NICKNAME
if len(cmd) == 6:
if cmd[1] == 'PRIVMSG' and str(cmd[3]).replace(self.Config.SERVICE_PREFIX,'') == ':auth':
cmd_copy = cmd.copy()
cmd_copy[5] = '**********'
self.Logs.info(f'>> {cmd_copy}')
else:
self.Logs.info(f'>> {cmd}')
else:
self.Logs.info(f'>> {cmd}')
pattern = fr'(:\{self.Config.SERVICE_PREFIX})(.*)$'
hcmds = re.search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le .
if hcmds: # Commande qui commencent par le point
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
arg.remove(f':{self.Config.SERVICE_PREFIX}')
if not arg[0].lower() in self.commands:
self.Logs.debug(f"This command {arg[0]} is not available")
self.Protocol.send_notice(
nick_from=self.Config.SERVICE_NICKNAME,
nick_to=user_trigger,
msg=f"This command [{self.Config.COLORS.bold}{arg[0]}{self.Config.COLORS.bold}] is not available"
)
return None
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = str(cmd[2]).lower() if self.Channel.Is_Channel(cmd[2]) else None
self.hcmds(user_trigger, fromchannel, arg, cmd)
if cmd[2] == self.Config.SERVICE_ID:
pattern = fr'^:.*?:(.*)$'
hcmds = re.search(pattern, ' '.join(cmd))
if hcmds: # par /msg defender [commande]
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
# Réponse a un CTCP VERSION
if arg[0] == '\x01VERSION\x01':
self.Protocol.on_version(original_response)
return False
# Réponse a un TIME
if arg[0] == '\x01TIME\x01':
self.Protocol.on_time(original_response)
return False
# Réponse a un PING
if arg[0] == '\x01PING':
self.Protocol.on_ping(original_response)
return False
if not arg[0].lower() in self.commands:
self.Logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available")
return False
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = None
if len(arg) >= 2:
fromchannel = str(arg[1]).lower() if self.Channel.Is_Channel(arg[1]) else None
self.hcmds(user_trigger, fromchannel, arg, cmd)
# print(f"** handle {parsed_protocol}")
except IndexError as io:
self.Logs.error(f'{io}')
case 'PONG': # TODO case 'PONG': # TODO
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
@@ -992,16 +852,6 @@ class Irc:
case None: case None:
self.Logs.debug(f"** TO BE HANDLE {original_response}") self.Logs.debug(f"** TO BE HANDLE {original_response}")
if len(original_response) == 7:
if original_response[2] == 'PRIVMSG' and original_response[4] == ':auth':
data_copy = original_response.copy()
data_copy[6] = '**********'
self.Logs.debug(f">> {data_copy}")
else:
self.Logs.debug(f">> {original_response}")
else:
self.Logs.debug(f">> {original_response}")
if len(original_response) > 2: if len(original_response) > 2:
if original_response[2] != 'UID': if original_response[2] != 'UID':
# Envoyer la commande aux classes dynamiquement chargées # Envoyer la commande aux classes dynamiquement chargées
@@ -1606,7 +1456,7 @@ class Irc:
self.Protocol.send_notice( self.Protocol.send_notice(
nick_from=dnickname, nick_from=dnickname,
nick_to=fromuser, nick_to=fromuser,
msg=f'{key} > {value}' msg=f'{key} = {value}'
) )
case 'uptime': case 'uptime':

View File

@@ -1281,6 +1281,10 @@ class Defender():
try: try:
# autolimit on # autolimit on
# autolimit set [amount] [interval] # autolimit set [amount] [interval]
if len(cmd) < 2:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
return None
arg = str(cmd[1]).lower() arg = str(cmd[1]).lower()
@@ -1314,12 +1318,12 @@ class Defender():
) )
case _: case _:
self.Protocol.send_notice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON", nickname=fromuser) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
self.Protocol.send_notice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]", nickname=fromuser) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
except Exception as err: except Exception as err:
self.Protocol.send_notice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON", nickname=fromuser) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
self.Protocol.send_notice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]", nickname=fromuser) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
self.Logs.error(f"Value Error -> {err}") self.Logs.error(f"Value Error -> {err}")
case 'reputation': case 'reputation':

View File

@@ -1,5 +1,5 @@
{ {
"version": "6.0.3", "version": "6.0.4",
"requests": "2.32.3", "requests": "2.32.3",
"psutil": "6.0.0", "psutil": "6.0.0",