7 Commits

Author SHA1 Message Date
adator
cb042a5411 V6.1.0 update the help command 2024-12-08 12:52:36 +01:00
adator
a3edf48120 Update on_version method 2024-11-22 23:26:39 +01:00
adator
f7664c9874 V6.0.4 2024-11-18 23:49:45 +01:00
adator
d37c152160 Update to version 6.0.3 2024-11-17 21:09:14 +01:00
adator
b81f502b95 V6.0.2 2024-11-15 22:14:11 +01:00
adator
44da01945c V6.0.1 2024-11-11 23:38:05 +01:00
adator
bd9713006a Update the version 6 2024-11-11 15:38:38 +01:00
19 changed files with 1920 additions and 1151 deletions

View File

@@ -194,14 +194,19 @@ class Base:
file_hanlder = logging.FileHandler(f'logs{self.Config.OS_SEP}defender.log',encoding='UTF-8')
file_hanlder.setLevel(self.Config.DEBUG_LEVEL)
stdout_handler = logging.StreamHandler()
stdout_handler.setLevel(50)
# Define log format
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(funcName)s - %(message)s')
# Apply log format
file_hanlder.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
# Add handler to logs
self.logs.addHandler(file_hanlder)
self.logs.addHandler(stdout_handler)
# Apply the filter
self.logs.addFilter(self.replace_filter)
@@ -656,10 +661,25 @@ class Base:
)
'''
table_core_client = f'''CREATE TABLE IF NOT EXISTS {self.Config.TABLE_CLIENT} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
createdOn TEXT,
account TEXT,
nickname TEXT,
hostname TEXT,
vhost TEXT,
realname TEXT,
email TEXT,
password TEXT,
level INTEGER
)
'''
self.db_execute_query(table_core_log)
self.db_execute_query(table_core_log_command)
self.db_execute_query(table_core_module)
self.db_execute_query(table_core_admin)
self.db_execute_query(table_core_client)
self.db_execute_query(table_core_channel)
self.db_execute_query(table_core_config)
@@ -721,6 +741,23 @@ class Base:
except TypeError:
return value
def convert_to_int(self, value: any) -> Union[int, None]:
"""Convert a value to int
Args:
value (any): Value to convert to int if possible
Returns:
Union[int, None]: Return the int value or None if not possible
"""
try:
response = int(value)
return response
except ValueError:
return None
except TypeError:
return None
def is_valid_ip(self, ip_to_control:str) -> bool:
try:
@@ -785,6 +822,27 @@ class Base:
# Vider le dictionnaire de fonction
self.periodic_func.clear()
def execute_dynamic_method(self, obj: object, method_name: str, params: list) -> None:
"""#### Ajouter les méthodes a éxecuter dans un dictionnaire
Les methodes seront exécuter par heartbeat.
Args:
obj (object): Une instance de la classe qui va etre executer
method_name (str): Le nom de la méthode a executer
params (list): les parametres a faire passer
Returns:
None: aucun retour attendu
"""
self.periodic_func[len(self.periodic_func) + 1] = {
'object': obj,
'method_name': method_name,
'param': params
}
self.logs.debug(f'Method to execute : {str(self.periodic_func)}')
return None
def clean_uid(self, uid:str) -> Union[str, None]:
"""Clean UID by removing @ / % / + / ~ / * / :

View File

@@ -2,6 +2,8 @@ from re import findall
from typing import Union, Literal, TYPE_CHECKING
from dataclasses import asdict
from core.classes import user
if TYPE_CHECKING:
from core.definition import MChannel
from core.base import Base
@@ -31,6 +33,10 @@ class Channel:
result = False
exist = False
if not self.Is_Channel(newChan.name):
self.Logs.error(f"The channel {newChan.name} is not valid, channel must start with #")
return False
for record in self.UID_CHANNEL_DB:
if record.name.lower() == newChan.name.lower():
# If the channel exist, update the user list and do not go further
@@ -129,6 +135,29 @@ class Channel:
except Exception as err:
self.Logs.error(f'{err}')
def is_user_present_in_channel(self, channel_name: str, uid: str) -> bool:
"""Check if a user is present in the channel
Args:
channel_name (str): The channel to check
uid (str): The UID
Returns:
bool: True if the user is present in the channel
"""
user_found = False
chan = self.get_Channel(channel_name=channel_name)
if chan is None:
return user_found
clean_uid = self.Base.clean_uid(uid=uid)
for chan_uid in chan.uids:
if self.Base.clean_uid(chan_uid) == clean_uid:
user_found = True
break
return user_found
def clean_channel(self) -> None:
"""Remove Channels if empty
"""

243
core/classes/client.py Normal file
View File

@@ -0,0 +1,243 @@
from re import sub
from typing import Union, TYPE_CHECKING
from dataclasses import asdict
if TYPE_CHECKING:
from core.base import Base
from core.definition import MClient
class Client:
CLIENT_DB: list['MClient'] = []
def __init__(self, baseObj: 'Base') -> None:
self.Logs = baseObj.logs
self.Base = baseObj
return None
def insert(self, newUser: 'MClient') -> bool:
"""Insert a new User object
Args:
newUser (UserModel): New userModel object
Returns:
bool: True if inserted
"""
userObj = self.get_Client(newUser.uid)
if not userObj is None:
# User already created return False
return False
self.CLIENT_DB.append(newUser)
return True
def update(self, uid: str, newNickname: str) -> bool:
"""Update the nickname starting from the UID
Args:
uid (str): UID of the user
newNickname (str): New nickname
Returns:
bool: True if updated
"""
userObj = self.get_Client(uidornickname=uid)
if userObj is None:
return False
userObj.nickname = newNickname
return True
def update_mode(self, uidornickname: str, modes: str) -> bool:
"""Updating user mode
Args:
uidornickname (str): The UID or Nickname of the user
modes (str): new modes to update
Returns:
bool: True if user mode has been updaed
"""
response = True
userObj = self.get_Client(uidornickname=uidornickname)
if userObj is None:
return False
action = modes[0]
new_modes = modes[1:]
existing_umodes = userObj.umodes
umodes = userObj.umodes
if action == '+':
for nm in new_modes:
if nm not in existing_umodes:
umodes += nm
elif action == '-':
for nm in new_modes:
if nm in existing_umodes:
umodes = umodes.replace(nm, '')
else:
return False
liste_umodes = list(umodes)
final_umodes_liste = [x for x in self.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes]
final_umodes = ''.join(final_umodes_liste)
userObj.umodes = f"+{final_umodes}"
return response
def delete(self, uid: str) -> bool:
"""Delete the User starting from the UID
Args:
uid (str): UID of the user
Returns:
bool: True if deleted
"""
userObj = self.get_Client(uidornickname=uid)
if userObj is None:
return False
self.CLIENT_DB.remove(userObj)
return True
def get_Client(self, uidornickname: str) -> Union['MClient', None]:
"""Get The Client Object model
Args:
uidornickname (str): UID or Nickname
Returns:
UserModel|None: The UserModel Object | None
"""
User = None
for record in self.CLIENT_DB:
if record.uid == uidornickname:
User = record
elif record.nickname == uidornickname:
User = record
return User
def get_uid(self, uidornickname:str) -> Union[str, None]:
"""Get the UID of the user starting from the UID or the Nickname
Args:
uidornickname (str): UID or Nickname
Returns:
str|None: Return the UID
"""
userObj = self.get_Client(uidornickname=uidornickname)
if userObj is None:
return None
return userObj.uid
def get_nickname(self, uidornickname:str) -> Union[str, None]:
"""Get the Nickname starting from UID or the nickname
Args:
uidornickname (str): UID or Nickname of the user
Returns:
str|None: the nickname
"""
userObj = self.get_Client(uidornickname=uidornickname)
if userObj is None:
return None
return userObj.nickname
def get_Client_AsDict(self, uidornickname: str) -> Union[dict[str, any], None]:
"""Transform User Object to a dictionary
Args:
uidornickname (str): The UID or The nickname
Returns:
Union[dict[str, any], None]: User Object as a dictionary or None
"""
userObj = self.get_Client(uidornickname=uidornickname)
if userObj is None:
return None
return asdict(userObj)
def is_exist(self, uidornikname: str) -> bool:
"""Check if the UID or the nickname exist in the USER DB
Args:
uidornickname (str): The UID or the NICKNAME
Returns:
bool: True if exist
"""
userObj = self.get_Client(uidornickname=uidornikname)
if userObj is None:
return False
return True
def db_is_account_exist(self, account: str) -> bool:
"""Check if the account exist in the database
Args:
account (str): The account to check
Returns:
bool: True if exist
"""
table_client = self.Base.Config.TABLE_CLIENT
account_to_check = {'account': account.lower()}
account_to_check_query = self.Base.db_execute_query(f"""
SELECT id FROM {table_client} WHERE LOWER(account) = :account
""", account_to_check)
account_to_check_result = account_to_check_query.fetchone()
if account_to_check_result:
self.Logs.error(f"Account ({account}) already exist")
return True
return False
def clean_uid(self, uid: str) -> Union[str, None]:
"""Clean UID by removing @ / % / + / ~ / * / :
Args:
uid (str): The UID to clean
Returns:
str: Clean UID without any sign
"""
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = sub(pattern, '', uid)
if not parsed_UID:
return None
return parsed_UID

View File

@@ -46,7 +46,7 @@ class Inspircd:
except AttributeError as ae:
self.__Base.logs.critical(f"Attribute Error: {ae}")
def sendPrivMsg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None):
def send_priv_msg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None):
"""Sending PRIVMSG to a channel or to a nickname by batches
could be either channel or nickname not both together
Args:
@@ -76,7 +76,7 @@ class Inspircd:
except Exception as err:
self.__Base.logs.error(f"General Error: {err}")
def sendNotice(self, nick_from: str, nick_to: str, msg: str) -> None:
def send_notice(self, nick_from: str, nick_to: str, msg: str) -> None:
"""Sending NOTICE by batches
Args:
@@ -179,7 +179,7 @@ class Inspircd:
self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[self.__Config.SERVICE_ID]))
return None
def sendQuit(self, uid: str, reason: str, print_log: True) -> None:
def send_quit(self, uid: str, reason: str, print_log: True) -> None:
"""Send quit message
Args:
@@ -205,7 +205,7 @@ class Inspircd:
return None
def sendUID(self, nickname:str, username: str, hostname: str, uid:str, umodes: str, vhost: str, remote_ip: str, realname: str, print_log: bool = True) -> None:
def send_uid(self, nickname:str, username: str, hostname: str, uid:str, umodes: str, vhost: str, remote_ip: str, realname: str, print_log: bool = True) -> None:
"""Send UID to the server
Args:
@@ -243,7 +243,7 @@ class Inspircd:
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def sendChanJoin(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
def send_join_chan(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
"""Joining a channel
Args:
@@ -269,7 +269,7 @@ class Inspircd:
self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[userObj.uid]))
return None
def sendChanPart(self, uidornickname:str, channel: str, print_log: bool = True) -> None:
def send_part_chan(self, uidornickname:str, channel: str, print_log: bool = True) -> None:
"""Part from a channel
Args:
@@ -633,7 +633,7 @@ class Inspircd:
ping_response = current_unixtime - recieved_unixtime
# self.__Irc.send2socket(f':{dnickname} NOTICE {nickname} :\x01PING {ping_response} secs\x01')
self.sendNotice(
self.send_notice(
nick_from=dnickname,
nick_to=nickname,
msg=f"\x01PING {ping_response} secs\x01"

View File

@@ -1,8 +1,7 @@
from re import match, findall
from re import match, findall, search
from datetime import datetime
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Union
from ssl import SSLEOFError, SSLError
from dataclasses import dataclass
if TYPE_CHECKING:
from core.irc import Irc
@@ -17,6 +16,12 @@ class Unrealircd6:
self.__Base = ircInstance.Base
self.__Settings = ircInstance.Base.Settings
self.known_protocol = ['SJOIN', 'UID', 'MD', 'QUIT', 'SQUIT',
'EOS', 'PRIVMSG', 'MODE', 'UMODE2',
'VERSION', 'REPUTATION', 'SVS2MODE',
'SLOG', 'NICK', 'PART', 'PONG'
]
self.__Base.logs.info(f"** Loading protocol [{__name__}]")
def send2socket(self, message: str, print_log: bool = True) -> None:
@@ -48,7 +53,7 @@ class Unrealircd6:
except AttributeError as ae:
self.__Base.logs.critical(f"Attribute Error: {ae}")
def sendPrivMsg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None):
def send_priv_msg(self, nick_from: str, msg: str, channel: str = None, nick_to: str = None):
"""Sending PRIVMSG to a channel or to a nickname by batches
could be either channel or nickname not both together
Args:
@@ -80,7 +85,7 @@ class Unrealircd6:
self.__Base.logs.error(f"General Error: {err}")
self.__Base.logs.error(f"General Error: {nick_from} - {channel} - {nick_to}")
def sendNotice(self, nick_from: str, nick_to: str, msg: str) -> None:
def send_notice(self, nick_from: str, nick_to: str, msg: str) -> None:
"""Sending NOTICE by batches
Args:
@@ -104,6 +109,40 @@ class Unrealircd6:
except Exception as err:
self.__Base.logs.error(f"General Error: {err}")
def parse_server_msg(self, server_msg: list[str]) -> Union[str, None]:
"""Parse the server message and return the command
Args:
server_msg (list[str]): The Original server message >>
Returns:
Union[str, None]: Return the command protocol name
"""
protocol_exception = ['PING', 'SERVER', 'PROTOCTL']
increment = 0
server_msg_copy = server_msg.copy()
first_index = 0
second_index = 0
for index, element in enumerate(server_msg_copy):
# Handle the protocol exceptions ex. ping, server ....
if element in protocol_exception and index == 0:
return element
if element.startswith(':'):
increment += 1
first_index = index + 1 if increment == 1 else first_index
second_index = index if increment == 2 else second_index
second_index = len(server_msg_copy) if second_index == 0 else second_index
parsed_msg = server_msg_copy[first_index:second_index]
for cmd in parsed_msg:
if cmd in self.known_protocol:
return cmd
return None
def link(self):
"""Créer le link et envoyer les informations nécessaires pour la
connexion au serveur.
@@ -206,7 +245,7 @@ class Unrealircd6:
self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[self.__Config.SERVICE_ID]))
return None
def sendSapart(self, nick_to_sapart: str, channel_name: str) -> None:
def send_sapart(self, nick_to_sapart: str, channel_name: str) -> None:
"""_summary_
Args:
@@ -231,7 +270,7 @@ class Unrealircd6:
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def sendSajoin(self, nick_to_sajoin: str, channel_name: str) -> None:
def send_sajoin(self, nick_to_sajoin: str, channel_name: str) -> None:
"""_summary_
Args:
@@ -268,7 +307,7 @@ class Unrealircd6:
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def sendSvsmode(self, nickname: str, user_mode: str) -> None:
def send_svs_mode(self, nickname: str, user_mode: str) -> None:
try:
userObj = self.__Irc.User.get_User(uidornickname=nickname)
@@ -287,7 +326,7 @@ class Unrealircd6:
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def sendQuit(self, uid: str, reason: str, print_log: True) -> None:
def send_quit(self, uid: str, reason: str, print_log: True) -> None:
"""Send quit message
- Delete uid from User object
- Delete uid from Clone object
@@ -316,7 +355,7 @@ class Unrealircd6:
return None
def sendUID(self, nickname:str, username: str, hostname: str, uid:str, umodes: str, vhost: str, remote_ip: str, realname: str, print_log: bool = True) -> None:
def send_uid(self, nickname:str, username: str, hostname: str, uid:str, umodes: str, vhost: str, remote_ip: str, realname: str, print_log: bool = True) -> None:
"""Send UID to the server
- Insert User to User Object
Args:
@@ -354,7 +393,7 @@ class Unrealircd6:
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def sendChanJoin(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
def send_join_chan(self, uidornickname: str, channel: str, password: str = None, print_log: bool = True) -> None:
"""Joining a channel
Args:
@@ -381,9 +420,21 @@ class Unrealircd6:
# Add defender to the channel uids list
self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[userObj.uid]))
# Set the automode to the user
if 'r' not in userObj.umodes and 'o' not in userObj.umodes:
return None
def sendChanPart(self, uidornickname:str, channel: str, print_log: bool = True) -> None:
db_data: dict[str, str] = {"nickname": userObj.nickname, "channel": channel}
db_query = self.__Base.db_execute_query("SELECT id, mode FROM command_automode WHERE nickname = :nickname AND channel = :channel", db_data)
db_result = db_query.fetchone()
if db_result is not None:
id, mode = db_result
self.send2socket(f":{self.__Config.SERVICE_ID} MODE {channel} {mode} {userObj.nickname}")
return None
def send_part_chan(self, uidornickname:str, channel: str, print_log: bool = True) -> None:
"""Part from a channel
Args:
@@ -408,6 +459,53 @@ class Unrealircd6:
self.__Irc.Channel.delete_user_from_channel(channel, userObj.uid)
return None
def send_raw(self, raw_command: str) -> None:
self.send2socket(f":{self.__Config.SERVICE_NICKNAME} {raw_command}")
return None
#####################
# HANDLE EVENTS #
#####################
def on_svs2mode(self, serverMsg: list[str]) -> None:
"""Handle svs2mode coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# >> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
uid_user_to_edit = serverMsg[2]
umode = serverMsg[3]
userObj = self.__Irc.User.get_User(uid_user_to_edit)
if userObj is None:
return None
if self.__Irc.User.update_mode(userObj.uid, umode):
return None
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Index Error: {ie}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_mode(self, serverMsg: list[str]) -> None:
"""Handle mode coming from a server
Args:
serverMsg (list[str]): Original server message
"""
#['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6...', ':001', 'MODE', '#a', '+nt', '1723207536']
#['@unrealircd.org/userhost=adator@localhost;...', ':001LQ0L0C', 'MODE', '#services', '-l']
return None
def on_umode2(self, serverMsg: list[str]) -> None:
"""Handle umode2 coming from a server
@@ -451,6 +549,7 @@ class Unrealircd6:
self.__Irc.Channel.delete_user_from_all_channel(uid_who_quit)
self.__Irc.User.delete(uid_who_quit)
self.__Irc.Client.delete(uid_who_quit)
self.__Irc.Reputation.delete(uid_who_quit)
self.__Irc.Clone.delete(uid_who_quit)
@@ -490,12 +589,32 @@ class Unrealircd6:
serverMsg (list[str]): Original server message
"""
# ['PROTOCTL', 'CHANMODES=beI,fkL,lFH,cdimnprstzCDGKMNOPQRSTVZ', 'USERMODES=diopqrstwxzBDGHIRSTWZ', 'BOOTED=1728815798', 'PREFIX=(qaohv)~&@%+', 'SID=001', 'MLOCK', 'TS=1730662755', 'EXTSWHOIS']
if len(serverMsg) > 5:
if '=' in serverMsg[5]:
serveur_hosting_id = str(serverMsg[5]).split('=')
self.__Config.HSID = serveur_hosting_id[1]
if 'USERMODES=' in serverMsg[2]:
self.__Settings.USER_MODES = list(serverMsg[2].split('=')[1])
user_modes: str = None
prefix: str = None
host_server_id: str = None
for msg in serverMsg:
pattern = None
if msg.startswith('PREFIX='):
pattern = r'^PREFIX=\((.*)\).*$'
find_match = match(pattern, msg)
prefix = find_match.group(1) if find_match else None
if find_match:
prefix = find_match.group(1)
elif msg.startswith('USERMODES='):
pattern = r'^USERMODES=(.*)$'
find_match = match(pattern, msg)
user_modes = find_match.group(1) if find_match else None
elif msg.startswith('SID='):
host_server_id = msg.split('=')[1]
if user_modes is None or prefix is None or host_server_id is None:
return None
self.__Config.HSID = host_server_id
self.__Settings.PROTOCTL_USER_MODES = list(user_modes)
self.__Settings.PROTOCTL_PREFIX = list(prefix)
return None
@@ -513,6 +632,7 @@ class Unrealircd6:
uid = str(serverMsg[1]).lstrip(':')
newnickname = serverMsg[3]
self.__Irc.User.update(uid, newnickname)
self.__Irc.Client.update(uid, newnickname)
return None
@@ -534,8 +654,11 @@ class Unrealircd6:
# ':001T6VU3F', '001JGWB2K', '@11ZAAAAAB',
# '001F16WGR', '001X9YMGQ', '*+001DYPFGP', '@00BAAAAAJ', '001AAGOG9', '001FMFVG8', '001DAEEG7',
# '&~G:unknown-users', '"~G:websocket-users', '"~G:known-users', '"~G:webirc-users']
# [':00B', 'SJOIN', '1731872579', '#services', '+', ':00BAAAAAB']
serverMsg_copy = serverMsg.copy()
if serverMsg_copy[0].startswith('@'):
serverMsg_copy.pop(0)
channel = str(serverMsg_copy[3]).lower()
len_cmd = len(serverMsg_copy)
list_users:list = []
@@ -592,6 +715,105 @@ class Unrealircd6:
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_eos(self, serverMsg: list[str]) -> None:
"""Handle EOS coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# [':001', 'EOS']
server_msg_copy = serverMsg.copy()
hsid = str(server_msg_copy[0]).replace(':','')
if hsid == self.__Config.HSID:
if self.__Config.DEFENDER_INIT == 1:
current_version = self.__Config.CURRENT_VERSION
latest_version = self.__Config.LATEST_VERSION
if self.__Base.check_for_new_version(False):
version = f'{current_version} >>> {latest_version}'
else:
version = f'{current_version}'
print(f"################### DEFENDER ###################")
print(f"# SERVICE CONNECTE ")
print(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
print(f"# PORT : {self.__Config.SERVEUR_PORT} ")
print(f"# SSL : {self.__Config.SERVEUR_SSL} ")
print(f"# SSL VER : {self.__Config.SSL_VERSION} ")
print(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
print(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
print(f"# VERSION : {version} ")
print(f"################################################")
self.__Base.logs.info(f"################### DEFENDER ###################")
self.__Base.logs.info(f"# SERVICE CONNECTE ")
self.__Base.logs.info(f"# SERVEUR : {self.__Config.SERVEUR_IP} ")
self.__Base.logs.info(f"# PORT : {self.__Config.SERVEUR_PORT} ")
self.__Base.logs.info(f"# SSL : {self.__Config.SERVEUR_SSL} ")
self.__Base.logs.info(f"# SSL VER : {self.__Config.SSL_VERSION} ")
self.__Base.logs.info(f"# NICKNAME : {self.__Config.SERVICE_NICKNAME} ")
self.__Base.logs.info(f"# CHANNEL : {self.__Config.SERVICE_CHANLOG} ")
self.__Base.logs.info(f"# VERSION : {version} ")
self.__Base.logs.info(f"################################################")
if self.__Base.check_for_new_version(False):
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f" New Version available {version}",
channel=self.__Config.SERVICE_CHANLOG
)
# Initialisation terminé aprés le premier PING
self.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f"[{self.__Config.COLORS.green}INFORMATION{self.__Config.COLORS.nogc}] >> Defender is ready",
channel=self.__Config.SERVICE_CHANLOG
)
self.__Config.DEFENDER_INIT = 0
# Send EOF to other modules
for classe_name, classe_object in self.__Irc.loaded_classes.items():
classe_object.cmd(server_msg_copy)
return None
except IndexError as ie:
self.__Base.logs.error(f"{__name__} - Key Error: {ie}")
except KeyError as ke:
self.__Base.logs.error(f"{__name__} - Key Error: {ke}")
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_reputation(self, serverMsg: list[str]) -> None:
"""Handle REPUTATION coming from a server
Args:
serverMsg (list[str]): Original server message
"""
try:
# :001 REPUTATION 127.0.0.1 118
server_msg_copy = serverMsg.copy()
self.__Irc.first_connexion_ip = server_msg_copy[2]
self.__Irc.first_score = 0
if str(server_msg_copy[3]).find('*') != -1:
# If * available, it means that an ircop changed the repurtation score
# means also that the user exist will try to update all users with same IP
self.__Irc.first_score = int(str(server_msg_copy[3]).replace('*',''))
for user in self.__Irc.User.UID_DB:
if user.remote_ip == self.__Irc.first_connexion_ip:
user.score_connexion = self.__Irc.first_score
else:
self.__Irc.first_score = int(server_msg_copy[3])
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Logs.error(f'Index Error {__name__}: {ie}')
except ValueError as ve:
self.__Irc.first_score = 0
self.Logs.error(f'Value Error {__name__}: {ve}')
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_uid(self, serverMsg: list[str]) -> None:
"""Handle uid message coming from the server
@@ -630,7 +852,7 @@ class Unrealircd6:
else:
geoip = None
score_connexion = 0
score_connexion = self.__Irc.first_score
self.__Irc.User.insert(
self.__Irc.Loader.Definition.MUser(
@@ -655,6 +877,103 @@ class Unrealircd6:
except Exception as err:
self.__Base.logs.error(f"{__name__} - General Error: {err}")
def on_privmsg(self, serverMsg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server
Args:
serverMsg (list[str]): Original server message
"""
try:
srv_msg = serverMsg.copy()
cmd = serverMsg.copy()
# Supprimer la premiere valeur si MTAGS activé
if cmd[0].startswith('@'):
cmd.pop(0)
# Hide auth logs
if len(cmd) == 7:
if cmd[2] == 'PRIVMSG' and cmd[4] == ':auth':
data_copy = cmd.copy()
data_copy[6] = '**********'
self.__Base.logs.debug(f">> {data_copy}")
else:
self.__Base.logs.debug(f">> {cmd}")
else:
self.__Base.logs.debug(f">> {cmd}")
get_uid_or_nickname = str(cmd[0].replace(':',''))
user_trigger = self.__Irc.User.get_nickname(get_uid_or_nickname)
dnickname = self.__Config.SERVICE_NICKNAME
pattern = fr'(:\{self.__Config.SERVICE_PREFIX})(.*)$'
hcmds = search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le .
if hcmds: # Commande qui commencent par le point
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
arg.remove(f':{self.__Config.SERVICE_PREFIX}')
if not arg[0].lower() in self.__Irc.module_commands_list:
self.__Base.logs.debug(f"This command {arg[0]} is not available")
self.send_notice(
nick_from=self.__Config.SERVICE_NICKNAME,
nick_to=user_trigger,
msg=f"This command [{self.__Config.COLORS.bold}{arg[0]}{self.__Config.COLORS.bold}] is not available"
)
return None
cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = str(cmd[2]).lower() if self.__Irc.Channel.Is_Channel(cmd[2]) else None
self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd)
if cmd[2] == self.__Config.SERVICE_ID:
pattern = fr'^:.*?:(.*)$'
hcmds = search(pattern, ' '.join(cmd))
if hcmds: # par /msg defender [commande]
liste_des_commandes = list(hcmds.groups())
convert_to_string = ' '.join(liste_des_commandes)
arg = convert_to_string.split()
# Réponse a un CTCP VERSION
if arg[0] == '\x01VERSION\x01':
self.on_version(srv_msg)
return False
# Réponse a un TIME
if arg[0] == '\x01TIME\x01':
self.on_time(srv_msg)
return False
# Réponse a un PING
if arg[0] == '\x01PING':
self.on_ping(srv_msg)
return False
if not arg[0].lower() in self.__Irc.module_commands_list:
self.__Base.logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available")
return False
cmd_to_send = convert_to_string.replace(':','')
self.__Base.log_cmd(user_trigger, cmd_to_send)
fromchannel = None
if len(arg) >= 2:
fromchannel = str(arg[1]).lower() if self.__Irc.Channel.Is_Channel(arg[1]) else None
self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd)
return None
except KeyError as ke:
self.__Base.logs.error(f"Key Error: {ke}")
except AttributeError as ae:
self.__Base.logs.error(f"Attribute Error: {ae}")
except Exception as err:
self.__Base.logs.error(f"General Error: {err} - {srv_msg}")
def on_server_ping(self, serverMsg: list[str]) -> None:
"""Send a PONG message to the server
@@ -662,7 +981,7 @@ class Unrealircd6:
serverMsg (list[str]): List of str coming from the server
"""
try:
#
pong = str(serverMsg[1]).replace(':','')
self.send2socket(f"PONG :{pong}", print_log=False)
@@ -742,7 +1061,7 @@ class Unrealircd6:
ping_response = current_unixtime - recieved_unixtime
# self.__Irc.send2socket(f':{dnickname} NOTICE {nickname} :\x01PING {ping_response} secs\x01')
self.sendNotice(
self.send_notice(
nick_from=dnickname,
nick_to=nickname,
msg=f"\x01PING {ping_response} secs\x01"
@@ -754,7 +1073,7 @@ class Unrealircd6:
def on_version_msg(self, serverMsg: list[str]) -> None:
"""Handle version coming from the server
\n ex. /version Defender
Args:
serverMsg (list[str]): Original message from the server
"""
@@ -776,7 +1095,7 @@ class Unrealircd6:
response_005 = ' | '.join(modules)
self.send2socket(f':{self.__Config.SERVICE_HOST} 005 {getUser.nickname} {response_005} are supported by this server')
response_005 = ''.join(self.__Settings.USER_MODES)
response_005 = ''.join(self.__Settings.PROTOCTL_USER_MODES)
self.send2socket(f":{self.__Config.SERVICE_HOST} 005 {getUser.nickname} {response_005} are supported by this server")
return None

View File

@@ -11,4 +11,5 @@ class Settings:
CONSOLE: bool = False
USER_MODES: list[str] = []
PROTOCTL_USER_MODES: list[str] = []
PROTOCTL_PREFIX: list[str] = []

View File

@@ -92,7 +92,7 @@ class User:
return False
liste_umodes = list(umodes)
final_umodes_liste = [x for x in self.Base.Settings.USER_MODES if x in liste_umodes]
final_umodes_liste = [x for x in self.Base.Settings.PROTOCTL_USER_MODES if x in liste_umodes]
final_umodes = ''.join(final_umodes_liste)
userObj.umodes = f"+{final_umodes}"

View File

@@ -3,6 +3,24 @@ from dataclasses import dataclass, field
from typing import Literal
from os import sep
@dataclass
class MClient:
"""Model Client for registred nickname"""
uid: str = None
account: str = None
nickname: str = None
username: str = None
realname: str = None
hostname: str = None
umodes: str = None
vhost: str = None
isWebirc: bool = False
isWebsocket: bool = False
remote_ip: str = None
score_connexion: int = 0
geoip: str = None
connexion_datetime: datetime = field(default=datetime.now())
@dataclass
class MUser:
"""Model User"""
@@ -83,6 +101,7 @@ class ColorModel:
yellow: str = "\x0306"
bold: str = "\x02"
nogc: str = "\x03"
underline: str = "\x1F"
@dataclass
class MConfig:
@@ -214,8 +233,11 @@ class MConfig:
LOGGING_NAME: str = "defender"
"""The name of the Logging instance"""
TABLE_CLIENT: str = "core_client"
"""Core Client table"""
TABLE_ADMIN: str = "core_admin"
"""Admin table"""
"""Core Admin table"""
TABLE_COMMAND: str = "core_command"
"""Core command table"""

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
from core.classes import user, admin, channel, clone, reputation, settings
from core.classes import user, admin, client, channel, clone, reputation, settings
import core.definition as df
import core.base as baseModule
import core.classes.config as confModule
@@ -23,6 +23,8 @@ class Loader:
self.User: user.User = user.User(self.Base)
self.Client: client.Client = client.Client(self.Base)
self.Admin: admin.Admin = admin.Admin(self.Base)
self.Channel: channel.Channel = channel.Channel(self.Base)

74
core/utils.py Normal file
View File

@@ -0,0 +1,74 @@
from typing import Literal, Union
from datetime import datetime
from time import time
from random import choice
from hashlib import md5, sha3_512
def convert_to_int(value: any) -> Union[int, None]:
"""Convert a value to int
Args:
value (any): Value to convert to int if possible
Returns:
Union[int, None]: Return the int value or None if not possible
"""
try:
value_to_int = int(value)
return value_to_int
except ValueError:
return None
except Exception:
return None
def get_unixtime() -> int:
"""Cette fonction retourne un UNIXTIME de type 12365456
Returns:
int: Current time in seconds since the Epoch (int)
"""
return int(time())
def get_datetime() -> str:
"""Retourne une date au format string (24-12-2023 20:50:59)
Returns:
str: Current datetime in this format %d-%m-%Y %H:%M:%S
"""
currentdate = datetime.now().strftime('%d-%m-%Y %H:%M:%S')
return currentdate
def generate_random_string(lenght: int) -> str:
"""Retourn une chaîne aléatoire en fonction de la longueur spécifiée.
Returns:
str: The random string
"""
caracteres = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
randomize = ''.join(choice(caracteres) for _ in range(lenght))
return randomize
def hash(password: str, algorithm: Literal["md5, sha3_512"] = 'md5') -> str:
"""Retourne un mot de passe chiffré en fonction de l'algorithme utilisé
Args:
password (str): Le password en clair
algorithm (str): L'algorithm a utilisé
Returns:
str: Le password haché
"""
match algorithm:
case 'md5':
password = md5(password.encode()).hexdigest()
return password
case 'sha3_512':
password = sha3_512(password.encode()).hexdigest()
return password
case _:
password = md5(password.encode()).hexdigest()
return password

View File

@@ -1,30 +1,21 @@
from core import installation
#############################################
# @Version : 1 #
# @Version : 6 #
# Requierements : #
# Python3.10 or higher #
# SQLAlchemy, requests, psutil #
# UnrealIRCD 6.2.2 or higher #
#############################################
#########################
# LANCEMENT DE DEFENDER #
#########################
# 1. Chargement de la configuration
# 2. Chargement de l'ensemble des classes
# 3.
#
try:
installation.Install()
from core.loader import Loader
from core.irc import Irc
loader = Loader()
ircInstance = Irc(loader)
# loader = Loader()
ircInstance = Irc(Loader())
ircInstance.init_irc(ircInstance)
except AssertionError as ae:

View File

@@ -1,7 +1,5 @@
from dataclasses import dataclass, fields, field
import copy
from dataclasses import dataclass
import random, faker, time, logging
from datetime import datetime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@@ -45,9 +43,7 @@ class Clone():
self.Definition = ircInstance.Loader.Definition
# Créer les nouvelles commandes du module
self.commands_level = {
1: ['clone']
}
self.Irc.build_command(1, self.module_name, 'clone', 'Connect, join, part, kill and say clones')
# Init the module (Mandatory)
self.__init_module()
@@ -57,9 +53,6 @@ class Clone():
def __init_module(self) -> None:
# Enrigstrer les nouvelles commandes dans le code
self.__set_commands(self.commands_level)
# Créer les tables necessaire a votre module (ce n'es pas obligatoire)
self.__create_tables()
@@ -73,26 +66,12 @@ class Clone():
self.__load_module_configuration()
self.Channel.db_query_channel(action='add', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.sendChanJoin(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL)
self.Protocol.send_join_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.CLONE_CHANNEL} +o {self.Config.SERVICE_NICKNAME}")
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} +nts")
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} +k {self.Config.CLONE_CHANNEL_PASSWORD}")
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
@@ -140,7 +119,7 @@ class Clone():
self.Channel.db_query_channel(action='del', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -nts")
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.CLONE_CHANNEL} -k {self.Config.CLONE_CHANNEL_PASSWORD}")
self.Protocol.sendChanPart(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL)
self.Protocol.send_part_chan(self.Config.SERVICE_NICKNAME, self.Config.CLONE_CHANNEL)
return None
@@ -155,7 +134,7 @@ class Clone():
vhost = ''.join(rand_1) + '.' + ''.join(rand_2) + '.' + ''.join(rand_3) + '.IP'
return vhost
def generate_clones(self, group: str = 'Default') -> None:
def generate_clones(self, group: str = 'Default', auto_remote_ip: bool = False) -> None:
try:
fakeEN = self.fakeEN
@@ -188,7 +167,7 @@ class Clone():
department = fakeFR.department_name()
realname = f'{age} {gender} {department}'
decoded_ip = fakeEN.ipv4_private()
decoded_ip = fakeEN.ipv4_private() if auto_remote_ip else '127.0.0.1'
hostname = fakeEN.hostname()
vhost = self.generate_vhost()
@@ -231,10 +210,10 @@ class Clone():
except Exception as err:
self.Logs.error(f"General Error: {err}")
def thread_connect_clones(self, number_of_clones:int , group: str, interval: float = 0.2) -> None:
def thread_connect_clones(self, number_of_clones:int , group: str = 'Default', auto_remote_ip: bool = False, interval: float = 0.2) -> None:
for i in range(0, number_of_clones):
self.generate_clones(group=group)
self.generate_clones(group=group, auto_remote_ip=auto_remote_ip)
for clone in self.Clone.UID_CLONE_DB:
@@ -244,8 +223,8 @@ class Clone():
break
if not clone.connected:
self.Protocol.sendUID(clone.nickname, clone.username, clone.hostname, clone.uid, clone.umodes, clone.vhost, clone.remote_ip, clone.realname, print_log=False)
self.Protocol.sendChanJoin(uidornickname=clone.uid, channel=self.Config.CLONE_CHANNEL, password=self.Config.CLONE_CHANNEL_PASSWORD, print_log=False)
self.Protocol.send_uid(clone.nickname, clone.username, clone.hostname, clone.uid, clone.umodes, clone.vhost, clone.remote_ip, clone.realname, print_log=False)
self.Protocol.send_join_chan(uidornickname=clone.uid, channel=self.Config.CLONE_CHANNEL, password=self.Config.CLONE_CHANNEL_PASSWORD, print_log=False)
time.sleep(interval)
clone.connected = True
@@ -257,7 +236,7 @@ class Clone():
clone_to_kill.append(clone.uid)
for clone_uid in clone_to_kill:
self.Protocol.sendQuit(clone_uid, 'Gooood bye', print_log=False)
self.Protocol.send_quit(clone_uid, 'Gooood bye', print_log=False)
del clone_to_kill
@@ -297,7 +276,7 @@ class Clone():
if getClone.uid != self.Config.SERVICE_ID:
final_message = f"{senderObj.nickname}!{senderObj.username}@{senderObj.hostname} > {senderMsg.lstrip(':')}"
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=getClone.uid,
msg=final_message,
channel=self.Config.CLONE_CHANNEL
@@ -306,7 +285,7 @@ class Clone():
except Exception as err:
self.Logs.error(f'General Error: {err}')
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
try:
command = str(cmd[0]).lower()
@@ -319,11 +298,11 @@ class Clone():
case 'clone':
if len(cmd) == 1:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone connect NUMBER GROUP_NAME INTERVAL")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill [all | nickname]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | nickname] #channel")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | nickname] #channel")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone connect NUMBER GROUP_NAME INTERVAL")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill [all | nickname]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | nickname] #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | nickname] #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list")
option = str(cmd[1]).lower()
@@ -331,21 +310,21 @@ class Clone():
case 'connect':
try:
# clone connect 5 Group 3
# clone connect 5 GroupName 3
self.stop = False
number_of_clones = int(cmd[2])
group = str(cmd[3]).lower()
connection_interval = int(cmd[4]) if len(cmd) == 5 else 0.5
connection_interval = int(cmd[4]) if len(cmd) == 5 else 0.2
self.Base.create_thread(
func=self.thread_connect_clones,
func_args=(number_of_clones, group, connection_interval)
func_args=(number_of_clones, group, False, connection_interval)
)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone connect [number of clone you want to connect] [Group]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"Exemple /msg {dnickname} clone connect 6 Ambiance")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone connect [number of clone you want to connect] [Group]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Exemple /msg {dnickname} clone connect 6 Ambiance")
case 'kill':
try:
@@ -360,12 +339,12 @@ class Clone():
else:
cloneObj = self.Clone.get_Clone(clone_name)
if not cloneObj is None:
self.Protocol.sendQuit(cloneObj.uid, 'Goood bye', print_log=False)
self.Protocol.send_quit(cloneObj.uid, 'Goood bye', print_log=False)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill all")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill clone_nickname")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill all")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill clone_nickname")
case 'join':
try:
@@ -376,17 +355,17 @@ class Clone():
if clone_name.lower() == 'all':
for clone in self.Clone.UID_CLONE_DB:
self.Protocol.sendChanJoin(uidornickname=clone.uid, channel=clone_channel_to_join, print_log=False)
self.Protocol.send_join_chan(uidornickname=clone.uid, channel=clone_channel_to_join, print_log=False)
else:
if self.Clone.exists(clone_name):
if not self.Clone.get_uid(clone_name) is None:
self.Protocol.sendChanJoin(uidornickname=clone_name, channel=clone_channel_to_join, print_log=False)
self.Protocol.send_join_chan(uidornickname=clone_name, channel=clone_channel_to_join, print_log=False)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join all #channel")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join clone_nickname #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join all #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join clone_nickname #channel")
case 'part':
try:
@@ -397,25 +376,25 @@ class Clone():
if clone_name.lower() == 'all':
for clone in self.Clone.UID_CLONE_DB:
self.Protocol.sendChanPart(uidornickname=clone.uid, channel=clone_channel_to_part, print_log=False)
self.Protocol.send_part_chan(uidornickname=clone.uid, channel=clone_channel_to_part, print_log=False)
else:
if self.Clone.exists(clone_name):
clone_uid = self.Clone.get_uid(clone_name)
if not clone_uid is None:
self.Protocol.sendChanPart(uidornickname=clone_uid, channel=clone_channel_to_part, print_log=False)
self.Protocol.send_part_chan(uidornickname=clone_uid, channel=clone_channel_to_part, print_log=False)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part all #channel")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part clone_nickname #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part all #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part clone_nickname #channel")
case 'list':
try:
clone_count = len(self.Clone.UID_CLONE_DB)
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f">> Number of connected clones: {clone_count}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f">> Number of connected clones: {clone_count}")
for clone_name in self.Clone.UID_CLONE_DB:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,
msg=f">> Nickname: {clone_name.nickname} | Username: {clone_name.username} | Realname: {clone_name.realname} | Vhost: {clone_name.vhost} | UID: {clone_name.uid} | Group: {clone_name.group} | Connected: {clone_name.connected}")
except Exception as err:
self.Logs.error(f'{err}')
@@ -429,7 +408,7 @@ class Clone():
final_message = ' '.join(cmd[4:])
if clone_channel is None or not self.Clone.exists(clone_name):
self.Protocol.sendNotice(
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"/msg {dnickname} clone say [clone_nickname] #channel message"
@@ -437,22 +416,22 @@ class Clone():
return None
if self.Clone.exists(clone_name):
self.Protocol.sendPrivMsg(nick_from=clone_name, msg=final_message, channel=clone_channel)
self.Protocol.send_priv_msg(nick_from=clone_name, msg=final_message, channel=clone_channel)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"/msg {dnickname} clone say [clone_nickname] #channel message"
)
case _:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone connect NUMBER GROUP_NAME INTERVAL")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill [all | nickname]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | nickname] #channel")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | nickname] #channel")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone connect NUMBER GROUP_NAME INTERVAL")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone kill [all | nickname]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone join [all | nickname] #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone part [all | nickname] #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} clone list")
except IndexError as ie:
self.Logs.error(f'Index Error: {ie}')

File diff suppressed because it is too large Load Diff

View File

@@ -4,10 +4,9 @@ import time
import re
import psutil
import requests
from dataclasses import dataclass, fields, field
from dataclasses import dataclass
from datetime import datetime
from typing import Union, TYPE_CHECKING
from sys import exit
import core.definition as df
# Le module crée devra réspecter quelques conditions
@@ -19,7 +18,7 @@ import core.definition as df
# 4 . Créer vos tables, en utilisant toujours le nom des votre classe en minuscule ==> defender_votre-table
# 3. Methode suivantes:
# cmd(self, data:list)
# _hcmds(self, user:str, cmd: list)
# hcmds(self, user:str, cmd: list)
# unload(self)
if TYPE_CHECKING:
@@ -82,11 +81,16 @@ class Defender():
self.Reputation = ircInstance.Reputation
# Create module commands (Mandatory)
self.commands_level = {
0: ['code'],
1: ['info', 'autolimit'],
3: ['reputation','proxy_scan', 'flood', 'status', 'timer','show_reputation', 'sentinel']
}
self.Irc.build_command(0, self.module_name, 'code', 'Display the code or key for access')
self.Irc.build_command(1, self.module_name, 'info', 'Provide information about the channel or server')
self.Irc.build_command(1, self.module_name, 'autolimit', 'Automatically set channel user limits')
self.Irc.build_command(3, self.module_name, 'reputation', 'Check or manage user reputation')
self.Irc.build_command(3, self.module_name, 'proxy_scan', 'Scan users for proxy connections')
self.Irc.build_command(3, self.module_name, 'flood', 'Handle flood detection and mitigation')
self.Irc.build_command(3, self.module_name, 'status', 'Check the status of the server or bot')
self.Irc.build_command(3, self.module_name, 'timer', 'Set or manage timers')
self.Irc.build_command(3, self.module_name, 'show_reputation', 'Display reputation information')
self.Irc.build_command(3, self.module_name, 'sentinel', 'Monitor and guard the channel or server')
# Init the module (Mandatory)
self.__init_module()
@@ -96,9 +100,6 @@ class Defender():
def __init_module(self) -> None:
# Insert module commands into the core one (Mandatory)
self.__set_commands(self.commands_level)
# Create you own tables if needed (Mandatory)
self.__create_tables()
@@ -141,6 +142,8 @@ class Defender():
self.Base.create_thread(func=self.thread_local_scan)
self.Base.create_thread(func=self.thread_psutil_scan)
self.Base.create_thread(func=self.thread_reputation_timer)
if self.ModConfig.autolimit == 1:
self.Base.create_thread(func=self.thread_autolimit)
if self.ModConfig.reputation == 1:
@@ -149,20 +152,6 @@ class Defender():
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
@@ -173,31 +162,15 @@ class Defender():
None: Aucun retour n'es attendu
"""
table_channel = '''CREATE TABLE IF NOT EXISTS def_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
channel TEXT
)
'''
# table_autoop = '''CREATE TABLE IF NOT EXISTS defender_autoop (
# id INTEGER PRIMARY KEY AUTOINCREMENT,
# datetime TEXT,
# nickname TEXT,
# channel TEXT
# )
# '''
table_config = '''CREATE TABLE IF NOT EXISTS def_config (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
parameter TEXT,
value TEXT
)
'''
table_trusted = '''CREATE TABLE IF NOT EXISTS def_trusted (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
user TEXT,
host TEXT,
vhost TEXT
)
'''
# self.Base.db_execute_query(table_channel)
# self.Base.db_execute_query(table_autoop)
# self.Base.db_execute_query(table_config)
# self.Base.db_execute_query(table_trusted)
return None
@@ -255,7 +228,7 @@ class Defender():
exec_query = self.Base.db_execute_query(query, {"user": nickname})
response = exec_query.fetchone()
if not response is None:
if response is not None:
q_insert = "INSERT INTO def_trusted (datetime, user, host, vhost) VALUES (?, ?, ?, ?)"
mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'host': '*', 'vhost': '*'}
exec_query = self.Base.db_execute_query(q_insert, mes_donnees)
@@ -301,7 +274,8 @@ class Defender():
# Convertir la date enregistrée dans UID_DB en un objet {datetime}
connected_time_string = get_user.connexion_datetime
if type(connected_time_string) == datetime:
if isinstance(connected_time_string, datetime):
connected_time = connected_time_string
else:
connected_time = datetime.strptime(connected_time_string, "%Y-%m-%d %H:%M:%S.%f")
@@ -351,12 +325,12 @@ class Defender():
if not get_reputation.isWebirc:
# Si le user ne vient pas de webIrc
self.Protocol.sendSajoin(nick_to_sajoin=jailed_nickname, channel_name=salon_jail)
self.Protocol.sendPrivMsg(nick_from=self.Config.SERVICE_NICKNAME,
self.Protocol.send_sajoin(nick_to_sajoin=jailed_nickname, channel_name=salon_jail)
self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME,
msg=f" [{color_red} REPUTATION {nogc}] : Connexion de {jailed_nickname} ({jailed_score}) ==> {salon_jail}",
channel=salon_logs
)
self.Protocol.sendNotice(
self.Protocol.send_notice(
nick_from=self.Config.SERVICE_NICKNAME,
nick_to=jailed_nickname,
msg=f"[{color_red} {jailed_nickname} {color_black}] : Merci de tapez la commande suivante {color_bold}{service_prefix}code {code}{color_bold}"
@@ -386,7 +360,6 @@ class Defender():
service_id = self.Config.SERVICE_ID
dchanlog = self.Config.SERVICE_CHANLOG
color_red = self.Config.COLORS.red
color_black = self.Config.COLORS.black
nogc = self.Config.COLORS.nogc
salon_jail = self.Config.SALON_JAIL
@@ -400,7 +373,7 @@ class Defender():
for user in self.Reputation.UID_REPUTATION_DB:
if not user.isWebirc: # Si il ne vient pas de WebIRC
if self.get_user_uptime_in_minutes(user.uid) >= reputation_timer and int(user.score_connexion) <= int(reputation_seuil):
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f"[{color_red} REPUTATION {nogc}] : Action sur {user.nickname} aprés {str(reputation_timer)} minutes d'inactivité",
channel=dchanlog
@@ -480,7 +453,7 @@ class Defender():
unixtime = self.Base.get_unixtime()
get_diff_secondes = 0
if not get_detected_uid in self.flood_system:
if get_detected_uid not in self.flood_system:
self.flood_system[get_detected_uid] = {
'nbr_msg': 0,
'first_msg_time': unixtime
@@ -495,7 +468,7 @@ class Defender():
elif self.flood_system[get_detected_uid]['nbr_msg'] > flood_message:
self.Irc.Base.logs.info('system de flood detecté')
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"{color_red} {color_bold} Flood detected. Apply the +m mode (Ô_o)",
channel=channel
@@ -516,13 +489,13 @@ class Defender():
for param in res.fetchall():
if param[0] == 'reputation':
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f" ===> {param[0]}",
channel=dchanlog
)
else:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f"{param[0]}",
channel=dchanlog
@@ -555,7 +528,7 @@ class Defender():
connection = (remote_ip, self.Base.int_if_possible(port))
newSocket.connect(connection)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[ {self.Config.COLORS.red}PROXY_SCAN{self.Config.COLORS.nogc} ] {fullname} ({remote_ip}) : Port [{str(port)}] ouvert sur l'adresse ip [{remote_ip}]",
channel=self.Config.SERVICE_CHANLOG
@@ -622,7 +595,7 @@ class Defender():
self.Logs.info(f"Connexion of {fullname} ({remote_ip}) using ports : {str(matching_ports)}")
if matching_ports:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[ {self.Config.COLORS.red}PSUTIL_SCAN{self.Config.COLORS.black} ] {fullname} ({remote_ip}) : is using ports {matching_ports}",
channel=self.Config.SERVICE_CHANLOG
@@ -694,7 +667,7 @@ class Defender():
# Formatted output
decodedResponse = json.loads(response.text)
if not 'data' in decodedResponse:
if 'data' not in decodedResponse:
return None
result = {
@@ -707,13 +680,12 @@ class Defender():
service_id = self.Config.SERVICE_ID
service_chanlog = self.Config.SERVICE_CHANLOG
color_red = self.Config.COLORS.red
color_black = self.Config.COLORS.black
nogc = self.Config.COLORS.nogc
# pseudo!ident@host
fullname = f'{nickname}!{username}@{hostname}'
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f"[ {color_red}ABUSEIPDB_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Score: {str(result['score'])} | Country : {result['country']} | Tor : {str(result['isTor'])} | Total Reports : {str(result['totalReports'])}",
channel=service_chanlog
@@ -780,7 +752,6 @@ class Defender():
service_id = self.Config.SERVICE_ID
service_chanlog = self.Config.SERVICE_CHANLOG
color_red = self.Config.COLORS.red
color_black = self.Config.COLORS.black
nogc = self.Config.COLORS.nogc
url = f'https://freeipapi.com/api/json/{remote_ip}'
@@ -797,7 +768,7 @@ class Defender():
status_code = response.status_code
if status_code == 429:
self.Logs.warning(f'Too Many Requests - The rate limit for the API has been exceeded.')
self.Logs.warning('Too Many Requests - The rate limit for the API has been exceeded.')
return None
elif status_code != 200:
self.Logs.warning(f'status code = {str(status_code)}')
@@ -811,7 +782,7 @@ class Defender():
# pseudo!ident@host
fullname = f'{nickname}!{username}@{hostname}'
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f"[ {color_red}FREEIPAPI_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Proxy: {str(result['isProxy'])} | Country : {str(result['countryCode'])}",
channel=service_chanlog
@@ -873,10 +844,9 @@ class Defender():
service_id = self.Config.SERVICE_ID
service_chanlog = self.Config.SERVICE_CHANLOG
color_red = self.Config.COLORS.red
color_black = self.Config.COLORS.black
nogc = self.Config.COLORS.nogc
url = f"https://developers18334.cloudfilt.com/"
url = "https://developers18334.cloudfilt.com/"
data = {
'ip': remote_ip,
@@ -902,7 +872,7 @@ class Defender():
# pseudo!ident@host
fullname = f'{nickname}!{username}@{hostname}'
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f"[ {color_red}CLOUDFILT_SCAN{nogc} ] : Connexion de {fullname} ({remote_ip}) ==> Host: {str(result['host'])} | country: {str(result['countryiso'])} | listed: {str(result['listed'])} | listed by : {str(result['listed_by'])}",
channel=service_chanlog
@@ -941,7 +911,7 @@ class Defender():
def thread_autolimit(self) -> None:
if self.ModConfig.autolimit == 0:
self.Logs.debug(f"autolimit deactivated ... canceling the thread")
self.Logs.debug("autolimit deactivated ... canceling the thread")
return None
while self.Irc.autolimit_started:
@@ -958,7 +928,7 @@ class Defender():
while self.autolimit_isRunning:
if self.ModConfig.autolimit == 0:
self.Logs.debug(f"autolimit deactivated ... stopping the current thread")
self.Logs.debug("autolimit deactivated ... stopping the current thread")
break
for chan in self.Channel.UID_CHANNEL_DB:
@@ -967,14 +937,14 @@ class Defender():
self.Protocol.send2socket(f":{self.Config.SERVICE_ID} MODE {chan.name} +l {len(chan.uids) + self.ModConfig.autolimit_amount}")
chan_copy["uids_count"] = len(chan.uids)
if not chan.name in chan_list:
if chan.name not in chan_list:
chan_list.append(chan.name)
chanObj_copy.append({"name": chan.name, "uids_count": 0})
# Verifier si un salon a été vidé
current_chan_in_list = [d.name for d in self.Channel.UID_CHANNEL_DB]
for c in chan_list:
if not c in current_chan_in_list:
if c not in current_chan_in_list:
chan_list.remove(c)
# Si c'est la premiere execution
@@ -1000,7 +970,7 @@ class Defender():
return None
def cmd(self, data: list) -> None:
def cmd(self, data: list[str]) -> None:
try:
service_id = self.Config.SERVICE_ID # Defender serveur id
cmd = list(data).copy()
@@ -1008,7 +978,7 @@ class Defender():
match cmd[1]:
case 'REPUTATION':
# :001 REPUTATION 91.168.141.239 118
# :001 REPUTATION 8.8.8.8 118
try:
self.reputation_first_connexion['ip'] = cmd[2]
self.reputation_first_connexion['score'] = cmd[3]
@@ -1059,12 +1029,12 @@ class Defender():
_User = self.User.get_User(str(cmd[7]))
# If user is not service or IrcOp then scan them
if not re.match(fr'^.*[S|o?].*$', _User.umodes):
self.abuseipdb_UserModel.append(_User) if self.ModConfig.abuseipdb_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
self.freeipapi_UserModel.append(_User) if self.ModConfig.freeipapi_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
self.cloudfilt_UserModel.append(_User) if self.ModConfig.cloudfilt_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
self.psutil_UserModel.append(_User) if self.ModConfig.psutil_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
self.localscan_UserModel.append(_User) if self.ModConfig.local_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
if not re.match(r'^.*[S|o?].*$', _User.umodes):
self.abuseipdb_UserModel.append(_User) if self.ModConfig.abuseipdb_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
self.freeipapi_UserModel.append(_User) if self.ModConfig.freeipapi_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
self.cloudfilt_UserModel.append(_User) if self.ModConfig.cloudfilt_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
self.psutil_UserModel.append(_User) if self.ModConfig.psutil_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
self.localscan_UserModel.append(_User) if self.ModConfig.local_scan == 1 and _User.remote_ip not in self.Config.WHITELISTED_IP else None
if _User is None:
self.Logs.critical(f'This UID: [{cmd[7]}] is not available please check why')
@@ -1075,9 +1045,9 @@ class Defender():
if self.Config.DEFENDER_INIT == 0:
# Si le user n'es pas un service ni un IrcOP
if not re.match(fr'^.*[S|o?].*$', _User.umodes):
if not re.match(r'^.*[S|o?].*$', _User.umodes):
if reputation_flag == 1 and _User.score_connexion <= reputation_seuil:
currentDateTime = self.Base.get_datetime()
# currentDateTime = self.Base.get_datetime()
self.Reputation.insert(
self.Loader.Definition.MReputation(
**_User.__dict__,
@@ -1107,12 +1077,12 @@ class Defender():
self.Protocol.send2socket(f":{service_id} MODE {parsed_chan} +b ~security-group:unknown-users")
self.Protocol.send2socket(f":{service_id} MODE {parsed_chan} +eee ~security-group:webirc-users ~security-group:known-users ~security-group:websocket-users")
if not get_reputation is None:
if get_reputation is not None:
isWebirc = get_reputation.isWebirc
if not isWebirc:
if parsed_chan != self.Config.SALON_JAIL:
self.Protocol.sendSapart(nick_to_sapart=get_reputation.nickname, channel_name=parsed_chan)
self.Protocol.send_sapart(nick_to_sapart=get_reputation.nickname, channel_name=parsed_chan)
if self.ModConfig.reputation_ban_all_chan == 1 and not isWebirc:
if parsed_chan != self.Config.SALON_JAIL:
@@ -1187,7 +1157,7 @@ class Defender():
get_user_reputation = self.Reputation.get_Reputation(final_UID)
if not get_user_reputation is None:
if get_user_reputation is not None:
final_nickname = get_user_reputation.nickname
for chan in self.Channel.UID_CHANNEL_DB:
if chan.name != jail_salon and ban_all_chan == 1:
@@ -1201,7 +1171,7 @@ class Defender():
except Exception as err:
self.Logs.error(f"General Error: {err}")
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
command = str(cmd[0]).lower()
fromuser = user
@@ -1231,12 +1201,12 @@ class Defender():
case 'show_reputation':
if not self.Reputation.UID_REPUTATION_DB:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=" No one is suspected")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="No one is suspected")
for suspect in self.Reputation.UID_REPUTATION_DB:
self.Protocol.sendNotice(nick_from=dnickname,
self.Protocol.send_notice(nick_from=dnickname,
nick_to=fromuser,
msg=f" Uid: {suspect.uid} | Nickname: {suspect.nickname} | Reputation: {suspect.score} | Secret code: {suspect.secret_code} | Connected on: {suspect.connected_datetime}")
msg=f" Uid: {suspect.uid} | Nickname: {suspect.nickname} | Reputation: {suspect.score_connexion} | Secret code: {suspect.secret_code} | Connected on: {suspect.connexion_datetime}")
case 'code':
try:
@@ -1246,7 +1216,7 @@ class Defender():
get_reputation = self.Reputation.get_Reputation(jailed_UID)
if get_reputation is None:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=" No code is requested ...")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" No code is requested ...")
return False
jailed_IP = get_reputation.remote_ip
@@ -1260,7 +1230,7 @@ class Defender():
color_black = self.Config.COLORS.black
if release_code == get_reputation.secret_code:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg="Bon mot de passe. Allez du vent !", channel=jailed_salon)
self.Protocol.send_priv_msg(nick_from=dnickname, msg="Bon mot de passe. Allez du vent !", channel=jailed_salon)
if self.ModConfig.reputation_ban_all_chan == 1:
for chan in self.Channel.UID_CHANNEL_DB:
@@ -1269,21 +1239,21 @@ class Defender():
self.Reputation.delete(jailed_UID)
self.Logs.debug(f'{jailed_UID} - {jailed_nickname} removed from REPUTATION_DB')
self.Protocol.sendSapart(nick_to_sapart=jailed_nickname, channel_name=jailed_salon)
self.Protocol.sendSajoin(nick_to_sajoin=jailed_nickname, channel_name=welcome_salon)
self.Protocol.send_sapart(nick_to_sapart=jailed_nickname, channel_name=jailed_salon)
self.Protocol.send_sajoin(nick_to_sajoin=jailed_nickname, channel_name=welcome_salon)
self.Protocol.send2socket(f":{link} REPUTATION {jailed_IP} {self.ModConfig.reputation_score_after_release}")
self.User.get_User(jailed_UID).score_connexion = reputation_seuil + 1
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[{color_green} MOT DE PASS CORRECT {color_black}] : You have now the right to enjoy the network !",
nick_to=jailed_nickname)
else:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg="Mauvais password",
channel=jailed_salon
)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[{color_green} MAUVAIS PASSWORD {color_black}] You have typed a wrong code. for recall your password is: {self.Config.SERVICE_PREFIX}code {get_reputation.secret_code}",
nick_to=jailed_nickname
@@ -1291,7 +1261,7 @@ class Defender():
except IndexError as ie:
self.Logs.error(f'Index Error: {ie}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} code [code]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} code [code]")
except KeyError as ke:
self.Logs.error(f'_hcmd code: KeyError {ke}')
@@ -1299,6 +1269,10 @@ class Defender():
try:
# autolimit on
# autolimit set [amount] [interval]
if len(cmd) < 2:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
return None
arg = str(cmd[1]).lower()
@@ -1308,16 +1282,16 @@ class Defender():
self.__update_configuration('autolimit', 1)
self.autolimit_isRunning = True
self.Base.create_thread(self.thread_autolimit)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Activated", channel=self.Config.SERVICE_CHANLOG)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Activated", channel=self.Config.SERVICE_CHANLOG)
else:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already activated", channel=self.Config.SERVICE_CHANLOG)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already activated", channel=self.Config.SERVICE_CHANLOG)
case 'off':
if self.ModConfig.autolimit == 1:
self.__update_configuration('autolimit', 0)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Deactivated", channel=self.Config.SERVICE_CHANLOG)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Deactivated", channel=self.Config.SERVICE_CHANLOG)
else:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already Deactivated", channel=self.Config.SERVICE_CHANLOG)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already Deactivated", channel=self.Config.SERVICE_CHANLOG)
case 'set':
amount = int(cmd[2])
@@ -1325,19 +1299,19 @@ class Defender():
self.__update_configuration('autolimit_amount', amount)
self.__update_configuration('autolimit_interval', interval)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Amount set to ({amount}) | Interval set to ({interval})",
channel=self.Config.SERVICE_CHANLOG
)
case _:
self.Protocol.sendNotice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON", nickname=fromuser)
self.Protocol.sendNotice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]", nickname=fromuser)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
except Exception as err:
self.Protocol.sendNotice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON", nickname=fromuser)
self.Protocol.sendNotice(nick_from=dnickname, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]", nickname=fromuser)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ON")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} SET [AMOUNT] [INTERVAL]")
self.Logs.error(f"Value Error -> {err}")
case 'reputation':
@@ -1355,15 +1329,15 @@ class Defender():
if activation == 'on':
if self.ModConfig.reputation == 1:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Already activated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Already activated", channel=dchanlog)
return False
# self.update_db_configuration('reputation', 1)
self.__update_configuration(key, 1)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
self.Protocol.sendChanJoin(uidornickname=dnickname, channel=jail_chan)
self.Protocol.send_join_chan(uidornickname=dnickname, channel=jail_chan)
self.Protocol.send2socket(f":{service_id} SAMODE {jail_chan} +{dumodes} {dnickname}")
self.Protocol.send2socket(f":{service_id} MODE {jail_chan} +{jail_chan_mode}")
@@ -1378,7 +1352,7 @@ class Defender():
if activation == 'off':
if self.ModConfig.reputation == 0:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Already deactivated",
channel=dchanlog
@@ -1387,7 +1361,7 @@ class Defender():
self.__update_configuration(key, 0)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.red}REPUTATION{self.Config.COLORS.black} ] : Deactivated by {fromuser}",
channel=dchanlog
@@ -1419,7 +1393,7 @@ class Defender():
if get_value == 'on':
if self.ModConfig.reputation_ban_all_chan == 1:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.red}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Already activated",
channel=dchanlog
@@ -1429,7 +1403,7 @@ class Defender():
# self.update_db_configuration(key, 1)
self.__update_configuration(key, 1)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Activated by {fromuser}",
channel=dchanlog
@@ -1437,7 +1411,7 @@ class Defender():
elif get_value == 'off':
if self.ModConfig.reputation_ban_all_chan == 0:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.red}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Already deactivated",
channel=dchanlog
@@ -1447,7 +1421,7 @@ class Defender():
# self.update_db_configuration(key, 0)
self.__update_configuration(key, 0)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}BAN ON ALL CHANS{self.Config.COLORS.black} ] : Deactivated by {fromuser}",
channel=dchanlog
@@ -1460,69 +1434,69 @@ class Defender():
# self.update_db_configuration(key, reputation_seuil)
self.__update_configuration(key, reputation_seuil)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}REPUTATION SEUIL{self.Config.COLORS.black} ] : Limit set to {str(reputation_seuil)} by {fromuser}",
channel=dchanlog
)
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation set to {reputation_seuil}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation set to {reputation_seuil}")
case 'timer':
reputation_timer = int(cmd[3])
key = 'reputation_timer'
self.__update_configuration(key, reputation_timer)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}REPUTATION TIMER{self.Config.COLORS.black} ] : Timer set to {str(reputation_timer)} minute(s) by {fromuser}",
channel=dchanlog
)
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation set to {reputation_timer}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation set to {reputation_timer}")
case 'score_after_release':
reputation_score_after_release = int(cmd[3])
key = 'reputation_score_after_release'
self.__update_configuration(key, reputation_score_after_release)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}REPUTATION SCORE AFTER RELEASE{self.Config.COLORS.black} ] : Reputation score after release set to {str(reputation_score_after_release)} by {fromuser}",
channel=dchanlog
)
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation score after release set to {reputation_score_after_release}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation score after release set to {reputation_score_after_release}")
case 'security_group':
reputation_sg = int(cmd[3])
key = 'reputation_sg'
self.__update_configuration(key, reputation_sg)
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}REPUTATION SECURITY-GROUP{self.Config.COLORS.black} ] : Reputation Security-group set to {str(reputation_sg)} by {fromuser}",
channel=dchanlog
)
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation score after release set to {reputation_sg}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Reputation score after release set to {reputation_sg}")
case _:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation [ON/OFF]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set banallchan [ON/OFF]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set limit [1234]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set score_after_release [1234]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set timer [1234]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set action [kill|None]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation [ON/OFF]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set banallchan [ON/OFF]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set limit [1234]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set score_after_release [1234]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set timer [1234]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set action [kill|None]")
except IndexError as ie:
self.Logs.warning(f'{ie}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation [ON/OFF]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set banallchan [ON/OFF]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set limit [1234]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set score_after_release [1234]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set timer [1234]")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set action [kill|None]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation [ON/OFF]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set banallchan [ON/OFF]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set limit [1234]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set score_after_release [1234]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set timer [1234]")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" Right command : /msg {dnickname} reputation set action [kill|None]")
except ValueError as ve:
self.Logs.warning(f'{ve}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" La valeur devrait etre un entier >= 0")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=" La valeur devrait etre un entier >= 0")
case 'proxy_scan':
@@ -1538,11 +1512,11 @@ class Defender():
set_key = str(cmd[1]).lower()
if set_key != 'set':
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]')
option = str(cmd[2]).lower() # => local_scan, psutil_scan, abuseipdb_scan
action = str(cmd[3]).lower() # => on / off
@@ -1551,105 +1525,105 @@ class Defender():
case 'local_scan':
if action == 'on':
if self.ModConfig.local_scan == 1:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
if self.ModConfig.local_scan == 0:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
case 'psutil_scan':
if action == 'on':
if self.ModConfig.psutil_scan == 1:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
if self.ModConfig.psutil_scan == 0:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
case 'abuseipdb_scan':
if action == 'on':
if self.ModConfig.abuseipdb_scan == 1:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
if self.ModConfig.abuseipdb_scan == 0:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
case 'freeipapi_scan':
if action == 'on':
if self.ModConfig.freeipapi_scan == 1:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
if self.ModConfig.freeipapi_scan == 0:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
case 'cloudfilt_scan':
if action == 'on':
if self.ModConfig.cloudfilt_scan == 1:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
if self.ModConfig.cloudfilt_scan == 0:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
case _:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]')
else:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set local_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set psutil_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set abuseipdb_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set freeipapi_scan [ON/OFF]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Right command : /msg {dnickname} proxy_scan set cloudfilt_scan [ON/OFF]')
case 'flood':
# .flood on/off
@@ -1664,21 +1638,21 @@ class Defender():
key = 'flood'
if activation == 'on':
if self.ModConfig.flood == 1:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Already activated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Already activated", channel=dchanlog)
return False
self.__update_configuration(key, 1)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
if activation == 'off':
if self.ModConfig.flood == 0:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}FLOOD{self.Config.COLORS.black} ] : Already Deactivated", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}FLOOD{self.Config.COLORS.black} ] : Already Deactivated", channel=dchanlog)
return False
self.__update_configuration(key, 0)
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Deactivated by {fromuser}", channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Deactivated by {fromuser}", channel=dchanlog)
if len_cmd == 4:
set_key = str(cmd[2]).lower()
@@ -1690,7 +1664,7 @@ class Defender():
set_value = int(cmd[3])
self.__update_configuration(key, set_value)
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood message set to {set_value} by {fromuser}",
channel=dchanlog)
@@ -1699,7 +1673,7 @@ class Defender():
set_value = int(cmd[3])
self.__update_configuration(key, set_value)
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood time set to {set_value} by {fromuser}",
channel=dchanlog)
@@ -1708,7 +1682,7 @@ class Defender():
set_value = int(cmd[3])
self.__update_configuration(key, set_value)
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood timer set to {set_value} by {fromuser}",
channel=dchanlog)
@@ -1724,22 +1698,22 @@ class Defender():
color_black = self.Config.COLORS.black
nogc = self.Config.COLORS.nogc
try:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.reputation == 1 else color_red}Reputation{nogc}] ==> {self.ModConfig.reputation}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_seuil ==> {self.ModConfig.reputation_seuil}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_after_release ==> {self.ModConfig.reputation_score_after_release}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_ban_all_chan ==> {self.ModConfig.reputation_ban_all_chan}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_timer ==> {self.ModConfig.reputation_timer}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' [Proxy_scan]')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.local_scan == 1 else color_red}local_scan{nogc} ==> {self.ModConfig.local_scan}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.psutil_scan == 1 else color_red}psutil_scan{nogc} ==> {self.ModConfig.psutil_scan}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.abuseipdb_scan == 1 else color_red}abuseipdb_scan{nogc} ==> {self.ModConfig.abuseipdb_scan}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.freeipapi_scan == 1 else color_red}freeipapi_scan{nogc} ==> {self.ModConfig.freeipapi_scan}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.cloudfilt_scan == 1 else color_red}cloudfilt_scan{nogc} ==> {self.ModConfig.cloudfilt_scan}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.flood == 1 else color_red}Flood{nogc}] ==> {self.ModConfig.flood}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_action ==> Coming soon')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_message ==> {self.ModConfig.flood_message}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_time ==> {self.ModConfig.flood_time}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_timer ==> {self.ModConfig.flood_timer}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.reputation == 1 else color_red}Reputation{nogc}] ==> {self.ModConfig.reputation}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_seuil ==> {self.ModConfig.reputation_seuil}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_after_release ==> {self.ModConfig.reputation_score_after_release}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_ban_all_chan ==> {self.ModConfig.reputation_ban_all_chan}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' reputation_timer ==> {self.ModConfig.reputation_timer}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=' [Proxy_scan]')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.local_scan == 1 else color_red}local_scan{nogc} ==> {self.ModConfig.local_scan}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.psutil_scan == 1 else color_red}psutil_scan{nogc} ==> {self.ModConfig.psutil_scan}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.abuseipdb_scan == 1 else color_red}abuseipdb_scan{nogc} ==> {self.ModConfig.abuseipdb_scan}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.freeipapi_scan == 1 else color_red}freeipapi_scan{nogc} ==> {self.ModConfig.freeipapi_scan}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' {color_green if self.ModConfig.cloudfilt_scan == 1 else color_red}cloudfilt_scan{nogc} ==> {self.ModConfig.cloudfilt_scan}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.flood == 1 else color_red}Flood{nogc}] ==> {self.ModConfig.flood}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=' flood_action ==> Coming soon')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_message ==> {self.ModConfig.flood_message}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_time ==> {self.ModConfig.flood_time}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_timer ==> {self.ModConfig.flood_timer}')
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")
@@ -1748,29 +1722,29 @@ class Defender():
nickoruid = cmd[1]
UserObject = self.User.get_User(nickoruid)
if not UserObject is None:
if UserObject is not None:
channels: list = []
for chan in self.Channel.UID_CHANNEL_DB:
for uid_in_chan in chan.uids:
if self.Base.clean_uid(uid_in_chan) == UserObject.uid:
channels.append(chan.name)
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' UID : {UserObject.uid}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' NICKNAME : {UserObject.nickname}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' USERNAME : {UserObject.username}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' REALNAME : {UserObject.realname}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' HOSTNAME : {UserObject.hostname}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' VHOST : {UserObject.vhost}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' IP : {UserObject.remote_ip}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' Country : {UserObject.geoip}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' WebIrc : {UserObject.isWebirc}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' WebWebsocket : {UserObject.isWebsocket}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' REPUTATION : {UserObject.score_connexion}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' MODES : {UserObject.umodes}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' CHANNELS : {channels}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f' CONNECTION TIME : {UserObject.connexion_datetime}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' UID : {UserObject.uid}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' NICKNAME : {UserObject.nickname}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' USERNAME : {UserObject.username}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' REALNAME : {UserObject.realname}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' HOSTNAME : {UserObject.hostname}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' VHOST : {UserObject.vhost}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' IP : {UserObject.remote_ip}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' Country : {UserObject.geoip}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' WebIrc : {UserObject.isWebirc}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' WebWebsocket : {UserObject.isWebsocket}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' REPUTATION : {UserObject.score_connexion}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' MODES : {UserObject.umodes}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CHANNELS : {channels}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' CONNECTION TIME : {UserObject.connexion_datetime}')
else:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f":{dnickname} NOTICE {fromuser} : This user {nickoruid} doesn't exist")
except KeyError as ke:
self.Logs.warning(f"Key error info user : {ke}")
@@ -1784,10 +1758,10 @@ class Defender():
if activation == 'on':
for chan in self.Channel.UID_CHANNEL_DB:
if not chan.name in channel_to_dont_quit:
self.Protocol.sendChanJoin(uidornickname=dnickname, channel=chan.name)
if chan.name not in channel_to_dont_quit:
self.Protocol.send_join_chan(uidornickname=dnickname, channel=chan.name)
if activation == 'off':
for chan in self.Channel.UID_CHANNEL_DB:
if not chan.name in channel_to_dont_quit:
if chan.name not in channel_to_dont_quit:
self.Protocol.part(uidornickname=dnickname, channel=chan.name)
self.join_saved_channels()

