This commit is contained in:
adator
2024-11-11 23:38:05 +01:00
parent bd9713006a
commit 44da01945c
7 changed files with 290 additions and 192 deletions

View File

@@ -1,10 +1,12 @@
from typing import TYPE_CHECKING
from dataclasses import dataclass, fields
from typing import Union, TYPE_CHECKING
from dataclasses import dataclass
from core.classes import user
if TYPE_CHECKING:
from core.irc import Irc
class Command():
class Command:
@dataclass
class ModConfModel:
@@ -20,6 +22,9 @@ class Command():
# Add Irc Object to the module (Mandatory)
self.Irc = ircInstance
# Add Loader Object to the module (Mandatory)
self.Loader = ircInstance.Loader
# Add Protocol object to the module (Mandatory)
self.Protocol = ircInstance.Protocol
@@ -40,14 +45,14 @@ class Command():
# Create module commands (Mandatory)
self.commands_level = {
1: ['join', 'part','owner', 'deowner', 'protect', 'deprotect', 'op',
1: ['join', 'part','owner', 'deowner', 'protect', 'deprotect', 'op',
'deop', 'halfop', 'dehalfop', 'voice','devoice', 'topic'],
2: ['opall', 'deopall', 'devoiceall', 'voiceall', 'ban',
'unban','kick', 'kickban', 'umode', 'mode', 'get_mode', 'svsjoin', 'svspart', 'svsnick',
2: ['opall', 'deopall', 'devoiceall', 'voiceall', 'ban', 'automode',
'unban', 'kick', 'kickban', 'umode',
'mode', 'get_mode', 'svsjoin', 'svspart', 'svsnick',
'wallops', 'globops','gnotice','whois', 'names', 'invite', 'inviteme',
'sajoin', 'sapart',
'kill', 'gline', 'ungline', 'kline', 'unkline', 'shun', 'unshun',
'glinelist', 'shunlist', 'klinelist'],
'sajoin', 'sapart', 'kill', 'gline', 'ungline', 'kline',
'unkline', 'shun', 'unshun', 'glinelist', 'shunlist', 'klinelist'],
3: ['map']
}
@@ -74,7 +79,7 @@ class Command():
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
def __set_commands(self, commands: dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
@@ -82,7 +87,7 @@ class Command():
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
if c not in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
@@ -98,14 +103,17 @@ class Command():
None: Aucun retour n'es attendu
"""
table_logs = '''CREATE TABLE IF NOT EXISTS test_logs (
table_automode = '''CREATE TABLE IF NOT EXISTS command_automode (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
created_on TEXT,
updated_on TEXT,
nickname TEXT,
channel TEXT,
mode TEXT
)
'''
# self.Base.db_execute_query(table_logs)
self.Base.db_execute_query(table_automode)
return None
def __load_module_configuration(self) -> None:
@@ -136,107 +144,203 @@ class Command():
return None
def cmd(self, data: list) -> None:
def cmd(self, data: list[str]) -> None:
try:
# service_id = self.Config.SERVICE_ID
dnickname = self.Config.SERVICE_NICKNAME
# dchanlog = self.Config.SERVICE_CHANLOG
red = self.Config.COLORS.red
green = self.Config.COLORS.green
bold = self.Config.COLORS.bold
nogc = self.Config.COLORS.nogc
cmd = list(data).copy()
service_id = self.Config.SERVICE_ID
dnickname = self.Config.SERVICE_NICKNAME
dchanlog = self.Config.SERVICE_CHANLOG
red = self.Config.COLORS.red
green = self.Config.COLORS.green
bold = self.Config.COLORS.bold
nogc = self.Config.COLORS.nogc
cmd = list(data).copy()
if len(cmd) < 2:
return None
if len(cmd) < 2:
return None
match cmd[1]:
# [':irc.deb.biz.st', '403', 'Dev-PyDefender', '#Z', ':No', 'such', 'channel']
case '403' | '401':
try:
message = ' '.join(cmd[3:])
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=self.user_to_notice,
msg=f"[{red}ERROR MSG{nogc}] {message}"
)
self.Logs.error(f"{cmd[1]} - {message}")
except KeyError as ke:
self.Logs.error(ke)
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
match cmd[1]:
# [':irc.deb.biz.st', '403', 'Dev-PyDefender', '#Z', ':No', 'such', 'channel']
case '403' | '401':
try:
message = ' '.join(cmd[3:])
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=self.user_to_notice,
msg=f"[{red}ERROR MSG{nogc}] {message}"
)
self.Logs.error(f"{cmd[1]} - {message}")
except KeyError as ke:
self.Logs.error(ke)
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case '006' | '018':
try:
# [':irc.deb.biz.st', '006', 'Dev-PyDefender', ':`-services.deb.biz.st', '------', '|', 'Users:', '9', '(47.37%)', '[00B]']
# [':irc.deb.biz.st', '018', 'Dev-PyDefender', ':4', 'servers', 'and', '19', 'users,', 'average', '4.75', 'users', 'per', 'server']
message = ' '.join(cmd[3:])
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=self.user_to_notice,
msg=f"[{green}SERVER MSG{nogc}] {message}"
)
except KeyError as ke:
self.Logs.error(ke)
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case '006' | '018':
try:
# [':irc.deb.biz.st', '006', 'Dev-PyDefender', ':`-services.deb.biz.st', '------', '|', 'Users:', '9', '(47.37%)', '[00B]']
# [':irc.deb.biz.st', '018', 'Dev-PyDefender', ':4', 'servers', 'and', '19', 'users,', 'average', '4.75', 'users', 'per', 'server']
message = ' '.join(cmd[3:])
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=self.user_to_notice,
msg=f"[{green}SERVER MSG{nogc}] {message}"
)
except KeyError as ke:
self.Logs.error(ke)
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case '219':
try:
# [':irc.deb.biz.st', '219', 'Dev-PyDefender', 's', ':End', 'of', '/STATS', 'report']
if not self.show_219:
# If there is a result in 223 then stop here
self.show_219 = True
return None
case '219':
try:
# [':irc.deb.biz.st', '219', 'Dev-PyDefender', 's', ':End', 'of', '/STATS', 'report']
if not self.show_219:
# If there is a result in 223 then stop here
self.show_219 = True
return None
type_of_stats = str(cmd[3])
type_of_stats = str(cmd[3])
match type_of_stats:
case 's':
self.Protocol.send_notice(nick_from=dnickname,nick_to=self.user_to_notice, msg="No shun")
case 'G':
self.Protocol.send_notice(nick_from=dnickname,nick_to=self.user_to_notice, msg="No gline")
case 'k':
self.Protocol.send_notice(nick_from=dnickname,nick_to=self.user_to_notice, msg="No kline")
match type_of_stats:
case 's':
self.Protocol.send_notice(nick_from=dnickname,nick_to=self.user_to_notice, msg="No shun")
case 'G':
self.Protocol.send_notice(nick_from=dnickname,nick_to=self.user_to_notice, msg="No gline")
case 'k':
self.Protocol.send_notice(nick_from=dnickname,nick_to=self.user_to_notice, msg="No kline")
except KeyError as ke:
self.Logs.error(ke)
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
except KeyError as ke:
self.Logs.error(ke)
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case '223':
try:
# [':irc.deb.biz.st', '223', 'Dev-PyDefender', 'G', '*@162.142.125.217', '67624', '18776', 'irc.deb.biz.st', ':Proxy/Drone', 'detected.', 'Check', 'https://dronebl.org/lookup?ip=162.142.125.217', 'for', 'details.']
self.show_219 = False
host = str(cmd[4])
author = str(cmd[7])
reason = ' '.join(cmd[8:])
case '223':
try:
# [':irc.deb.biz.st', '223', 'Dev-PyDefender', 'G', '*@162.142.125.217', '67624', '18776', 'irc.deb.biz.st', ':Proxy/Drone', 'detected.', 'Check', 'https://dronebl.org/lookup?ip=162.142.125.217', 'for', 'details.']
self.show_219 = False
host = str(cmd[4])
author = str(cmd[7])
reason = ' '.join(cmd[8:])
self.Protocol.send_notice(nick_from=dnickname,nick_to=self.user_to_notice,
msg=f"{bold}Author{nogc}: {author} - {bold}Host{nogc}: {host} - {bold}Reason{nogc}: {reason}"
)
self.Protocol.send_notice(nick_from=dnickname,nick_to=self.user_to_notice,
msg=f"{bold}Author{nogc}: {author} - {bold}Host{nogc}: {host} - {bold}Reason{nogc}: {reason}"
)
except KeyError as ke:
self.Logs.error(ke)
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
except KeyError as ke:
self.Logs.error(ke)
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case _:
pass
case _:
pass
if len(cmd) < 3:
return None
return None
match cmd[2]:
def _hcmds(self, user: str, channel: any, cmd: list, fullcmd: list = []) -> None:
case 'SJOIN':
# ['@msgid=yldTlbwAGbzCGUcCIHi3ku;time=2024-11-11T17:56:24.297Z', ':001', 'SJOIN', '1728815963', '#znc', ':001LQ0L0C']
# Check if the user has an automode
try:
if len(cmd) < 6:
return None
user_uid = self.User.clean_uid(cmd[5])
userObj = self.User.get_User(user_uid)
channel_name = cmd[4] if self.Channel.Is_Channel(cmd[4]) else None
if userObj is None:
return None
if 'r' not in userObj.umodes:
return None
db_data = {"nickname": userObj.nickname, "channel": channel_name}
db_query = self.Base.db_execute_query("SELECT id, mode FROM command_automode WHERE nickname = :nickname AND channel = :channel", db_data)
db_result = db_query.fetchone()
if db_result is not None:
id, mode = db_result
self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {channel_name} {mode} {userObj.nickname}")
except KeyError as ke:
self.Logs.error(f"Key Error: {err}")
except Exception as err:
self.Logs.error(f"General Error: {err}")
def _hcmds(self, uidornickname: str, channel_name: Union[str, None], cmd: list, fullcmd: list = []) -> None:
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
service_id = self.Config.SERVICE_ID
dchanlog = self.Config.SERVICE_CHANLOG
self.user_to_notice = user
fromuser = user
fromchannel = channel
self.user_to_notice = uidornickname
fromuser = uidornickname
fromchannel = channel_name
match command:
case "automode":
# automode nickname +/-mode #channel
# automode adator +o #channel
try:
allowed_modes = ['q','a','o','h','v']
userObj = self.User.get_User(str(cmd[1]))
if len(cmd) != 4:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}")
return None
mode = str(cmd[2])
flag_found: bool = False
if mode.startswith( ('+', '-') ):
flag_found = True
if not flag_found:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You must provide the flag mode + or -")
return None
mode = str(cmd[2])
chan = cmd[3] if self.Channel.Is_Channel(cmd[3]) else None
if mode.lstrip('+-') not in allowed_modes:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}")
return None
if chan is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}")
return None
db_data: dict[str, str] = {"nickname": userObj.nickname, "channel": chan}
db_query = self.Base.db_execute_query(query="SELECT id FROM command_automode WHERE nickname = :nickname and channel = :channel", params=db_data)
db_result = db_query.fetchone()
if db_result is not None:
db_data = {"updated_on": self.Base.get_datetime(), "nickname": userObj.nickname, "channel": chan, "mode": mode}
db_result = self.Base.db_execute_query(query="UPDATE command_automode SET mode = :mode, updated_on = :updated_on WHERE nickname = :nickname and channel = :channel",
params=db_data)
if db_result.rowcount > 0:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} edited for {userObj.nickname} in {chan}")
return None
# Instert a new automode
db_data = {"created_on": self.Base.get_datetime(), "updated_on": self.Base.get_datetime(), "nickname": userObj.nickname, "channel": chan, "mode": mode}
db_query = self.Base.db_execute_query(
query="INSERT INTO command_automode (created_on, updated_on, nickname, channel, mode) VALUES (:created_on, :updated_on, :nickname, :channel, :mode)",
params=db_data
)
if db_query.rowcount > 0:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} applied to {userObj.nickname} in {chan}")
self.Protocol.send2socket(f":{service_id} MODE {chan} {mode} {userObj.nickname}")
except KeyError:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}")
except Exception as err:
self.Logs.error(f"General Error: {err}")
case 'deopall':
try:
@@ -679,7 +783,7 @@ class Command():
self.Protocol.send_part_chan(uidornickname=dnickname, channel=sent_channel)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" {dnickname} LEFT {sent_channel}")
self.Channel.db_query_channel('del', self.module_name, sent_channel)
except IndexError as ie:
@@ -696,7 +800,7 @@ class Command():
chan = str(cmd[1])
if not self.Channel.Is_Channel(chan):
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"The channel must start with #")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="The channel must start with #")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} TOPIC #channel THE_TOPIC_MESSAGE")
return None
@@ -705,7 +809,7 @@ class Command():
if topic_msg:
self.Protocol.send2socket(f':{dnickname} TOPIC {chan} :{topic_msg}')
else:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You need to specify the topic")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You need to specify the topic")
except KeyError as ke:
self.Logs.error(ke)
@@ -723,7 +827,7 @@ class Command():
if wallops_msg:
self.Protocol.send2socket(f':{dnickname} WALLOPS {wallops_msg} ({dnickname})')
else:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You need to specify the wallops message")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You need to specify the wallops message")
except KeyError as ke:
self.Logs.error(ke)
@@ -741,7 +845,7 @@ class Command():
if globops_msg:
self.Protocol.send2socket(f':{dnickname} GLOBOPS {globops_msg} ({dnickname})')
else:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You need to specify the globops message")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You need to specify the globops message")
except KeyError as ke:
self.Logs.error(ke)
@@ -759,7 +863,7 @@ class Command():
if gnotice_msg:
self.Protocol.send_notice(nick_from=dnickname, nick_to='$*.*', msg=f"[{self.Config.COLORS.red}GLOBAL NOTICE{self.Config.COLORS.nogc}] {gnotice_msg}")
else:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You need to specify the global notice message")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You need to specify the global notice message")
except KeyError as ke:
self.Logs.error(ke)
@@ -776,7 +880,7 @@ class Command():
nickname = str(cmd[1])
if self.User.get_nickname(nickname) is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nickname not found !")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Nickname not found !")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {str(cmd[0]).upper()} NICKNAME")
return None
@@ -796,7 +900,7 @@ class Command():
chan = str(cmd[1])
if not self.Channel.Is_Channel(chan):
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"The channel must start with #")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="The channel must start with #")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {str(cmd[0]).upper()} #channel")
return None
@@ -817,12 +921,12 @@ class Command():
chan = str(cmd[2])
if not self.Channel.Is_Channel(chan):
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"The channel must start with #")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="The channel must start with #")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {str(cmd[0]).upper()} NICKNAME #CHANNEL")
return None
if self.User.get_nickname(nickname) is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nickname not found !")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Nickname not found !")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {str(cmd[0]).upper()} NICKNAME #CHANNEL")
return None
@@ -906,8 +1010,10 @@ class Command():
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'get_mode':
self.Protocol.send2socket(f'MODE {channel}')
try:
self.Protocol.send2socket(f'MODE {fromchannel}')
except Exception as err:
self.Logs.error(f"General Error {err}")
case 'svsjoin':
try:
@@ -927,7 +1033,7 @@ class Command():
case 'svspart':
try:
# .svspart nickname #channel
# svspart nickname #channel
nickname = str(cmd[1])
channel = str(cmd[2])
if len(cmd) != 3:
@@ -1191,4 +1297,4 @@ class Command():
self.Logs.warning(f'Unknown Error: {str(err)}')
case _:
pass
pass