mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 11:14:23 +00:00
Fix get_datetime call and update some docstring.
This commit is contained in:
@@ -36,7 +36,7 @@ class Command:
|
|||||||
def get_command(self, command_name: str, module_name: str) -> Optional[MCommand]:
|
def get_command(self, command_name: str, module_name: str) -> Optional[MCommand]:
|
||||||
|
|
||||||
for command in self.DB_COMMANDS:
|
for command in self.DB_COMMANDS:
|
||||||
if command.command_name.lower() == command_name and command.module_name == module_name:
|
if command.command_name.lower() == command_name.lower() and command.module_name.lower() == module_name.lower():
|
||||||
return command
|
return command
|
||||||
|
|
||||||
return None
|
return None
|
||||||
@@ -90,7 +90,7 @@ class Command:
|
|||||||
admin_level = admin.level if admin else 0
|
admin_level = admin.level if admin else 0
|
||||||
commands = self.get_commands_by_level(admin_level)
|
commands = self.get_commands_by_level(admin_level)
|
||||||
|
|
||||||
if command_name in [command.command_name for command in commands]:
|
if command_name.lower() in [command.command_name.lower() for command in commands]:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|||||||
@@ -42,11 +42,11 @@ class Configuration:
|
|||||||
try:
|
try:
|
||||||
import_config = self.__load_json_service_configuration()
|
import_config = self.__load_json_service_configuration()
|
||||||
|
|
||||||
Model_keys = MConfig().to_dict()
|
model_keys = MConfig().to_dict()
|
||||||
model_key_list: list = []
|
model_key_list: list = []
|
||||||
json_config_key_list: list = []
|
json_config_key_list: list = []
|
||||||
|
|
||||||
for key in Model_keys:
|
for key in model_keys:
|
||||||
model_key_list.append(key)
|
model_key_list.append(key)
|
||||||
|
|
||||||
for key in import_config:
|
for key in import_config:
|
||||||
|
|||||||
@@ -288,46 +288,46 @@ class IProtocol(ABC):
|
|||||||
# ------------------------------------------------------------------------
|
# ------------------------------------------------------------------------
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def parse_uid(self, server_msg: list[str]) -> dict[str, str]:
|
def parse_uid(self, serverMsg: list[str]) -> dict[str, str]:
|
||||||
"""Parse UID and return dictionary.
|
"""Parse UID and return dictionary.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): The UID IRCD message
|
serverMsg (list[str]): The UID IRCD message
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: The response as dictionary.
|
dict[str, str]: The response as dictionary.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def parse_quit(self, server_msg: list[str]) -> dict[str, str]:
|
def parse_quit(self, serverMsg: list[str]) -> dict[str, str]:
|
||||||
"""Parse quit and return dictionary.
|
"""Parse quit and return dictionary.
|
||||||
>>> [':97KAAAAAB', 'QUIT', ':Quit:', 'this', 'is', 'my', 'reason', 'to', 'quit']
|
>>> [':97KAAAAAB', 'QUIT', ':Quit:', 'this', 'is', 'my', 'reason', 'to', 'quit']
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): The server message to parse
|
serverMsg (list[str]): The server message to parse
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: The response as dictionary.
|
dict[str, str]: The response as dictionary.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def parse_nick(self, server_msg: list[str]) -> dict[str, str]:
|
def parse_nick(self, serverMsg: list[str]) -> dict[str, str]:
|
||||||
"""Parse nick changes and return dictionary.
|
"""Parse nick changes and return dictionary.
|
||||||
>>> [':97KAAAAAC', 'NICK', 'testinspir', '1757360740']
|
>>> [':97KAAAAAC', 'NICK', 'testinspir', '1757360740']
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): The server message to parse
|
serverMsg (list[str]): The server message to parse
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: The response as dictionary.
|
dict[str, str]: The response as dictionary.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def parse_privmsg(self, server_msg: list[str]) -> dict[str, str]:
|
def parse_privmsg(self, serverMsg: list[str]) -> dict[str, str]:
|
||||||
"""Parse PRIVMSG message.
|
"""Parse PRIVMSG message.
|
||||||
>>> [':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message']
|
>>> [':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message']
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): The server message to parse
|
serverMsg (list[str]): The server message to parse
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: The response as dictionary.
|
dict[str, str]: The response as dictionary.
|
||||||
@@ -345,174 +345,174 @@ class IProtocol(ABC):
|
|||||||
# ------------------------------------------------------------------------
|
# ------------------------------------------------------------------------
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_svs2mode(self, server_msg: list[str]) -> None:
|
def on_svs2mode(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle svs2mode coming from a server
|
"""Handle svs2mode coming from a server
|
||||||
>>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
|
>>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_mode(self, server_msg: list[str]) -> None:
|
def on_mode(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle mode coming from a server
|
"""Handle mode coming from a server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_umode2(self, server_msg: list[str]) -> None:
|
def on_umode2(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle umode2 coming from a server
|
"""Handle umode2 coming from a server
|
||||||
>>> [':adator_', 'UMODE2', '-i']
|
>>> [':adator_', 'UMODE2', '-i']
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_quit(self, server_msg: list[str]) -> None:
|
def on_quit(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle quit coming from a server
|
"""Handle quit coming from a server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_squit(self, server_msg: list[str]) -> None:
|
def on_squit(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle squit coming from a server
|
"""Handle squit coming from a server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_protoctl(self, server_msg: list[str]) -> None:
|
def on_protoctl(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle protoctl coming from a server
|
"""Handle protoctl coming from a server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_nick(self, server_msg: list[str]) -> None:
|
def on_nick(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle nick coming from a server
|
"""Handle nick coming from a server
|
||||||
new nickname
|
new nickname
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_sjoin(self, server_msg: list[str]) -> None:
|
def on_sjoin(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle sjoin coming from a server
|
"""Handle sjoin coming from a server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_part(self, server_msg: list[str]) -> None:
|
def on_part(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle part coming from a server
|
"""Handle part coming from a server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_eos(self, server_msg: list[str]) -> None:
|
def on_eos(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle EOS coming from a server
|
"""Handle EOS coming from a server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_reputation(self, server_msg: list[str]) -> None:
|
def on_reputation(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle REPUTATION coming from a server
|
"""Handle REPUTATION coming from a server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_uid(self, server_msg: list[str]) -> None:
|
def on_uid(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle uid message coming from the server
|
"""Handle uid message coming from the server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_privmsg(self, server_msg: list[str]) -> None:
|
def on_privmsg(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle PRIVMSG message coming from the server
|
"""Handle PRIVMSG message coming from the server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_server_ping(self, server_msg: list[str]) -> None:
|
def on_server_ping(self, serverMsg: list[str]) -> None:
|
||||||
"""Send a PONG message to the server
|
"""Send a PONG message to the server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): List of str coming from the server
|
serverMsg (list[str]): List of str coming from the server
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_server(self, server_msg: list[str]) -> None:
|
def on_server(self, serverMsg: list[str]) -> None:
|
||||||
"""_summary_
|
"""_summary_
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): _description_
|
serverMsg (list[str]): _description_
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_version(self, server_msg: list[str]) -> None:
|
def on_version(self, serverMsg: list[str]) -> None:
|
||||||
"""Sending Server Version to the server
|
"""Sending Server Version to the server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): List of str coming from the server
|
serverMsg (list[str]): List of str coming from the server
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_time(self, server_msg: list[str]) -> None:
|
def on_time(self, serverMsg: list[str]) -> None:
|
||||||
"""Sending TIME answer to a requestor
|
"""Sending TIME answer to a requestor
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): List of str coming from the server
|
serverMsg (list[str]): List of str coming from the server
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_ping(self, server_msg: list[str]) -> None:
|
def on_ping(self, serverMsg: list[str]) -> None:
|
||||||
"""Sending a PING answer to requestor
|
"""Sending a PING answer to requestor
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): List of str coming from the server
|
serverMsg (list[str]): List of str coming from the server
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_version_msg(self, server_msg: list[str]) -> None:
|
def on_version_msg(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle version coming from the server
|
"""Handle version coming from the server
|
||||||
\n ex. /version Defender
|
\n ex. /version Defender
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original message from the server
|
serverMsg (list[str]): Original message from the server
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_smod(self, server_msg: list[str]) -> None:
|
def on_smod(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle SMOD message coming from the server
|
"""Handle SMOD message coming from the server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_sasl(self, server_msg: list[str]) -> Optional['MSasl']:
|
def on_sasl(self, serverMsg: list[str]) -> Optional['MSasl']:
|
||||||
"""Handle SASL coming from a server
|
"""Handle SASL coming from a server
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): Original server message
|
serverMsg (list[str]): Original server message
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
|
||||||
@@ -530,9 +530,18 @@ class IProtocol(ABC):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def on_md(self, server_msg: list[str]) -> None:
|
def on_md(self, serverMsg: list[str]) -> None:
|
||||||
"""Handle MD responses
|
"""Handle MD responses
|
||||||
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']
|
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']
|
||||||
Args:
|
Args:
|
||||||
server_msg (list[str]): The server reply
|
serverMsg (list[str]): The server reply
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def on_kick(self, serverMsg: list[str]) -> None:
|
||||||
|
"""When a user is kicked out from a channel
|
||||||
|
|
||||||
|
Eg. ['@unrealircd.org...', ':001', 'KICK', '#jsonrpc', '001ELW13T', ':Kicked', 'from', 'JSONRPC', 'User']
|
||||||
|
Args:
|
||||||
|
serverMsg (list[str]): The server message
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -76,6 +76,7 @@ class Unrealircd6(IProtocol):
|
|||||||
self.Handler.register(m(command_name="SASL", func=self.on_sasl))
|
self.Handler.register(m(command_name="SASL", func=self.on_sasl))
|
||||||
self.Handler.register(m(command_name="MD", func=self.on_md))
|
self.Handler.register(m(command_name="MD", func=self.on_md))
|
||||||
self.Handler.register(m(command_name="PRIVMSG", func=self.on_privmsg))
|
self.Handler.register(m(command_name="PRIVMSG", func=self.on_privmsg))
|
||||||
|
self.Handler.register(m(command_name="KICK", func=self.on_kick))
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -699,7 +700,9 @@ class Unrealircd6(IProtocol):
|
|||||||
serverMsg (list[str]): The server message to parse
|
serverMsg (list[str]): The server message to parse
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: The response as dictionary.
|
dict: The response as dictionary.
|
||||||
|
|
||||||
|
>>> {"uid": "", "newnickname": "", "timestamp": ""}
|
||||||
"""
|
"""
|
||||||
scopy = serverMsg.copy()
|
scopy = serverMsg.copy()
|
||||||
if scopy[0].startswith('@'):
|
if scopy[0].startswith('@'):
|
||||||
@@ -1611,4 +1614,19 @@ class Unrealircd6(IProtocol):
|
|||||||
|
|
||||||
...
|
...
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.__Logs.error(f"General Error: {e}")
|
self.__Logs.error(f"General Error: {e}")
|
||||||
|
|
||||||
|
def on_kick(self, serverMsg: list[str]) -> None:
|
||||||
|
"""When a user is kicked out from a channel
|
||||||
|
|
||||||
|
['@unrealircd.org/issued-by=RPC:admin-for-test@...', ':001', 'KICK', '#jsonrpc', '001ELW13T', ':Kicked', 'from', 'JSONRPC', 'User']
|
||||||
|
Args:
|
||||||
|
serverMsg (list[str]): The server message
|
||||||
|
"""
|
||||||
|
scopy = serverMsg.copy()
|
||||||
|
uid = scopy[4]
|
||||||
|
channel = scopy[3]
|
||||||
|
|
||||||
|
# Delete the user from the channel.
|
||||||
|
self.__Irc.Channel.delete_user_from_channel(channel, uid)
|
||||||
|
return None
|
||||||
@@ -16,9 +16,6 @@ try:
|
|||||||
from core.loader import Loader
|
from core.loader import Loader
|
||||||
loader = Loader()
|
loader = Loader()
|
||||||
loader.Irc.init_irc()
|
loader.Irc.init_irc()
|
||||||
# from core.irc import Irc
|
|
||||||
# ircInstance = Irc(Loader())
|
|
||||||
# ircInstance.init_irc(ircInstance)
|
|
||||||
|
|
||||||
except AssertionError as ae:
|
except AssertionError as ae:
|
||||||
print(f'Assertion Error -> {ae}')
|
print(f'Assertion Error -> {ae}')
|
||||||
|
|||||||
@@ -217,7 +217,7 @@ class Defender:
|
|||||||
|
|
||||||
if response is not None:
|
if response is not None:
|
||||||
q_insert = "INSERT INTO def_trusted (datetime, user, host, vhost) VALUES (?, ?, ?, ?)"
|
q_insert = "INSERT INTO def_trusted (datetime, user, host, vhost) VALUES (?, ?, ?, ?)"
|
||||||
mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'host': '*', 'vhost': '*'}
|
mes_donnees = {'datetime': self.Loader.Utils.get_datetime(), 'user': nickname, 'host': '*', 'vhost': '*'}
|
||||||
exec_query = self.Base.db_execute_query(q_insert, mes_donnees)
|
exec_query = self.Base.db_execute_query(q_insert, mes_donnees)
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
@@ -383,14 +383,11 @@ def action_apply_reputation_santions(uplink: 'Defender') -> None:
|
|||||||
color_red = gconfig.COLORS.red
|
color_red = gconfig.COLORS.red
|
||||||
nogc = gconfig.COLORS.nogc
|
nogc = gconfig.COLORS.nogc
|
||||||
salon_jail = gconfig.SALON_JAIL
|
salon_jail = gconfig.SALON_JAIL
|
||||||
|
|
||||||
if reputation_flag == 0:
|
|
||||||
return None
|
|
||||||
elif reputation_timer == 0:
|
|
||||||
return None
|
|
||||||
|
|
||||||
uid_to_clean = []
|
uid_to_clean = []
|
||||||
|
|
||||||
|
if reputation_flag == 0 or reputation_timer == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
for user in irc.Reputation.UID_REPUTATION_DB:
|
for user in irc.Reputation.UID_REPUTATION_DB:
|
||||||
if not user.isWebirc: # Si il ne vient pas de WebIRC
|
if not user.isWebirc: # Si il ne vient pas de WebIRC
|
||||||
if irc.User.get_user_uptime_in_minutes(user.uid) >= reputation_timer and int(user.score_connexion) <= int(reputation_seuil):
|
if irc.User.get_user_uptime_in_minutes(user.uid) >= reputation_timer and int(user.score_connexion) <= int(reputation_seuil):
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
{
|
{
|
||||||
"version": "6.3.0",
|
"version": "6.3.1",
|
||||||
|
|
||||||
"requests": "2.32.3",
|
"requests": "2.32.3",
|
||||||
"psutil": "6.0.0",
|
"psutil": "6.0.0",
|
||||||
|
|||||||
Reference in New Issue
Block a user