View File

@@ -42,9 +42,8 @@ class Jsonrpc():
self.Channel = ircInstance.Channel
# Create module commands (Mandatory)
self.commands_level = {
1: ['jsonrpc', 'jruser']
}
self.Irc.build_command(1, self.module_name, 'jsonrpc', 'Activate the JSON RPC Live connection [ON|OFF]')
self.Irc.build_command(1, self.module_name, 'jruser', 'Get Information about a user using JSON RPC')
# Init the module
self.__init_module()
@@ -54,9 +53,8 @@ class Jsonrpc():
def __init_module(self) -> None:
# Insert module commands into the core one (Mandatory)
self.__set_commands(self.commands_level)
logging.getLogger('websockets').setLevel(logging.WARNING)
logging.getLogger('unrealircd-rpc-py').setLevel(logging.CRITICAL)
# Create you own tables (Mandatory)
# self.__create_tables()
@@ -65,13 +63,6 @@ class Jsonrpc():
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
# self.UnrealIrcdRpcLive: Live = Live(
# req_method='unixsocket',
# path_to_socket_file=self.Config.JSONRPC_PATH_TO_SOCKET_FILE,
# callback_object_instance=self,
# callback_method_name='callback_sent_to_irc'
# )
self.UnrealIrcdRpcLive: Live = Live(
req_method='websocket',
url=self.Config.JSONRPC_URL,
@@ -91,14 +82,14 @@ class Jsonrpc():
self.subscribed = False
if self.Rpc.Error.code != 0:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}ERROR{self.Config.COLORS.nogc}] {self.Rpc.Error.message}",
channel=self.Config.SERVICE_CHANLOG
)
if self.UnrealIrcdRpcLive.Error.code != 0:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}ERROR{self.Config.COLORS.nogc}] {self.UnrealIrcdRpcLive.Error.message}",
channel=self.Config.SERVICE_CHANLOG
@@ -109,20 +100,6 @@ class Jsonrpc():
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
@@ -153,7 +130,7 @@ class Jsonrpc():
red = self.Config.COLORS.red
if json_response.result == True:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{bold}{green}JSONRPC{nogc}{bold}] Event activated",
channel=dchanlog)
@@ -167,7 +144,7 @@ class Jsonrpc():
build_msg = f"{green}{log_source}{nogc}: [{bold}{level}{bold}] {subsystem}.{event_id} - {msg}"
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=build_msg, channel=dchanlog)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=build_msg, channel=dchanlog)
def thread_start_jsonrpc(self):
@@ -175,7 +152,7 @@ class Jsonrpc():
self.UnrealIrcdRpcLive.subscribe(["all"])
self.subscribed = True
else:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}ERROR{self.Config.COLORS.nogc}] {self.UnrealIrcdRpcLive.Error.message}",
channel=self.Config.SERVICE_CHANLOG
@@ -214,7 +191,7 @@ class Jsonrpc():
return None
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
@@ -229,22 +206,27 @@ class Jsonrpc():
option = str(cmd[1]).lower()
if len(command) == 1:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'/msg {dnickname} jsonrpc on')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'/msg {dnickname} jsonrpc off')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'/msg {dnickname} jsonrpc on')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'/msg {dnickname} jsonrpc off')
match option:
case 'on':
# for logger_name, logger in logging.root.manager.loggerDict.items():
# if isinstance(logger, logging.Logger):
# self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{logger_name} - {logger.level}")
for thread in self.Base.running_threads:
if thread.getName() == 'thread_start_jsonrpc':
if thread.is_alive():
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"Thread {thread.getName()} is running",
channel=dchannel
)
else:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"Thread {thread.getName()} is not running, wait untill the process will be cleaned up",
channel=dchannel
@@ -265,7 +247,7 @@ class Jsonrpc():
option = str(cmd[1]).lower()
if len(command) == 1:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'/msg {dnickname} jruser get nickname')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'/msg {dnickname} jruser get nickname')
match option:
@@ -279,37 +261,37 @@ class Jsonrpc():
UserInfo = rpc.User.get(uid_to_get)
if rpc.Error.code != 0:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'{rpc.Error.message}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'{rpc.Error.message}')
return None
chan_list = []
for chan in UserInfo.user.channels:
chan_list.append(chan.name)
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'UID : {UserInfo.id}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'NICKNAME : {UserInfo.name}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'USERNAME : {UserInfo.user.username}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'REALNAME : {UserInfo.user.realname}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'MODES : {UserInfo.user.modes}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'CHANNELS : {chan_list}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'SECURITY GROUP : {UserInfo.user.security_groups}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'REPUTATION : {UserInfo.user.reputation}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'UID : {UserInfo.id}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'NICKNAME : {UserInfo.name}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'USERNAME : {UserInfo.user.username}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'REALNAME : {UserInfo.user.realname}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'MODES : {UserInfo.user.modes}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'CHANNELS : {chan_list}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'SECURITY GROUP : {UserInfo.user.security_groups}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'REPUTATION : {UserInfo.user.reputation}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'IP : {UserInfo.ip}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'COUNTRY CODE : {UserInfo.geoip.country_code}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'ASN : {UserInfo.geoip.asn}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'ASNAME : {UserInfo.geoip.asname}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'CLOAKED HOST : {UserInfo.user.cloakedhost}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'HOSTNAME : {UserInfo.hostname}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'VHOST : {UserInfo.user.vhost}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'CLIENT PORT : {UserInfo.client_port}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'SERVER PORT : {UserInfo.server_port}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'IP : {UserInfo.ip}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'COUNTRY CODE : {UserInfo.geoip.country_code}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'ASN : {UserInfo.geoip.asn}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'ASNAME : {UserInfo.geoip.asname}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'CLOAKED HOST : {UserInfo.user.cloakedhost}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'HOSTNAME : {UserInfo.hostname}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'VHOST : {UserInfo.user.vhost}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'CLIENT PORT : {UserInfo.client_port}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'SERVER PORT : {UserInfo.server_port}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'CERTFP : {UserInfo.tls.certfp}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'CIPHER : {UserInfo.tls.cipher}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'CERTFP : {UserInfo.tls.certfp}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'CIPHER : {UserInfo.tls.cipher}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'IDLE SINCE : {UserInfo.idle_since}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f'CONNECTED SINCE : {UserInfo.connected_since}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'IDLE SINCE : {UserInfo.idle_since}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'CONNECTED SINCE : {UserInfo.connected_since}')
except IndexError as ie:
self.Logs.error(ie)
@@ -319,11 +301,11 @@ class Jsonrpc():
self.Base.create_thread(self.thread_ask_ia, ('',))
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg=f" This is a notice to the sender ...")
self.Protocol.sendPrivMsg(nick_from=dnickname, msg="This is private message to the sender ...", nick_to=fromuser)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" This is a notice to the sender ...")
self.Protocol.send_priv_msg(nick_from=dnickname, msg="This is private message to the sender ...", nick_to=fromuser)
if not fromchannel is None:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg="This is channel message to the sender ...", channel=fromchannel)
self.Protocol.send_priv_msg(nick_from=dnickname, msg="This is channel message to the sender ...", channel=fromchannel)
# How to update your module configuration
self.__update_configuration('param_exemple2', 7)

