2 Commits

Author SHA1 Message Date
adator
a3edf48120 Update on_version method 2024-11-22 23:26:39 +01:00
adator
f7664c9874 V6.0.4 2024-11-18 23:49:45 +01:00
4 changed files with 226 additions and 181 deletions

View File

@@ -1,10 +1,7 @@
from re import match, findall from re import match, findall, search
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING, Union from typing import TYPE_CHECKING, Union
from ssl import SSLEOFError, SSLError from ssl import SSLEOFError, SSLError
from dataclasses import dataclass
from websockets import serve
if TYPE_CHECKING: if TYPE_CHECKING:
from core.irc import Irc from core.irc import Irc
@@ -694,6 +691,105 @@ class Unrealircd6:
except Exception as err: except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}") self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_eos(self, serverMsg: list[str]) -> None:
"""Handle EOS coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# [':001', 'EOS']
server_msg_copy = serverMsg.copy()
hsid = str(server_msg_copy[0]).replace(':','')
if hsid == self.__Config.HSID:
if self.__Config.DEFENDER_INIT == 1:
current_version = self.__Config.CURRENT_VERSION
latest_version = self.__Config.LATEST_VERSION
if self.__Base.check_for_new_version(False):
version = f'{current_version} >>> {latest_version}'
else:
version = f'{current_version}'
print(f"################### DEFENDER ###################")
print(f"# SERVICE CONNECTE ")
print(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
print(f"# PORT : {self.__Config.SERVEUR_PORT} ")
print(f"# SSL : {self.__Config.SERVEUR_SSL} ")
print(f"# SSL VER : {self.__Config.SSL_VERSION} ")
print(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
print(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
print(f"# VERSION : {version} ")
print(f"################################################")
self.__Base.logs.info(f"################### DEFENDER ###################")
self.__Base.logs.info(f"# SERVICE CONNECTE ")
self.__Base.logs.info(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
self.__Base.logs.info(f"# PORT : {self.__Config.SERVEUR_PORT} ")
self.__Base.logs.info(f"# SSL : {self.__Config.SERVEUR_SSL} ")
self.__Base.logs.info(f"# SSL VER : {self.__Config.SSL_VERSION} ")
self.__Base.logs.info(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
self.__Base.logs.info(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
self.__Base.logs.info(f"# VERSION : {version} ")
self.__Base.logs.info(f"################################################")
if self.__Base.check_for_new_version(False):
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f" New Version available {version}",
channel=self.__Config.SERVICE_CHANLOG
)
# Initialisation terminé aprés le premier PING
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f"[{self.__Config.COLORS.green}INFORMATION{self.__Config.COLORS.nogc}] >> Defender is ready",
channel=self.__Config.SERVICE_CHANLOG
)
self.__Config.DEFENDER_INIT = 0
# Send EOF to other modules
for classe_name, classe_object in self.__Irc.loaded_classes.items():
classe_object.cmd(server_msg_copy)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Key Error: {ie}")
except KeyError as ke:
self.__Base.logs.error(f"{__name__} - Key Error: {ke}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_reputation(self, serverMsg: list[str]) -> None:
"""Handle REPUTATION coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# :001 REPUTATION 127.0.0.1 118
server_msg_copy = serverMsg.copy()
self.__Irc.first_connexion_ip = server_msg_copy[2]
self.__Irc.first_score = 0
if str(server_msg_copy[3]).find('*') != -1:
# If * available, it means that an ircop changed the repurtation score
# means also that the user exist will try to update all users with same IP
self.__Irc.first_score = int(str(server_msg_copy[3]).replace('*',''))
for user in self.__Irc.User.UID_DB:
if user.remote_ip == self.__Irc.first_connexion_ip:
user.score_connexion = self.first_score
else:
self.__Irc.first_score = int(server_msg_copy[3])
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Logs.error(f'Index Error {__name__}: {ie}')
except ValueError as ve:
self.__Irc.first_score = 0
self.Logs.error(f'Value Error {__name__}: {ve}')
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_uid(self, serverMsg: list[str]) -> None: def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server """Handle uid message coming from the server
@@ -757,6 +853,101 @@ class Unrealircd6:
except Exception as err: except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}") self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_privmsg(self, serverMsg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server
Args:
serverMsg (list[str]): Original server message
"""
try:
srv_msg = serverMsg.copy()
cmd = serverMsg.copy()
# Supprimer la premiere valeur si MTAGS activé
if cmd[0].startswith('@'):
cmd.pop(0)
# Hide auth logs
if len(cmd) == 7:
if cmd[2] == 'PRIVMSG' and cmd[4] == ':auth':
data_copy = cmd.copy()
data_copy[6] = '**********'
self.__Base.logs.debug(f">> {data_copy}")
else:
self.__Base.logs.debug(f">> {cmd}")
else:
self.__Base.logs.debug(f">> {cmd}")
get_uid_or_nickname = str(cmd[0].replace(':',''))
user_trigger = self.__Irc.User.get_nickname(get_uid_or_nickname)
dnickname = self.__Config.SERVICE_NICKNAME
pattern = fr'(:\{self.__Config.SERVICE_PREFIX})(.*)$'
hcmds = search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le .
if hcmds: # Commande qui commencent par le point
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
arg.remove(f':{self.__Config.SERVICE_PREFIX}')
if not arg[0].lower() in self.__Irc.commands:
self.__Base.logs.debug(f"This command {arg[0]} is not available")
self.send_notice(
nick_from=self.__Config.SERVICE_NICKNAME,
nick_to=user_trigger,
msg=f"This command [{self.__Config.COLORS.bold}{arg[0]}{self.__Config.COLORS.bold}] is not available"
)
return None
cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = str(cmd[2]).lower() if self.__Irc.Channel.Is_Channel(cmd[2]) else None
self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd)
if cmd[2] == self.__Config.SERVICE_ID:
pattern = fr'^:.*?:(.*)$'
hcmds = search(pattern, ' '.join(cmd))
if hcmds: # par /msg defender [commande]
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
# Réponse a un CTCP VERSION
if arg[0] == '\x01VERSION\x01':
self.on_version(srv_msg)
return False
# Réponse a un TIME
if arg[0] == '\x01TIME\x01':
self.on_time(srv_msg)
return False
# Réponse a un PING
if arg[0] == '\x01PING':
self.on_ping(srv_msg)
return False
if not arg[0].lower() in self.__Irc.commands:
self.__Base.logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available")
return False
cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = None
if len(arg) >= 2:
fromchannel = str(arg[1]).lower() if self.__Irc.Channel.Is_Channel(arg[1]) else None
self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd)
return None
except KeyError as ke:
self.__Base.logs.error(f"Key Error: {ke}")
except Exception as err:
self.__Base.logs.error(f"General Error: {err}")
def on_server_ping(self, serverMsg: list[str]) -> None: def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server """Send a PONG message to the server

View File

@@ -11,6 +11,8 @@ import traceback
from ssl import SSLSocket from ssl import SSLSocket
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Union from typing import Union
from websockets import serve
from core.loader import Loader from core.loader import Loader
from core.classes.protocol import Protocol from core.classes.protocol import Protocol
@@ -744,6 +746,16 @@ class Irc:
self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}') self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}')
return False return False
if len(original_response) == 7:
if original_response[2] == 'PRIVMSG' and original_response[4] == ':auth':
data_copy = original_response.copy()
data_copy[6] = '**********'
self.Logs.debug(f">> {data_copy}")
else:
self.Logs.debug(f">> {original_response}")
else:
self.Logs.debug(f">> {original_response}")
parsed_protocol = self.Protocol.parse_server_msg(original_response.copy()) parsed_protocol = self.Protocol.parse_server_msg(original_response.copy())
match parsed_protocol: match parsed_protocol:
@@ -757,61 +769,9 @@ class Irc:
self.Protocol.on_sjoin(serverMsg=original_response) self.Protocol.on_sjoin(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
case 'EOS': # TODO case 'EOS':
hsid = str(original_response[0]).replace(':','') self.Protocol.on_eos(serverMsg=original_response)
if hsid == self.Config.HSID:
if self.Config.DEFENDER_INIT == 1:
current_version = self.Config.CURRENT_VERSION
latest_version = self.Config.LATEST_VERSION
if self.Base.check_for_new_version(False):
version = f'{current_version} >>> {latest_version}'
else:
version = f'{current_version}'
print(f"################### DEFENDER ###################")
print(f"# SERVICE CONNECTE ")
print(f"# SERVEUR : {self.Config.SERVEUR_IP} ")
print(f"# PORT : {self.Config.SERVEUR_PORT} ")
print(f"# SSL : {self.Config.SERVEUR_SSL} ")
print(f"# SSL VER : {self.Config.SSL_VERSION} ")
print(f"# NICKNAME : {self.Config.SERVICE_NICKNAME} ")
print(f"# CHANNEL : {self.Config.SERVICE_CHANLOG} ")
print(f"# VERSION : {version} ")
print(f"################################################")
self.Logs.info(f"################### DEFENDER ###################")
self.Logs.info(f"# SERVICE CONNECTE ")
self.Logs.info(f"# SERVEUR : {self.Config.SERVEUR_IP} ")
self.Logs.info(f"# PORT : {self.Config.SERVEUR_PORT} ")
self.Logs.info(f"# SSL : {self.Config.SERVEUR_SSL} ")
self.Logs.info(f"# SSL VER : {self.Config.SSL_VERSION} ")
self.Logs.info(f"# NICKNAME : {self.Config.SERVICE_NICKNAME} ")
self.Logs.info(f"# CHANNEL : {self.Config.SERVICE_CHANLOG} ")
self.Logs.info(f"# VERSION : {version} ")
self.Logs.info(f"################################################")
if self.Base.check_for_new_version(False):
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f" New Version available {version}",
channel=self.Config.SERVICE_CHANLOG
)
# Initialisation terminé aprés le premier PING
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.green}INFORMATION{self.Config.COLORS.nogc}] >> Defender is ready",
channel=self.Config.SERVICE_CHANLOG
)
self.Config.DEFENDER_INIT = 0
# Send EOF to other modules
for classe_name, classe_object in self.loaded_classes.items():
classe_object.cmd(original_response)
# Stop here When EOS
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
return None
case 'UID': case 'UID':
try: try:
@@ -859,29 +819,9 @@ class Irc:
self.Protocol.on_nick(serverMsg=original_response) self.Protocol.on_nick(serverMsg=original_response)
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
case 'REPUTATION': # TODO case 'REPUTATION':
# :001 REPUTATION 127.0.0.1 118 self.Protocol.on_reputation(serverMsg=original_response)
try:
self.first_connexion_ip = original_response[2]
self.first_score = 0
if str(original_response[3]).find('*') != -1:
# If * available, it means that an ircop changed the repurtation score
# means also that the user exist will try to update all users with same IP
self.first_score = int(str(original_response[3]).replace('*',''))
for user in self.User.UID_DB:
if user.remote_ip == self.first_connexion_ip:
user.score_connexion = self.first_score
else:
self.first_score = int(original_response[3])
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Logs.error(f'{ie}')
except ValueError as ve:
self.first_score = 0
self.Logs.error(f'Impossible to convert first_score: {ve}')
case 'SLOG': # TODO case 'SLOG': # TODO
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
@@ -889,89 +829,9 @@ class Irc:
case 'MD': # TODO case 'MD': # TODO
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
case 'PRIVMSG': # TODO case 'PRIVMSG':
try: self.Protocol.on_privmsg(serverMsg=original_response)
# Supprimer la premiere valeur self.Logs.debug(f"** handle {parsed_protocol}")
cmd = interm_response.copy()
get_uid_or_nickname = str(cmd[0].replace(':',''))
user_trigger = self.User.get_nickname(get_uid_or_nickname)
dnickname = self.Config.SERVICE_NICKNAME
if len(cmd) == 6:
if cmd[1] == 'PRIVMSG' and str(cmd[3]).replace(self.Config.SERVICE_PREFIX,'') == ':auth':
cmd_copy = cmd.copy()
cmd_copy[5] = '**********'
self.Logs.info(f'>> {cmd_copy}')
else:
self.Logs.info(f'>> {cmd}')
else:
self.Logs.info(f'>> {cmd}')
pattern = fr'(:\{self.Config.SERVICE_PREFIX})(.*)$'
hcmds = re.search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le .
if hcmds: # Commande qui commencent par le point
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
arg.remove(f':{self.Config.SERVICE_PREFIX}')
if not arg[0].lower() in self.commands:
self.Logs.debug(f"This command {arg[0]} is not available")
self.Protocol.send_notice(
nick_from=self.Config.SERVICE_NICKNAME,
nick_to=user_trigger,
msg=f"This command [{self.Config.COLORS.bold}{arg[0]}{self.Config.COLORS.bold}] is not available"
)
return None
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = str(cmd[2]).lower() if self.Channel.Is_Channel(cmd[2]) else None
self.hcmds(user_trigger, fromchannel, arg, cmd)
if cmd[2] == self.Config.SERVICE_ID:
pattern = fr'^:.*?:(.*)$'
hcmds = re.search(pattern, ' '.join(cmd))
if hcmds: # par /msg defender [commande]
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
# Réponse a un CTCP VERSION
if arg[0] == '\x01VERSION\x01':
self.Protocol.on_version(original_response)
return False
# Réponse a un TIME
if arg[0] == '\x01TIME\x01':
self.Protocol.on_time(original_response)
return False
# Réponse a un PING
if arg[0] == '\x01PING':
self.Protocol.on_ping(original_response)
return False
if not arg[0].lower() in self.commands:
self.Logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available")
return False
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = None
if len(arg) >= 2:
fromchannel = str(arg[1]).lower() if self.Channel.Is_Channel(arg[1]) else None
self.hcmds(user_trigger, fromchannel, arg, cmd)
# print(f"** handle {parsed_protocol}")
except IndexError as io:
self.Logs.error(f'{io}')
case 'PONG': # TODO case 'PONG': # TODO
self.Logs.debug(f"** handle {parsed_protocol}") self.Logs.debug(f"** handle {parsed_protocol}")
@@ -992,16 +852,6 @@ class Irc:
case None: case None:
self.Logs.debug(f"** TO BE HANDLE {original_response}") self.Logs.debug(f"** TO BE HANDLE {original_response}")
if len(original_response) == 7:
if original_response[2] == 'PRIVMSG' and original_response[4] == ':auth':
data_copy = original_response.copy()
data_copy[6] = '**********'
self.Logs.debug(f">> {data_copy}")
else:
self.Logs.debug(f">> {original_response}")
else:
self.Logs.debug(f">> {original_response}")
if len(original_response) > 2: if len(original_response) > 2:
if original_response[2] != 'UID': if original_response[2] != 'UID':
# Envoyer la commande aux classes dynamiquement chargées # Envoyer la commande aux classes dynamiquement chargées
@@ -1606,7 +1456,7 @@ class Irc:
self.Protocol.send_notice( self.Protocol.send_notice(
nick_from=dnickname, nick_from=dnickname,
nick_to=fromuser, nick_to=fromuser,
msg=f'{key} > {value}' msg=f'{key} = {value}'
) )
case 'uptime': case 'uptime':

View File

@@ -1281,6 +1281,10 @@ class Defender():
try: try:
# autolimit on # autolimit on
# autolimit set [amount] [interval] # autolimit set [amount] [interval]
if len(cmd) < 2:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
return None
arg = str(cmd[1]).lower() arg = str(cmd[1]).lower()
@@ -1314,12 +1318,12 @@ class Defender():
) )
case _: case _:
self.Protocol.send_notice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON", nickname=fromuser) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
self.Protocol.send_notice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]", nickname=fromuser) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
except Exception as err: except Exception as err:
self.Protocol.send_notice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON", nickname=fromuser) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
self.Protocol.send_notice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]", nickname=fromuser) self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
self.Logs.error(f"Value Error -> {err}") self.Logs.error(f"Value Error -> {err}")
case 'reputation': case 'reputation':

View File

@@ -1,5 +1,5 @@
{ {
"version": "6.0.3", "version": "6.0.5",
"requests": "2.32.3", "requests": "2.32.3",
"psutil": "6.0.0", "psutil": "6.0.0",