mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 19:24:23 +00:00
V6.0.2
This commit is contained in:
@@ -306,7 +306,7 @@ class Clone():
|
||||
except Exception as err:
|
||||
self.Logs.error(f'General Error: {err}')
|
||||
|
||||
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
|
||||
try:
|
||||
command = str(cmd[0]).lower()
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
from typing import Union, TYPE_CHECKING
|
||||
from dataclasses import dataclass
|
||||
|
||||
from core.classes import user
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.irc import Irc
|
||||
from core.definition import MUser
|
||||
from sqlalchemy import CursorResult, Row, Sequence
|
||||
|
||||
class Command:
|
||||
|
||||
@@ -246,7 +246,7 @@ class Command:
|
||||
return None
|
||||
|
||||
user_uid = self.User.clean_uid(cmd[5])
|
||||
userObj = self.User.get_User(user_uid)
|
||||
userObj: MUser = self.User.get_User(user_uid)
|
||||
channel_name = cmd[4] if self.Channel.Is_Channel(cmd[4]) else None
|
||||
|
||||
if userObj is None:
|
||||
@@ -255,7 +255,7 @@ class Command:
|
||||
if 'r' not in userObj.umodes:
|
||||
return None
|
||||
|
||||
db_data = {"nickname": userObj.nickname, "channel": channel_name}
|
||||
db_data: dict[str, str] = {"nickname": userObj.nickname, "channel": channel_name}
|
||||
db_query = self.Base.db_execute_query("SELECT id, mode FROM command_automode WHERE nickname = :nickname AND channel = :channel", db_data)
|
||||
db_result = db_query.fetchone()
|
||||
if db_result is not None:
|
||||
@@ -264,11 +264,10 @@ class Command:
|
||||
except KeyError as ke:
|
||||
self.Logs.error(f"Key Error: {err}")
|
||||
|
||||
|
||||
except Exception as err:
|
||||
self.Logs.error(f"General Error: {err}")
|
||||
|
||||
def _hcmds(self, uidornickname: str, channel_name: Union[str, None], cmd: list, fullcmd: list = []) -> None:
|
||||
def hcmds(self, uidornickname: str, channel_name: Union[str, None], cmd: list, fullcmd: list = []) -> None:
|
||||
|
||||
command = str(cmd[0]).lower()
|
||||
dnickname = self.Config.SERVICE_NICKNAME
|
||||
@@ -280,65 +279,95 @@ class Command:
|
||||
|
||||
match command:
|
||||
case "automode":
|
||||
# automode nickname +/-mode #channel
|
||||
# automode adator +o #channel
|
||||
# automode set nickname [+/-mode] #channel
|
||||
# automode set adator +o #channel
|
||||
try:
|
||||
option: str = str(cmd[1]).lower()
|
||||
match option:
|
||||
case 'set':
|
||||
if len(cmd) < 5:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]")
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}")
|
||||
return None
|
||||
|
||||
allowed_modes = ['q','a','o','h','v']
|
||||
userObj = self.User.get_User(str(cmd[1]))
|
||||
allowed_modes: list[str] = self.Base.Settings.PROTOCTL_PREFIX # ['q','a','o','h','v']
|
||||
# userObj: MUser = self.User.get_User(str(cmd[2]))
|
||||
nickname = str(cmd[2])
|
||||
mode = str(cmd[3])
|
||||
chan: str = str(cmd[4]).lower() if self.Channel.Is_Channel(cmd[4]) else None
|
||||
sign = mode[0] if mode.startswith( ('+', '-')) else None
|
||||
clean_mode = mode[1:] if len(mode) > 0 else None
|
||||
|
||||
if len(cmd) != 4:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]")
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}")
|
||||
return None
|
||||
if sign is None:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You must provide the flag mode + or -")
|
||||
return None
|
||||
|
||||
mode = str(cmd[2])
|
||||
flag_found: bool = False
|
||||
if mode.startswith( ('+', '-') ):
|
||||
flag_found = True
|
||||
if clean_mode not in allowed_modes:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}")
|
||||
return None
|
||||
|
||||
if not flag_found:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You must provide the flag mode + or -")
|
||||
return None
|
||||
if chan is None:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}")
|
||||
return None
|
||||
|
||||
mode = str(cmd[2])
|
||||
chan = cmd[3] if self.Channel.Is_Channel(cmd[3]) else None
|
||||
db_data: dict[str, str] = {"nickname": nickname, "channel": chan}
|
||||
db_query = self.Base.db_execute_query(query="SELECT id FROM command_automode WHERE nickname = :nickname and channel = :channel", params=db_data)
|
||||
db_result = db_query.fetchone()
|
||||
|
||||
if mode.lstrip('+-') not in allowed_modes:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}")
|
||||
return None
|
||||
if db_result is not None:
|
||||
if sign == '+':
|
||||
db_data = {"updated_on": self.Base.get_datetime(), "nickname": nickname, "channel": chan, "mode": mode}
|
||||
db_result = self.Base.db_execute_query(query="UPDATE command_automode SET mode = :mode, updated_on = :updated_on WHERE nickname = :nickname and channel = :channel",
|
||||
params=db_data)
|
||||
if db_result.rowcount > 0:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} edited for {nickname} in {chan}")
|
||||
elif sign == '-':
|
||||
db_data = {"nickname": nickname, "channel": chan, "mode": f"+{clean_mode}"}
|
||||
db_result = self.Base.db_execute_query(query="DELETE FROM command_automode WHERE nickname = :nickname and channel = :channel and mode = :mode",
|
||||
params=db_data)
|
||||
if db_result.rowcount > 0:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} deleted for {nickname} in {chan}")
|
||||
else:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"The mode [{mode}] has not been found for {nickname} in channel {chan}")
|
||||
|
||||
if chan is None:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You should use one of those modes {' / '.join(allowed_modes)}")
|
||||
return None
|
||||
return None
|
||||
|
||||
db_data: dict[str, str] = {"nickname": userObj.nickname, "channel": chan}
|
||||
db_query = self.Base.db_execute_query(query="SELECT id FROM command_automode WHERE nickname = :nickname and channel = :channel", params=db_data)
|
||||
db_result = db_query.fetchone()
|
||||
# Instert a new automode
|
||||
if sign == '+':
|
||||
db_data = {"created_on": self.Base.get_datetime(), "updated_on": self.Base.get_datetime(), "nickname": nickname, "channel": chan, "mode": mode}
|
||||
db_query = self.Base.db_execute_query(
|
||||
query="INSERT INTO command_automode (created_on, updated_on, nickname, channel, mode) VALUES (:created_on, :updated_on, :nickname, :channel, :mode)",
|
||||
params=db_data
|
||||
)
|
||||
|
||||
if db_result is not None:
|
||||
db_data = {"updated_on": self.Base.get_datetime(), "nickname": userObj.nickname, "channel": chan, "mode": mode}
|
||||
db_result = self.Base.db_execute_query(query="UPDATE command_automode SET mode = :mode, updated_on = :updated_on WHERE nickname = :nickname and channel = :channel",
|
||||
params=db_data)
|
||||
if db_result.rowcount > 0:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} edited for {userObj.nickname} in {chan}")
|
||||
if db_query.rowcount > 0:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} applied to {nickname} in {chan}")
|
||||
self.Protocol.send2socket(f":{service_id} MODE {chan} {mode} {nickname}")
|
||||
else:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AUTOMODE {mode} cannot be added to {nickname} in {chan} because it doesn't exist")
|
||||
|
||||
return None
|
||||
case 'list':
|
||||
db_query: CursorResult = self.Base.db_execute_query("SELECT nickname, channel, mode FROM command_automode")
|
||||
db_results: Sequence[Row] = db_query.fetchall()
|
||||
|
||||
# Instert a new automode
|
||||
db_data = {"created_on": self.Base.get_datetime(), "updated_on": self.Base.get_datetime(), "nickname": userObj.nickname, "channel": chan, "mode": mode}
|
||||
db_query = self.Base.db_execute_query(
|
||||
query="INSERT INTO command_automode (created_on, updated_on, nickname, channel, mode) VALUES (:created_on, :updated_on, :nickname, :channel, :mode)",
|
||||
params=db_data
|
||||
)
|
||||
if not db_results:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,
|
||||
msg="There is no automode to display.")
|
||||
|
||||
if db_query.rowcount > 0:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Automode {mode} applied to {userObj.nickname} in {chan}")
|
||||
self.Protocol.send2socket(f":{service_id} MODE {chan} {mode} {userObj.nickname}")
|
||||
for db_result in db_results:
|
||||
db_nickname, db_channel, db_mode = db_result
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,
|
||||
msg=f"Nickname: {db_nickname} | Channel: {db_channel} | Mode: {db_mode}")
|
||||
|
||||
except KeyError:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [+/-mode] [#channel]")
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"AutoModes available: {' / '.join(allowed_modes)}")
|
||||
case _:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} SET [nickname] [+/-mode] [#channel]")
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} LIST")
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"[AUTOMODES AVAILABLE] are {' / '.join(allowed_modes)}")
|
||||
|
||||
except IndexError:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} SET [nickname] [+/-mode] [#channel]")
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} LIST")
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"[AUTOMODES AVAILABLE] are {' / '.join(self.Base.Settings.PROTOCTL_PREFIX)}")
|
||||
except Exception as err:
|
||||
self.Logs.error(f"General Error: {err}")
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ import core.definition as df
|
||||
# 4 . Créer vos tables, en utilisant toujours le nom des votre classe en minuscule ==> defender_votre-table
|
||||
# 3. Methode suivantes:
|
||||
# cmd(self, data:list)
|
||||
# _hcmds(self, user:str, cmd: list)
|
||||
# hcmds(self, user:str, cmd: list)
|
||||
# unload(self)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -990,7 +990,7 @@ class Defender():
|
||||
match cmd[1]:
|
||||
|
||||
case 'REPUTATION':
|
||||
# :001 REPUTATION 91.168.141.239 118
|
||||
# :001 REPUTATION 8.8.8.8 118
|
||||
try:
|
||||
self.reputation_first_connexion['ip'] = cmd[2]
|
||||
self.reputation_first_connexion['score'] = cmd[3]
|
||||
@@ -1183,7 +1183,7 @@ class Defender():
|
||||
except Exception as err:
|
||||
self.Logs.error(f"General Error: {err}")
|
||||
|
||||
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
|
||||
command = str(cmd[0]).lower()
|
||||
fromuser = user
|
||||
@@ -1213,12 +1213,12 @@ class Defender():
|
||||
case 'show_reputation':
|
||||
|
||||
if not self.Reputation.UID_REPUTATION_DB:
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" No one is suspected")
|
||||
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="No one is suspected")
|
||||
|
||||
for suspect in self.Reputation.UID_REPUTATION_DB:
|
||||
self.Protocol.send_notice(nick_from=dnickname,
|
||||
nick_to=fromuser,
|
||||
msg=f" Uid: {suspect.uid} | Nickname: {suspect.nickname} | Reputation: {suspect.score_connexion} | Secret code: {suspect.secret_code} | Connected on: {suspect.connected_datetime}")
|
||||
msg=f" Uid: {suspect.uid} | Nickname: {suspect.nickname} | Reputation: {suspect.score_connexion} | Secret code: {suspect.secret_code} | Connected on: {suspect.connexion_datetime}")
|
||||
|
||||
case 'code':
|
||||
try:
|
||||
|
||||
@@ -208,7 +208,7 @@ class Jsonrpc():
|
||||
|
||||
return None
|
||||
|
||||
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
|
||||
command = str(cmd[0]).lower()
|
||||
dnickname = self.Config.SERVICE_NICKNAME
|
||||
|
||||
@@ -147,7 +147,7 @@ class Test():
|
||||
except Exception as err:
|
||||
self.Logs.error(f"General Error: {err}")
|
||||
|
||||
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
|
||||
command = str(cmd[0]).lower()
|
||||
dnickname = self.Config.SERVICE_NICKNAME
|
||||
|
||||
@@ -274,7 +274,7 @@ class Votekick():
|
||||
except Exception as err:
|
||||
self.Logs.error(f"General Error: {err}")
|
||||
|
||||
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
|
||||
# cmd is the command starting from the user command
|
||||
# full cmd is sending the entire server response
|
||||
|
||||
|
||||
Reference in New Issue
Block a user