View File

@@ -46,12 +46,11 @@ class Test():
self.Reputation = ircInstance.Reputation
# Create module commands (Mandatory)
self.commands_level = {
0: ['test-command'],
1: ['test_level_1'],
2: ['test_level_2'],
3: ['test_level_3']
}
self.Irc.build_command(0, self.module_name, 'test-command', 'Execute a test command')
self.Irc.build_command(1, self.module_name, 'test_level_1', 'Execute a level 1 test command')
self.Irc.build_command(2, self.module_name, 'test_level_2', 'Execute a level 2 test command')
self.Irc.build_command(3, self.module_name, 'test_level_3', 'Execute a level 3 test command')
# Init the module
self.__init_module()
@@ -61,9 +60,6 @@ class Test():
def __init_module(self) -> None:
# Insert module commands into the core one (Mandatory)
self.__set_commands(self.commands_level)
# Create you own tables (Mandatory)
self.__create_tables()
@@ -73,20 +69,6 @@ class Test():
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
@@ -147,7 +129,7 @@ class Test():
except Exception as err:
self.Logs.error(f"General Error: {err}")
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
@@ -159,11 +141,11 @@ class Test():
case 'test-command':
try:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser, msg="This is a notice to the sender ...")
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"This is private message to the sender ...", nick_to=fromuser)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="This is a notice to the sender ...")
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"This is private message to the sender ...", nick_to=fromuser)
if not fromchannel is None:
self.Protocol.sendPrivMsg(nick_from=dnickname, msg=f"This is private message to the sender ...", channel=fromchannel)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"This is private message to the sender ...", channel=fromchannel)
# How to update your module configuration
self.__update_configuration('param_exemple2', 7)

View File

@@ -56,9 +56,7 @@ class Votekick():
self.Channel = ircInstance.Channel
# Créer les nouvelles commandes du module
self.commands_level = {
0: ['vote']
}
self.Irc.build_command(1, self.module_name, 'vote', 'The kick vote module')
# Init the module
self.__init_module()
@@ -70,27 +68,11 @@ class Votekick():
# Add admin object to retrieve admin users
self.Admin = self.Irc.Admin
self.__set_commands(self.commands_level)
self.__create_tables()
self.join_saved_channels()
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
@@ -122,7 +104,7 @@ class Votekick():
def unload(self) -> None:
try:
for chan in self.VOTE_CHANNEL_DB:
self.Protocol.sendChanPart(uidornickname=self.Config.SERVICE_ID, channel=chan.channel_name)
self.Protocol.send_part_chan(uidornickname=self.Config.SERVICE_ID, channel=chan.channel_name)
self.VOTE_CHANNEL_DB = []
self.Logs.debug(f'Delete memory DB VOTE_CHANNEL_DB: {self.VOTE_CHANNEL_DB}')
@@ -238,7 +220,7 @@ class Votekick():
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it'll be kicked from the channel",
channel=channel
@@ -246,7 +228,7 @@ class Votekick():
self.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
self.Channel.delete_user_from_channel(channel, self.User.get_uid(target_user))
elif chan.vote_for <= chan.vote_against:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll remain in the channel",
channel=channel
@@ -254,7 +236,7 @@ class Votekick():
# Init the system
if self.init_vote_system(channel):
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg="System vote re initiated",
channel=channel
@@ -274,7 +256,7 @@ class Votekick():
except Exception as err:
self.Logs.error(f"General Error: {err}")
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
# cmd is the command starting from the user command
# full cmd is sending the entire server response
@@ -288,14 +270,14 @@ class Votekick():
case 'vote':
if len(cmd) == 1:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote activate #channel')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote deactivate #channel')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote +')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote -')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote cancel')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote status')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote submit nickname')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote verdict')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote activate #channel')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote deactivate #channel')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote +')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote -')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote cancel')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote status')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote submit nickname')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote verdict')
return None
option = str(cmd[1]).lower()
@@ -306,12 +288,12 @@ class Votekick():
try:
# vote activate #channel
if self.Admin.get_Admin(fromuser) is None:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' :Your are not allowed to execute this command')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' :Your are not allowed to execute this command')
return None
sentchannel = str(cmd[2]).lower() if self.Channel.Is_Channel(str(cmd[2]).lower()) else None
if sentchannel is None:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f" The correct command is {self.Config.SERVICE_PREFIX}{command} {option} #CHANNEL")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f" The correct command is {self.Config.SERVICE_PREFIX}{command} {option} #CHANNEL")
self.insert_vote_channel(
self.VoteChannelModel(
@@ -325,30 +307,30 @@ class Votekick():
self.Channel.db_query_channel('add', self.module_name, sentchannel)
self.Protocol.sendChanJoin(uidornickname=dnickname, channel=sentchannel)
self.Protocol.send_join_chan(uidornickname=dnickname, channel=sentchannel)
self.Protocol.send2socket(f":{dnickname} SAMODE {sentchannel} +o {dnickname}")
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="You can now use !submit <nickname> to decide if he will stay or not on this channel ",
channel=sentchannel
)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option} #channel')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option} #welcome')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option} #channel')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option} #welcome')
case 'deactivate':
try:
# vote deactivate #channel
if self.Admin.get_Admin(fromuser) is None:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f" Your are not allowed to execute this command")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f" Your are not allowed to execute this command")
return None
sentchannel = str(cmd[2]).lower() if self.Channel.Is_Channel(str(cmd[2]).lower()) else None
if sentchannel is None:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f" The correct command is {self.Config.SERVICE_PREFIX}{command} {option} #CHANNEL")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f" The correct command is {self.Config.SERVICE_PREFIX}{command} {option} #CHANNEL")
self.Protocol.send2socket(f":{dnickname} SAMODE {sentchannel} -o {dnickname}")
self.Protocol.sendChanPart(uidornickname=dnickname, channel=sentchannel)
self.Protocol.send_part_chan(uidornickname=dnickname, channel=sentchannel)
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == sentchannel:
@@ -358,8 +340,8 @@ class Votekick():
self.Logs.debug(f"The Channel {sentchannel} has been deactivated from the vote system")
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f" /msg {dnickname} {command} {option} #channel")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f" Exemple /msg {dnickname} {command} {option} #welcome")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f" /msg {dnickname} {command} {option} #channel")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f" Exemple /msg {dnickname} {command} {option} #welcome")
case '+':
try:
@@ -368,21 +350,21 @@ class Votekick():
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="You already submitted a vote",
channel=channel
)
else:
chan.vote_for += 1
chan.voter_users.append(fromuser)
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="Vote recorded, thank you",
channel=channel
)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
case '-':
try:
@@ -391,65 +373,65 @@ class Votekick():
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="You already submitted a vote",
channel=channel
)
else:
chan.vote_against += 1
chan.voter_users.append(fromuser)
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="Vote recorded, thank you",
channel=channel
)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
case 'cancel':
try:
# vote cancel
if self.Admin.get_Admin(fromuser) is None:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' Your are not allowed to execute this command')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' Your are not allowed to execute this command')
return None
if channel is None:
self.Logs.error(f"The channel is not known, defender can't cancel the vote")
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' You need to specify the channel => /msg {dnickname} vote_cancel #channel')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' You need to specify the channel => /msg {dnickname} vote_cancel #channel')
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
self.init_vote_system(channel)
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="Vote system re-initiated",
channel=channel
)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
case 'status':
try:
# vote status
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"Channel: {chan.channel_name} | Target: {self.User.get_nickname(chan.target_user)} | For: {chan.vote_for} | Against: {chan.vote_against} | Number of voters: {str(len(chan.voter_users))}",
channel=channel
)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
case 'submit':
try:
# vote submit nickname
if self.Admin.get_Admin(fromuser) is None:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' Your are not allowed to execute this command')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' Your are not allowed to execute this command')
return None
nickname_submitted = cmd[2]
@@ -462,7 +444,7 @@ class Votekick():
if vote.channel_name == channel:
ongoing_user = self.User.get_nickname(vote.target_user)
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"There is an ongoing vote on {ongoing_user}",
channel=channel
)
@@ -470,7 +452,7 @@ class Votekick():
# check if the user exist
if user_submitted is None:
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"This nickname <{nickname_submitted}> do not exist",
channel=channel
)
@@ -479,7 +461,7 @@ class Votekick():
uid_cleaned = self.Base.clean_uid(uid_submitted)
ChannelInfo = self.Channel.get_Channel(channel)
if ChannelInfo is None:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' This channel [{channel}] do not exist in the Channel Object')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' This channel [{channel}] do not exist in the Channel Object')
return False
clean_uids_in_channel: list = []
@@ -487,7 +469,7 @@ class Votekick():
clean_uids_in_channel.append(self.Base.clean_uid(uid))
if not uid_cleaned in clean_uids_in_channel:
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"This nickname <{nickname_submitted}> is not available in this channel",
channel=channel
)
@@ -497,7 +479,7 @@ class Votekick():
pattern = fr'[o|B|S]'
operator_user = re.findall(pattern, user_submitted.umodes)
if operator_user:
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="You cant vote for this user ! he/she is protected",
channel=channel
)
@@ -507,40 +489,40 @@ class Votekick():
if chan.channel_name == channel:
chan.target_user = self.User.get_uid(nickname_submitted)
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"{nickname_submitted} has been targeted for a vote",
channel=channel
)
self.Base.create_timer(60, self.timer_vote_verdict, (channel, ))
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg="This vote will end after 60 secondes",
channel=channel
)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option} nickname')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option} adator')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option} nickname')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option} adator')
case 'verdict':
try:
# vote verdict
if self.Admin.get_Admin(fromuser) is None:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f'Your are not allowed to execute this command')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f'Your are not allowed to execute this command')
return None
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Protocol.sendPrivMsg(nick_from=dnickname,
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll be kicked from the channel",
channel=channel
)
self.Protocol.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
elif chan.vote_for <= chan.vote_against:
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"User {self.Config.COLORS.bold}{target_user}{self.Config.COLORS.nogc} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll remain in the channel",
channel=channel
@@ -548,22 +530,22 @@ class Votekick():
# Init the system
if self.init_vote_system(channel):
self.Protocol.sendPrivMsg(
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg="System vote re initiated",
channel=channel
)
except Exception as err:
self.Logs.error(f'{err}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} {command} {option}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' Exemple /msg {dnickname} {command} {option}')
case _:
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote activate #channel')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote deactivate #channel')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote +')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote -')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote cancel')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote status')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote submit nickname')
self.Protocol.sendNotice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote verdict')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote activate #channel')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote deactivate #channel')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote +')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote -')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote cancel')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote status')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote submit nickname')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser,msg=f' /msg {dnickname} vote verdict')

View File

@@ -1,9 +1,9 @@
{
"version": "6.0.0",
"version": "6.1.0",
"requests": "2.32.3",
"psutil": "6.0.0",
"unrealircd_rpc_py": "1.0.6",
"unrealircd_rpc_py": "1.0.7",
"sqlalchemy": "2.0.35",
"faker": "30.1.0"
}