mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 19:24:23 +00:00
1355 lines
65 KiB
Python
1355 lines
65 KiB
Python
import ssl, re, importlib, sys, time, threading, socket
|
|
from ssl import SSLSocket
|
|
from datetime import datetime, timedelta
|
|
from typing import Union, Literal
|
|
from core.loadConf import Config
|
|
from core.Model import User, Admin, Channel, Clones
|
|
from core.base import Base
|
|
|
|
class Irc:
|
|
|
|
def __init__(self) -> 'Irc':
|
|
|
|
self.defender_connexion_datetime = datetime.now() # Date et heure de la premiere connexion de Defender
|
|
self.first_score: int = 100
|
|
self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés
|
|
self.beat = 30 # Lancer toutes les 30 secondes des actions de nettoyages
|
|
self.hb_active = True # Heartbeat active
|
|
self.HSID = '' # ID du serveur qui accueil le service ( Host Serveur Id )
|
|
self.IrcSocket:Union[socket.socket, SSLSocket] = None
|
|
|
|
self.INIT = 1 # Variable d'intialisation | 1 -> indique si le programme est en cours d'initialisation
|
|
self.RESTART = 0 # Variable pour le redemarrage du bot | 0 -> indique que le programme n'es pas en cours de redemarrage
|
|
self.CHARSET = ['utf-8', 'iso-8859-1'] # Charset utiliser pour décoder/encoder les messages
|
|
"""0: utf-8 | 1: iso-8859-1"""
|
|
|
|
self.SSL_VERSION = None # Version SSL
|
|
|
|
self.Config = Config().ConfigObject
|
|
|
|
# Liste des commandes internes du bot
|
|
self.commands_level = {
|
|
0: ['help', 'auth', 'copyright', 'uptime'],
|
|
1: ['load','reload','unload', 'deauth', 'checkversion'],
|
|
2: ['show_modules', 'show_timers', 'show_threads', 'show_channels', 'show_users', 'show_admins'],
|
|
3: ['quit', 'restart','addaccess','editaccess', 'delaccess']
|
|
}
|
|
|
|
# l'ensemble des commandes.
|
|
self.commands = []
|
|
for level, commands in self.commands_level.items():
|
|
for command in self.commands_level[level]:
|
|
self.commands.append(command)
|
|
|
|
self.Base = Base(self.Config)
|
|
self.User = User(self.Base)
|
|
self.Admin = Admin(self.Base)
|
|
self.Channel = Channel(self.Base)
|
|
self.Clones = Clones(self.Base)
|
|
|
|
self.__create_table()
|
|
self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, ))
|
|
|
|
##############################################
|
|
# CONNEXION IRC #
|
|
##############################################
|
|
def init_irc(self, ircInstance:'Irc') -> None:
|
|
"""Create a socket and connect to irc server
|
|
|
|
Args:
|
|
ircInstance (Irc): Instance of Irc object.
|
|
"""
|
|
try:
|
|
self.__create_socket()
|
|
self.__connect_to_irc(ircInstance)
|
|
except AssertionError as ae:
|
|
self.Base.logs.critical(f'Assertion error: {ae}')
|
|
|
|
def __create_socket(self) -> None:
|
|
"""Create a socket to connect SSL or Normal connection
|
|
"""
|
|
try:
|
|
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
|
|
connexion_information = (self.Config.SERVEUR_IP, self.Config.SERVEUR_PORT)
|
|
|
|
if self.Config.SERVEUR_SSL:
|
|
# Créer un object ssl
|
|
ssl_context = self.__ssl_context()
|
|
ssl_connexion = ssl_context.wrap_socket(soc, server_hostname=self.Config.SERVEUR_HOSTNAME)
|
|
ssl_connexion.connect(connexion_information)
|
|
self.IrcSocket:SSLSocket = ssl_connexion
|
|
self.SSL_VERSION = self.IrcSocket.version()
|
|
self.Base.logs.info(f"Connexion en mode SSL : Version = {self.SSL_VERSION}")
|
|
else:
|
|
soc.connect(connexion_information)
|
|
self.IrcSocket:socket.socket = soc
|
|
self.Base.logs.info("Connexion en mode normal")
|
|
|
|
return None
|
|
|
|
except ssl.SSLEOFError as soe:
|
|
self.Base.logs.critical(f"SSLEOFError __create_socket: {soe} - {soc.fileno()}")
|
|
except ssl.SSLError as se:
|
|
self.Base.logs.critical(f"SSLError __create_socket: {se} - {soc.fileno()}")
|
|
except OSError as oe:
|
|
self.Base.logs.critical(f"OSError __create_socket: {oe} - {soc.fileno()}")
|
|
except AttributeError as ae:
|
|
self.Base.logs.critical(f"AttributeError __create_socket: {ae} - {soc.fileno()}")
|
|
|
|
def __ssl_context(self) -> ssl.SSLContext:
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
|
|
self.Base.logs.debug(f'SSLContext initiated with verified mode {ctx.verify_mode}')
|
|
|
|
return ctx
|
|
|
|
def __connect_to_irc(self, ircInstance: 'Irc') -> None:
|
|
try:
|
|
self.ircObject = ircInstance # créer une copie de l'instance Irc
|
|
self.__link(self.IrcSocket) # établir la connexion au serveur IRC
|
|
self.signal = True # Une variable pour initier la boucle infinie
|
|
self.load_existing_modules() # Charger les modules existant dans la base de données
|
|
self.__join_saved_channels() # Join existing channels
|
|
|
|
while self.signal:
|
|
try:
|
|
if self.RESTART == 1:
|
|
self.Base.logs.debug('Restarting Defender ...')
|
|
self.IrcSocket.shutdown(socket.SHUT_RDWR)
|
|
self.IrcSocket.close()
|
|
|
|
while self.IrcSocket.fileno() != -1:
|
|
time.sleep(0.5)
|
|
self.Base.logs.warning('--> Waiting for socket to close ...')
|
|
|
|
# Reload configuration
|
|
self.Base.logs.debug('Reloading configuration')
|
|
self.Config = Config().ConfigObject
|
|
self.Base = Base(self.Config)
|
|
|
|
self.__create_socket()
|
|
self.__link(self.IrcSocket)
|
|
self.load_existing_modules()
|
|
self.RESTART = 0
|
|
|
|
# 4072 max what the socket can grab
|
|
buffer_size = self.IrcSocket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
|
|
data_in_bytes = self.IrcSocket.recv(buffer_size)
|
|
data = data_in_bytes.splitlines(True)
|
|
count_bytes = len(data_in_bytes)
|
|
|
|
while count_bytes > 4070:
|
|
# If the received message is > 4070 then loop and add the value to the variable
|
|
new_data = self.IrcSocket.recv(buffer_size)
|
|
data_in_bytes += new_data
|
|
count_bytes = len(new_data)
|
|
|
|
data = data_in_bytes.splitlines(True)
|
|
|
|
if not data:
|
|
break
|
|
|
|
self.send_response(data)
|
|
|
|
except ssl.SSLEOFError as soe:
|
|
self.Base.logs.error(f"SSLEOFError __connect_to_irc: {soe} - {data}")
|
|
except ssl.SSLError as se:
|
|
self.Base.logs.error(f"SSLError __connect_to_irc: {se} - {data}")
|
|
except OSError as oe:
|
|
self.Base.logs.error(f"SSLError __connect_to_irc: {oe} - {data}")
|
|
|
|
self.IrcSocket.shutdown(socket.SHUT_RDWR)
|
|
self.IrcSocket.close()
|
|
self.Base.logs.info("--> Fermeture de Defender ...")
|
|
sys.exit(0)
|
|
|
|
except AssertionError as ae:
|
|
self.Base.logs.error(f'AssertionError: {ae}')
|
|
except ValueError as ve:
|
|
self.Base.logs.error(f'ValueError: {ve}')
|
|
except ssl.SSLEOFError as soe:
|
|
self.Base.logs.error(f"SSLEOFError: {soe}")
|
|
except AttributeError as atte:
|
|
self.Base.logs.critical(f"AttributeError: {atte}")
|
|
except Exception as e:
|
|
self.Base.logs.critical(f"Exception: {e}")
|
|
|
|
def __link(self, writer:Union[socket.socket, SSLSocket]) -> None:
|
|
"""Créer le link et envoyer les informations nécessaires pour la
|
|
connexion au serveur.
|
|
|
|
Args:
|
|
writer (StreamWriter): permet l'envoi des informations au serveur.
|
|
"""
|
|
try:
|
|
nickname = self.Config.SERVICE_NICKNAME
|
|
username = self.Config.SERVICE_USERNAME
|
|
realname = self.Config.SERVICE_REALNAME
|
|
chan = self.Config.SERVICE_CHANLOG
|
|
info = self.Config.SERVICE_INFO
|
|
smodes = self.Config.SERVICE_SMODES
|
|
cmodes = self.Config.SERVICE_CMODES
|
|
umodes = self.Config.SERVICE_UMODES
|
|
host = self.Config.SERVICE_HOST
|
|
service_name = self.Config.SERVICE_NAME
|
|
|
|
password = self.Config.SERVEUR_PASSWORD
|
|
link = self.Config.SERVEUR_LINK
|
|
sid = self.Config.SERVEUR_ID
|
|
service_id = self.Config.SERVICE_ID
|
|
|
|
version = self.Config.current_version
|
|
unixtime = self.Base.get_unixtime()
|
|
charset = self.CHARSET[0]
|
|
|
|
# Envoyer un message d'identification
|
|
writer.send(f":{sid} PASS :{password}\r\n".encode(charset))
|
|
writer.send(f":{sid} PROTOCTL SID NOQUIT NICKv2 SJOIN SJ3 NICKIP TKLEXT2 NEXTBANS CLK EXTSWHOIS MLOCK MTAGS\r\n".encode(charset))
|
|
# writer.send(f":{sid} PROTOCTL NICKv2 VHP UMODE2 NICKIP SJOIN SJOIN2 SJ3 NOQUIT TKLEXT MLOCK SID MTAGS\r\n".encode(charset))
|
|
writer.send(f":{sid} PROTOCTL EAUTH={link},,,{service_name}-v{version}\r\n".encode(charset))
|
|
writer.send(f":{sid} PROTOCTL SID={sid}\r\n".encode(charset))
|
|
writer.send(f":{sid} SERVER {link} 1 :{info}\r\n".encode(charset))
|
|
writer.send(f":{sid} {nickname} :Reserved for services\r\n".encode(charset))
|
|
writer.send(f":{sid} UID {nickname} 1 {unixtime} {username} {host} {service_id} * {smodes} * * * :{realname}\r\n".encode(charset))
|
|
writer.send(f":{sid} SJOIN {unixtime} {chan} + :{service_id}\r\n".encode(charset))
|
|
writer.send(f":{sid} TKL + Q * {nickname} {host} 0 {unixtime} :Reserved for services\r\n".encode(charset))
|
|
|
|
writer.send(f":{service_id} MODE {chan} +{cmodes}\r\n".encode(charset))
|
|
writer.send(f":{service_id} MODE {chan} +{umodes} {service_id}\r\n".encode(charset))
|
|
|
|
self.Base.logs.debug('Link information sent to the server')
|
|
|
|
return None
|
|
except AttributeError as ae:
|
|
self.Base.logs.critical(f'{ae}')
|
|
|
|
def __join_saved_channels(self) -> None:
|
|
"""## Joining saved channels"""
|
|
core_table = self.Config.table_channel
|
|
|
|
query = f'''SELECT distinct channel_name FROM {core_table}'''
|
|
exec_query = self.Base.db_execute_query(query)
|
|
result_query = exec_query.fetchall()
|
|
|
|
if result_query:
|
|
for chan_name in result_query:
|
|
chan = chan_name[0]
|
|
self.send2socket(f":{self.Config.SERVEUR_ID} SJOIN {self.Base.get_unixtime()} {chan} + :{self.Config.SERVICE_ID}")
|
|
|
|
def send2socket(self, send_message:str) -> None:
|
|
"""Envoit les commandes à envoyer au serveur.
|
|
|
|
Args:
|
|
string (Str): contient la commande à envoyer au serveur.
|
|
"""
|
|
try:
|
|
with self.Base.lock:
|
|
# print(f">{str(send_message)}")
|
|
self.IrcSocket.send(f"{send_message}\r\n".encode(self.CHARSET[0]))
|
|
self.Base.logs.debug(f'{send_message}')
|
|
|
|
except UnicodeDecodeError:
|
|
self.Base.logs.error(f'Decode Error try iso-8859-1 - message: {send_message}')
|
|
self.IrcSocket.send(f"{send_message}\r\n".encode(self.CHARSET[0],'replace'))
|
|
except UnicodeEncodeError:
|
|
self.Base.logs.error(f'Encode Error try iso-8859-1 - message: {send_message}')
|
|
self.IrcSocket.send(f"{send_message}\r\n".encode(self.CHARSET[0],'replace'))
|
|
except AssertionError as ae:
|
|
self.Base.logs.warning(f'Assertion Error {ae} - message: {send_message}')
|
|
except ssl.SSLEOFError as soe:
|
|
self.Base.logs.error(f"SSLEOFError: {soe} - {send_message}")
|
|
except ssl.SSLError as se:
|
|
self.Base.logs.error(f"SSLError: {se} - {send_message}")
|
|
except OSError as oe:
|
|
self.Base.logs.error(f"OSError: {oe} - {send_message}")
|
|
|
|
def send_response(self, responses:list[bytes]) -> None:
|
|
try:
|
|
# print(data)
|
|
for data in responses:
|
|
response = data.decode(self.CHARSET[0]).split()
|
|
self.cmd(response)
|
|
|
|
except UnicodeEncodeError:
|
|
for data in responses:
|
|
response = data.decode(self.CHARSET[1],'replace').split()
|
|
self.cmd(response)
|
|
except UnicodeDecodeError:
|
|
for data in responses:
|
|
response = data.decode(self.CHARSET[1],'replace').split()
|
|
self.cmd(response)
|
|
except AssertionError as ae:
|
|
self.Base.logs.error(f"Assertion error : {ae}")
|
|
|
|
def unload(self) -> None:
|
|
# This is only to reference the method
|
|
return None
|
|
|
|
##############################################
|
|
# FIN CONNEXION IRC #
|
|
##############################################
|
|
|
|
def __create_table(self):
|
|
"""## Create core tables
|
|
"""
|
|
pass
|
|
|
|
def load_existing_modules(self) -> None:
|
|
"""Charge les modules qui existe déja dans la base de données
|
|
|
|
Returns:
|
|
None: Aucun retour requis, elle charge puis c'est tout
|
|
"""
|
|
result = self.Base.db_execute_query(f"SELECT module_name FROM {self.Config.table_module}")
|
|
for r in result.fetchall():
|
|
self.load_module('sys', r[0], True)
|
|
|
|
return None
|
|
|
|
def get_defender_uptime(self) -> str:
|
|
"""Savoir depuis quand Defender est connecté
|
|
|
|
Returns:
|
|
str: L'écart entre la date du jour et celle de la connexion de Defender
|
|
"""
|
|
current_datetime = datetime.now()
|
|
diff_date = current_datetime - self.defender_connexion_datetime
|
|
uptime = timedelta(days=diff_date.days, seconds=diff_date.seconds)
|
|
|
|
return uptime
|
|
|
|
def heartbeat(self, beat:float) -> None:
|
|
"""Execute certaines commandes de nettoyage toutes les x secondes
|
|
x étant définit a l'initialisation de cette class (self.beat)
|
|
|
|
Args:
|
|
beat (float): Nombre de secondes entre chaque exécution
|
|
"""
|
|
while self.hb_active:
|
|
time.sleep(beat)
|
|
service_id = self.Config.SERVICE_ID
|
|
hsid = self.HSID
|
|
# self.send2socket(f':{service_id} PING :{hsid}')
|
|
self.Base.execute_periodic_action()
|
|
|
|
def create_ping_timer(self, time_to_wait:float, class_name:str, method_name: str, method_args: list=[]) -> None:
|
|
# 1. Timer créer
|
|
# 1.1 Créer la fonction a executer
|
|
# 1.2 Envoyer le ping une fois le timer terminer
|
|
# 2. Executer la fonction
|
|
try:
|
|
if not class_name in self.loaded_classes:
|
|
self.Base.logs.error(f"La class [{class_name} n'existe pas !!]")
|
|
return False
|
|
|
|
class_instance = self.loaded_classes[class_name]
|
|
|
|
t = threading.Timer(interval=time_to_wait, function=self.__create_tasks, args=(class_instance, method_name, method_args))
|
|
t.start()
|
|
|
|
self.Base.running_timers.append(t)
|
|
|
|
self.Base.logs.debug(f"Timer ID : {str(t.ident)} | Running Threads : {len(threading.enumerate())}")
|
|
|
|
except AssertionError as ae:
|
|
self.Base.logs.error(f'Assertion Error -> {ae}')
|
|
except TypeError as te:
|
|
self.Base.logs.error(f"Type error -> {te}")
|
|
|
|
def __create_tasks(self, obj: object, method_name: str, param:list) -> None:
|
|
"""#### Ajouter les méthodes a éxecuter dans un dictionnaire
|
|
|
|
Args:
|
|
obj (object): Une instance de la classe qui va etre executer
|
|
method_name (str): Le nom de la méthode a executer
|
|
param (list): les parametres a faire passer
|
|
|
|
Returns:
|
|
None: aucun retour attendu
|
|
"""
|
|
self.Base.periodic_func[len(self.Base.periodic_func) + 1] = {
|
|
'object': obj,
|
|
'method_name': method_name,
|
|
'param': param
|
|
}
|
|
|
|
self.Base.logs.debug(f'Function to execute : {str(self.Base.periodic_func)}')
|
|
self.send_ping_to_sereur()
|
|
return None
|
|
|
|
def send_ping_to_sereur(self) -> None:
|
|
"""### Envoyer un PING au serveur
|
|
"""
|
|
service_id = self.Config.SERVICE_ID
|
|
hsid = self.HSID
|
|
self.send2socket(f':{service_id} PING :{hsid}')
|
|
|
|
return None
|
|
|
|
def load_module(self, fromuser:str, module_name:str, init:bool = False) -> bool:
|
|
try:
|
|
# module_name : mod_voice
|
|
module_name = module_name.lower()
|
|
class_name = module_name.split('_')[1].capitalize() # ==> Voice
|
|
|
|
# print(self.loaded_classes)
|
|
|
|
# Si le module est déja chargé
|
|
if 'mods.' + module_name in sys.modules:
|
|
self.Base.logs.info("Module déja chargé ...")
|
|
self.Base.logs.info('module name = ' + module_name)
|
|
if class_name in self.loaded_classes:
|
|
# Si le module existe dans la variable globale retourne False
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Le module {module_name} est déja chargé ! si vous souhaiter le recharge tapez {self.Config.SERVICE_PREFIX}reload {module_name}")
|
|
return False
|
|
|
|
the_module = sys.modules['mods.' + module_name]
|
|
importlib.reload(the_module)
|
|
my_class = getattr(the_module, class_name, None)
|
|
new_instance = my_class(self.ircObject)
|
|
self.loaded_classes[class_name] = new_instance
|
|
|
|
# Créer le module dans la base de données
|
|
if not init:
|
|
self.Base.db_record_module(fromuser, module_name)
|
|
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} chargé")
|
|
return False
|
|
|
|
# Charger le module
|
|
loaded_module = importlib.import_module(f"mods.{module_name}")
|
|
|
|
my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe
|
|
create_instance_of_the_class = my_class(self.ircObject) # Créer une nouvelle instance de la classe
|
|
|
|
if not hasattr(create_instance_of_the_class, 'cmd'):
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} ne contient pas de méthode cmd")
|
|
self.Base.logs.critical(f"The Module {module_name} has not been loaded because cmd method is not available")
|
|
self.Base.db_delete_module(module_name)
|
|
return False
|
|
|
|
# Charger la nouvelle class dans la variable globale
|
|
self.loaded_classes[class_name] = create_instance_of_the_class
|
|
|
|
# Enregistrer le module dans la base de données
|
|
if not init:
|
|
self.Base.db_record_module(fromuser, module_name)
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} chargé")
|
|
|
|
self.Base.logs.info(self.loaded_classes)
|
|
return True
|
|
|
|
except ModuleNotFoundError as moduleNotFound:
|
|
self.Base.logs.error(f"MODULE_NOT_FOUND: {moduleNotFound}")
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :[ {self.Config.CONFIG_COLOR['rouge']}MODULE_NOT_FOUND{self.Config.CONFIG_COLOR['noire']} ]: {moduleNotFound}")
|
|
except Exception as e:
|
|
self.Base.logs.error(f"Something went wrong with a module you want to load : {e}")
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :[ {self.Config.CONFIG_COLOR['rouge']}ERROR{self.Config.CONFIG_COLOR['noire']} ]: {e}")
|
|
|
|
def insert_db_admin(self, uid:str, level:int) -> None:
|
|
|
|
if self.User.get_User(uid) is None:
|
|
return None
|
|
|
|
getUser = self.User.get_User(uid)
|
|
|
|
nickname = getUser.nickname
|
|
username = getUser.username
|
|
hostname = getUser.hostname
|
|
umodes = getUser.umodes
|
|
vhost = getUser.vhost
|
|
level = int(level)
|
|
|
|
self.Admin.insert(
|
|
self.Admin.AdminModel(
|
|
uid=uid,
|
|
nickname=nickname,
|
|
username=username,
|
|
hostname=hostname,
|
|
umodes=umodes,
|
|
vhost=vhost,
|
|
level=level,
|
|
connexion_datetime=datetime.now()
|
|
)
|
|
)
|
|
|
|
return None
|
|
|
|
def delete_db_admin(self, uid:str) -> None:
|
|
|
|
if self.Admin.get_Admin(uid) is None:
|
|
return None
|
|
|
|
if not self.Admin.delete(uid):
|
|
self.Base.logs.critical(f'UID: {uid} was not deleted')
|
|
|
|
return None
|
|
|
|
def create_defender_user(self, nickname:str, level: int, password:str) -> str:
|
|
|
|
get_user = self.User.get_User(nickname)
|
|
if get_user is None:
|
|
response = f'This nickname {nickname} does not exist, it is not possible to create this user'
|
|
self.Base.logs.warning(response)
|
|
return response
|
|
|
|
nickname = get_user.nickname
|
|
response = ''
|
|
|
|
if level > 4:
|
|
response = "Impossible d'ajouter un niveau > 4"
|
|
self.Base.logs.warning(response)
|
|
return response
|
|
|
|
hostname = get_user.hostname
|
|
vhost = get_user.vhost
|
|
spassword = self.Base.crypt_password(password)
|
|
|
|
mes_donnees = {'admin': nickname}
|
|
query_search_user = f"SELECT id FROM {self.Config.table_admin} WHERE user=:admin"
|
|
r = self.Base.db_execute_query(query_search_user, mes_donnees)
|
|
exist_user = r.fetchone()
|
|
|
|
# On verifie si le user exist dans la base
|
|
if not exist_user:
|
|
mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level}
|
|
self.Base.db_execute_query(f'''INSERT INTO {self.Config.table_admin}
|
|
(createdOn, user, password, hostname, vhost, level) VALUES
|
|
(:datetime, :user, :password, :hostname, :vhost, :level)
|
|
''', mes_donnees)
|
|
response = f"{nickname} ajouté en tant qu'administrateur de niveau {level}"
|
|
self.send2socket(f':{self.Config.SERVICE_NICKNAME} NOTICE {nickname} : {response}')
|
|
self.Base.logs.info(response)
|
|
return response
|
|
else:
|
|
response = f'{nickname} Existe déjà dans les users enregistrés'
|
|
self.send2socket(f':{self.Config.SERVICE_NICKNAME} NOTICE {nickname} : {response}')
|
|
self.Base.logs.info(response)
|
|
return response
|
|
|
|
def is_cmd_allowed(self, nickname:str, cmd:str) -> bool:
|
|
|
|
# Vérifier si le user est identifié et si il a les droits
|
|
is_command_allowed = False
|
|
uid = self.User.get_uid(nickname)
|
|
get_admin = self.Admin.get_Admin(uid)
|
|
|
|
if not get_admin is None:
|
|
admin_level = get_admin.level
|
|
|
|
for ref_level, ref_commands in self.commands_level.items():
|
|
# print(f"LevelNo: {ref_level} - {ref_commands} - {admin_level}")
|
|
if ref_level <= int(admin_level):
|
|
# print(f"LevelNo: {ref_level} - {ref_commands}")
|
|
if cmd in ref_commands:
|
|
is_command_allowed = True
|
|
else:
|
|
for ref_level, ref_commands in self.commands_level.items():
|
|
if ref_level == 0:
|
|
# print(f"LevelNo: {ref_level} - {ref_commands}")
|
|
if cmd in ref_commands:
|
|
is_command_allowed = True
|
|
|
|
return is_command_allowed
|
|
|
|
def debug(self, debug_msg:str) -> None:
|
|
|
|
# if self.Config.DEBUG == 1:
|
|
# if type(debug_msg) == list:
|
|
# if debug_msg[0] != 'PING':
|
|
# print(f"[{self.Base.get_datetime()}] - {debug_msg}")
|
|
# else:
|
|
#
|
|
print(f"[{self.Base.get_datetime()}] - {debug_msg}")
|
|
|
|
return None
|
|
|
|
def logs(self, log_msg:str) -> None:
|
|
|
|
mes_donnees = {'datetime': self.Base.get_datetime(), 'server_msg': log_msg}
|
|
self.Base.db_execute_query('INSERT INTO sys_logs (datetime, server_msg) VALUES (:datetime, :server_msg)', mes_donnees)
|
|
|
|
return None
|
|
|
|
def thread_check_for_new_version(self, fromuser: str) -> None:
|
|
|
|
dnickname = self.Config.SERVICE_NICKNAME
|
|
|
|
if self.Base.check_for_new_version(True):
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : New Version available : {self.Config.current_version} >>> {self.Config.latest_version}')
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Please run (git pull origin main) in the current folder')
|
|
else:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender')
|
|
|
|
return None
|
|
|
|
def cmd(self, data:list) -> None:
|
|
try:
|
|
|
|
cmd_to_send:list[str] = data.copy()
|
|
cmd = data.copy()
|
|
|
|
cmd_to_debug = data.copy()
|
|
cmd_to_debug.pop(0)
|
|
|
|
if len(cmd) == 0 or len(cmd) == 1:
|
|
self.Base.logs.warning(f'Size ({str(len(cmd))}) - {cmd}')
|
|
return False
|
|
|
|
# self.debug(cmd_to_debug)
|
|
if len(data) == 7:
|
|
if data[2] == 'PRIVMSG' and data[4] == ':auth':
|
|
data_copy = data.copy()
|
|
data_copy[6] = '**********'
|
|
self.Base.logs.debug(data_copy)
|
|
else:
|
|
self.Base.logs.debug(data)
|
|
else:
|
|
self.Base.logs.debug(data)
|
|
|
|
match cmd[0]:
|
|
|
|
case 'PING':
|
|
# Sending PONG response to the serveur
|
|
pong = str(cmd[1]).replace(':','')
|
|
self.send2socket(f"PONG :{pong}")
|
|
return None
|
|
|
|
case 'PROTOCTL':
|
|
#['PROTOCTL', 'CHANMODES=beI,fkL,lFH,cdimnprstzCDGKMNOPQRSTVZ', 'USERMODES=diopqrstwxzBDGHIRSTWZ', 'BOOTED=1702138935',
|
|
# 'PREFIX=(qaohv)~&@%+', 'SID=001', 'MLOCK', 'TS=1703793941', 'EXTSWHOIS']
|
|
|
|
# GET SERVER ID HOST
|
|
if len(cmd) > 5:
|
|
if '=' in cmd[5]:
|
|
serveur_hosting_id = str(cmd[5]).split('=')
|
|
self.HSID = serveur_hosting_id[1]
|
|
return False
|
|
|
|
case _:
|
|
pass
|
|
|
|
if len(cmd) < 2:
|
|
return False
|
|
|
|
match cmd[1]:
|
|
|
|
case 'SLOG':
|
|
# self.Base.scan_ports(cmd[7])
|
|
# if self.Config.ABUSEIPDB == 1:
|
|
# self.Base.create_thread(self.abuseipdb_scan, (cmd[7], ))
|
|
pass
|
|
|
|
case 'REPUTATION':
|
|
# :001 REPUTATION 91.168.141.239 118
|
|
try:
|
|
# if self.Config.ABUSEIPDB == 1:
|
|
# self.Base.create_thread(self.abuseipdb_scan, (cmd[2], ))
|
|
self.first_connexion_ip = cmd[2]
|
|
|
|
self.first_score = 0
|
|
if str(cmd[3]).find('*') != -1:
|
|
# If * available, it means that an ircop changed the repurtation score
|
|
# means also that the user exist will try to update all users with same IP
|
|
self.first_score = int(str(cmd[3]).replace('*',''))
|
|
for user in self.User.UID_DB:
|
|
if user.remote_ip == self.first_connexion_ip:
|
|
user.score_connexion = self.first_score
|
|
else:
|
|
self.first_score = int(cmd[3])
|
|
|
|
# Possibilité de déclancher les bans a ce niveau.
|
|
except IndexError as ie:
|
|
self.Base.logs.error(f'{ie}')
|
|
except ValueError as ve:
|
|
self.first_score = 0
|
|
self.Base.logs.error(f'Impossible to convert first_score: {ve}')
|
|
|
|
case '320':
|
|
#:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users
|
|
pass
|
|
|
|
case '318':
|
|
#:irc.deb.biz.st 318 PyDefender IRCParis93 :End of /WHOIS list.
|
|
pass
|
|
|
|
case 'MD':
|
|
# [':001', 'MD', 'client', '001CG0TG7', 'webirc', ':2']
|
|
pass
|
|
|
|
case 'EOS':
|
|
|
|
hsid = str(cmd[0]).replace(':','')
|
|
if hsid == self.HSID:
|
|
if self.INIT == 1:
|
|
current_version = self.Config.current_version
|
|
latest_version = self.Config.latest_version
|
|
if self.Base.check_for_new_version(False):
|
|
version = f'{current_version} >>> {latest_version}'
|
|
else:
|
|
version = f'{current_version}'
|
|
|
|
# self.send2socket(f":{self.Config.SERVICE_NICKNAME} SVSJOIN {self.Config.SERVICE_NICKNAME} {self.Config.SERVICE_CHANLOG}")
|
|
# self.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.SERVICE_CHANLOG} +o {self.Config.SERVICE_NICKNAME}")
|
|
# self.send2socket(f":{self.Config.SERVICE_NICKNAME} MODE {self.Config.SERVICE_CHANLOG} +{self.Config.SERVICE_CMODES}")
|
|
|
|
print(f"################### DEFENDER ###################")
|
|
print(f"# SERVICE CONNECTE ")
|
|
print(f"# SERVEUR : {self.Config.SERVEUR_IP} ")
|
|
print(f"# PORT : {self.Config.SERVEUR_PORT} ")
|
|
print(f"# SSL : {self.Config.SERVEUR_SSL} ")
|
|
print(f"# SSL VER : {self.SSL_VERSION} ")
|
|
print(f"# NICKNAME : {self.Config.SERVICE_NICKNAME} ")
|
|
print(f"# CHANNEL : {self.Config.SERVICE_CHANLOG} ")
|
|
print(f"# VERSION : {version} ")
|
|
print(f"################################################")
|
|
|
|
self.Base.logs.info(f"################### DEFENDER ###################")
|
|
self.Base.logs.info(f"# SERVICE CONNECTE ")
|
|
self.Base.logs.info(f"# SERVEUR : {self.Config.SERVEUR_IP} ")
|
|
self.Base.logs.info(f"# PORT : {self.Config.SERVEUR_PORT} ")
|
|
self.Base.logs.info(f"# SSL : {self.Config.SERVEUR_SSL} ")
|
|
self.Base.logs.info(f"# SSL VER : {self.SSL_VERSION} ")
|
|
self.Base.logs.info(f"# NICKNAME : {self.Config.SERVICE_NICKNAME} ")
|
|
self.Base.logs.info(f"# CHANNEL : {self.Config.SERVICE_CHANLOG} ")
|
|
self.Base.logs.info(f"# VERSION : {version} ")
|
|
self.Base.logs.info(f"################################################")
|
|
|
|
if self.Base.check_for_new_version(False):
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} : New Version available {version}")
|
|
|
|
# Initialisation terminé aprés le premier PING
|
|
self.INIT = 0
|
|
|
|
case _:
|
|
pass
|
|
|
|
if len(cmd) < 3:
|
|
return False
|
|
|
|
match cmd[2]:
|
|
|
|
case 'QUIT':
|
|
# :001N1WD7L QUIT :Quit: free_znc_1
|
|
cmd.pop(0)
|
|
uid_who_quit = str(cmd[0]).replace(':', '')
|
|
self.User.delete(uid_who_quit)
|
|
self.Channel.delete_user_from_all_channel(uid_who_quit)
|
|
|
|
case 'PONG':
|
|
# ['@msgid=aTNJhp17kcPboF5diQqkUL;time=2023-12-28T20:35:58.411Z', ':irc.deb.biz.st', 'PONG', 'irc.deb.biz.st', ':Dev-PyDefender']
|
|
self.Base.execute_periodic_action()
|
|
|
|
case 'NICK':
|
|
# ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844']
|
|
# Changement de nickname
|
|
|
|
# Supprimer la premiere valeur de la liste
|
|
cmd.pop(0)
|
|
uid = str(cmd[0]).replace(':','')
|
|
newnickname = cmd[2]
|
|
self.User.update(uid, newnickname)
|
|
|
|
case 'MODE':
|
|
#['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6+Z4494xWUg;time=2024-08-09T12:45:36.651Z',
|
|
# ':001', 'MODE', '#a', '+nt', '1723207536']
|
|
pass
|
|
|
|
case 'SJOIN':
|
|
# ['@msgid=5sTwGdj349D82L96p749SY;time=2024-08-15T09:50:23.528Z', ':001', 'SJOIN', '1721564574', '#welcome', ':001JD94QH']
|
|
# ['@msgid=bvceb6HthbLJapgGLXn1b0;time=2024-08-15T09:50:11.464Z', ':001', 'SJOIN', '1721564574', '#welcome', '+lnrt', '13', ':001CIVLQF', '+11ZAAAAAB', '001QGR10C', '*@0014UE10B', '001NL1O07', '001SWZR05', '001HB8G04', '@00BAAAAAJ', '0019M7101']
|
|
# ['@msgid=SKUeuVzOrTShRDduq8VerX;time=2024-08-23T19:37:04.266Z', ':001', 'SJOIN', '1723993047', '#welcome', '+lnrt', '13',
|
|
# ':001T6VU3F', '001JGWB2K', '@11ZAAAAAB',
|
|
# '001F16WGR', '001X9YMGQ', '*+001DYPFGP', '@00BAAAAAJ', '001AAGOG9', '001FMFVG8', '001DAEEG7',
|
|
# '&~G:unknown-users', '"~G:websocket-users', '"~G:known-users', '"~G:webirc-users']
|
|
cmd.pop(0)
|
|
channel = str(cmd[3]).lower()
|
|
len_cmd = len(cmd)
|
|
list_users:list = []
|
|
occurence = 0
|
|
start_boucle = 0
|
|
|
|
# Trouver le premier user
|
|
for i in range(len_cmd):
|
|
s: list = re.findall(fr':', cmd[i])
|
|
if s:
|
|
occurence += 1
|
|
if occurence == 2:
|
|
start_boucle = i
|
|
|
|
# Boucle qui va ajouter l'ensemble des users (UID)
|
|
for i in range(start_boucle, len(cmd)):
|
|
parsed_UID = str(cmd[i])
|
|
# pattern = fr'[:|@|%|\+|~|\*]*'
|
|
# pattern = fr':'
|
|
# parsed_UID = re.sub(pattern, '', parsed_UID)
|
|
clean_uid = self.Base.clean_uid(parsed_UID)
|
|
if len(clean_uid) == 9:
|
|
list_users.append(parsed_UID)
|
|
|
|
self.Channel.insert(
|
|
self.Channel.ChannelModel(
|
|
name=channel,
|
|
uids=list_users
|
|
)
|
|
)
|
|
|
|
case 'PART':
|
|
# ['@unrealircd.org/geoip=FR;unrealircd.org/userhost=50d6492c@80.214.73.44;unrealircd.org/userip=50d6492c@80.214.73.44;msgid=YSIPB9q4PcRu0EVfC9ci7y-/mZT0+Gj5FLiDSZshH5NCw;time=2024-08-15T15:35:53.772Z',
|
|
# ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris']
|
|
uid = str(cmd[1]).replace(':','')
|
|
channel = str(cmd[3]).lower()
|
|
self.Channel.delete_user_from_channel(channel, uid)
|
|
|
|
pass
|
|
|
|
case 'UID':
|
|
# ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601',
|
|
# ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net',
|
|
# '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...']
|
|
if 'webirc' in cmd[0]:
|
|
isWebirc = True
|
|
else:
|
|
isWebirc = False
|
|
|
|
uid = str(cmd[8])
|
|
nickname = str(cmd[3])
|
|
username = str(cmd[6])
|
|
hostname = str(cmd[7])
|
|
umodes = str(cmd[10])
|
|
vhost = str(cmd[11])
|
|
if not 'S' in umodes:
|
|
remote_ip = self.Base.decode_ip(str(cmd[13]))
|
|
else:
|
|
remote_ip = '127.0.0.1'
|
|
|
|
score_connexion = self.first_score
|
|
|
|
self.User.insert(
|
|
self.User.UserModel(
|
|
uid=uid,
|
|
nickname=nickname,
|
|
username=username,
|
|
hostname=hostname,
|
|
umodes=umodes,
|
|
vhost=vhost,
|
|
isWebirc=isWebirc,
|
|
remote_ip=remote_ip,
|
|
score_connexion=score_connexion,
|
|
connexion_datetime=datetime.now()
|
|
)
|
|
)
|
|
|
|
for classe_name, classe_object in self.loaded_classes.items():
|
|
classe_object.cmd(cmd_to_send)
|
|
|
|
case 'PRIVMSG':
|
|
try:
|
|
# Supprimer la premiere valeur
|
|
cmd.pop(0)
|
|
get_uid_or_nickname = str(cmd[0].replace(':',''))
|
|
user_trigger = self.User.get_nickname(get_uid_or_nickname)
|
|
dnickname = self.Config.SERVICE_NICKNAME
|
|
|
|
if len(cmd) == 6:
|
|
if cmd[1] == 'PRIVMSG' and str(cmd[3]).replace('.','') == ':auth':
|
|
cmd_copy = cmd.copy()
|
|
cmd_copy[5] = '**********'
|
|
self.Base.logs.info(cmd_copy)
|
|
else:
|
|
self.Base.logs.info(cmd)
|
|
else:
|
|
self.Base.logs.info(f'{cmd}')
|
|
|
|
pattern = fr'(:\{self.Config.SERVICE_PREFIX})(.*)$'
|
|
hcmds = re.search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le .
|
|
|
|
if hcmds: # Commande qui commencent par le point
|
|
liste_des_commandes = list(hcmds.groups())
|
|
convert_to_string = ' '.join(liste_des_commandes)
|
|
arg = convert_to_string.split()
|
|
arg.remove(f':{self.Config.SERVICE_PREFIX}')
|
|
if not arg[0].lower() in self.commands:
|
|
self.Base.logs.debug(f"This command {arg[0]} is not available")
|
|
return False
|
|
|
|
cmd_to_send = convert_to_string.replace(':','')
|
|
self.Base.log_cmd(user_trigger, cmd_to_send)
|
|
|
|
fromchannel = str(cmd[2]).lower() if self.Base.Is_Channel(cmd[2]) else None
|
|
self._hcmds(user_trigger, fromchannel, arg, cmd)
|
|
|
|
if cmd[2] == self.Config.SERVICE_ID:
|
|
pattern = fr'^:.*?:(.*)$'
|
|
|
|
hcmds = re.search(pattern, ' '.join(cmd))
|
|
|
|
if hcmds: # par /msg defender [commande]
|
|
liste_des_commandes = list(hcmds.groups())
|
|
convert_to_string = ' '.join(liste_des_commandes)
|
|
arg = convert_to_string.split()
|
|
|
|
# Réponse a un CTCP VERSION
|
|
if arg[0] == '\x01VERSION\x01':
|
|
self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Config.current_version}\x01')
|
|
return False
|
|
|
|
# Réponse a un TIME
|
|
if arg[0] == '\x01TIME\x01':
|
|
current_datetime = self.Base.get_datetime()
|
|
self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01TIME {current_datetime}\x01')
|
|
return False
|
|
|
|
# Réponse a un PING
|
|
if arg[0] == '\x01PING':
|
|
recieved_unixtime = int(arg[1].replace('\x01',''))
|
|
current_unixtime = self.Base.get_unixtime()
|
|
ping_response = current_unixtime - recieved_unixtime
|
|
self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01PING {str(ping_response)} secs\x01')
|
|
return False
|
|
|
|
if not arg[0].lower() in self.commands:
|
|
self.debug(f"This command {arg[0]} is not available")
|
|
return False
|
|
|
|
cmd_to_send = convert_to_string.replace(':','')
|
|
self.Base.log_cmd(self.User.get_nickname(user_trigger), cmd_to_send)
|
|
|
|
fromchannel = None
|
|
if len(arg) >= 2:
|
|
fromchannel = str(arg[1]).lower() if self.Base.Is_Channel(arg[1]) else None
|
|
|
|
self._hcmds(user_trigger, fromchannel, arg, cmd)
|
|
|
|
except IndexError as io:
|
|
self.Base.logs.error(f'{io}')
|
|
|
|
case _:
|
|
pass
|
|
|
|
if cmd[2] != 'UID':
|
|
# Envoyer la commande aux classes dynamiquement chargées
|
|
for classe_name, classe_object in self.loaded_classes.items():
|
|
classe_object.cmd(cmd_to_send)
|
|
|
|
except IndexError as ie:
|
|
self.Base.logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
|
|
|
|
def _hcmds(self, user: str, channel: Union[str, None], cmd:list, fullcmd: list = []) -> None:
|
|
|
|
fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande
|
|
uid = self.User.get_uid(fromuser) # Récuperer le uid de l'utilisateur
|
|
|
|
# Defender information
|
|
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
|
|
dchanlog = self.Config.SERVICE_CHANLOG # Defender chan log
|
|
|
|
if len(cmd) > 0:
|
|
command = str(cmd[0]).lower()
|
|
else:
|
|
return False
|
|
|
|
is_command_allowed = self.is_cmd_allowed(fromuser, command)
|
|
if not is_command_allowed:
|
|
command = 'notallowed'
|
|
|
|
# Envoyer la commande aux classes dynamiquement chargées
|
|
if command != 'notallowed':
|
|
for classe_name, classe_object in self.loaded_classes.items():
|
|
classe_object._hcmds(user, channel, cmd, fullcmd)
|
|
|
|
match command:
|
|
|
|
case 'notallowed':
|
|
try:
|
|
current_command = cmd[0]
|
|
self.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["rouge"]}{current_command}{self.Config.CONFIG_COLOR["noire"]} ] - Accès Refusé à {self.User.get_nickname(fromuser)}')
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Accès Refusé')
|
|
except IndexError as ie:
|
|
self.Base.logs.error(f'{ie}')
|
|
|
|
case 'deauth':
|
|
|
|
current_command = cmd[0]
|
|
uid_to_deauth = self.User.get_uid(fromuser)
|
|
self.delete_db_admin(uid_to_deauth)
|
|
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais déconnecter de {dnickname}")
|
|
|
|
case 'auth':
|
|
# ['auth', 'adator', 'password']
|
|
current_command = cmd[0]
|
|
user_to_log = self.User.get_nickname(cmd[1])
|
|
password = cmd[2]
|
|
|
|
if fromuser != user_to_log:
|
|
# If the current nickname is different from the nickname you want to log in with
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Your current nickname is different from the nickname you want to log in with")
|
|
return False
|
|
|
|
if not user_to_log is None:
|
|
mes_donnees = {'user': user_to_log, 'password': self.Base.crypt_password(password)}
|
|
query = f"SELECT id, level FROM {self.Config.table_admin} WHERE user = :user AND password = :password"
|
|
result = self.Base.db_execute_query(query, mes_donnees)
|
|
user_from_db = result.fetchone()
|
|
|
|
if not user_from_db is None:
|
|
uid_user = self.User.get_uid(user_to_log)
|
|
self.insert_db_admin(uid_user, user_from_db[1])
|
|
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}")
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Connexion a {dnickname} réussie!")
|
|
else:
|
|
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} a tapé un mauvais mot de pass")
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Mot de passe incorrecte")
|
|
|
|
else:
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :L'utilisateur {user_to_log} n'existe pas")
|
|
|
|
case 'addaccess':
|
|
try:
|
|
# .addaccess adator 5 password
|
|
newnickname = cmd[1]
|
|
newlevel = self.Base.int_if_possible(cmd[2])
|
|
password = cmd[3]
|
|
|
|
response = self.create_defender_user(newnickname, newlevel, password)
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : {response}')
|
|
self.Base.logs.info(response)
|
|
|
|
except IndexError as ie:
|
|
self.Base.logs.error(f'_hcmd addaccess: {ie}')
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} addaccess [nickname] [level] [password]')
|
|
except TypeError as te:
|
|
self.Base.logs.error(f'_hcmd addaccess: out of index : {te}')
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} addaccess [nickname] [level] [password]')
|
|
|
|
case 'editaccess':
|
|
# .editaccess [USER] [PASSWORD] [LEVEL]
|
|
try:
|
|
user_to_edit = cmd[1]
|
|
user_new_level = int(cmd[3])
|
|
user_password = self.Base.crypt_password(cmd[2])
|
|
|
|
if len(cmd) < 4 or len(cmd) > 4:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : .editaccess [USER] [NEWPASSWORD] [NEWLEVEL]')
|
|
return None
|
|
|
|
get_admin = self.Admin.get_Admin(fromuser)
|
|
if get_admin is None:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no Admin access')
|
|
return None
|
|
|
|
current_user = self.User.get_nickname(fromuser)
|
|
current_uid = self.User.get_uid(fromuser)
|
|
current_user_level = get_admin.level
|
|
|
|
if user_new_level > 5:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Maximum authorized level is 5')
|
|
return None
|
|
|
|
# Rechercher le user dans la base de données.
|
|
mes_donnees = {'user': user_to_edit}
|
|
query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user"
|
|
result = self.Base.db_execute_query(query, mes_donnees)
|
|
|
|
isUserExist = result.fetchone()
|
|
if not isUserExist is None:
|
|
|
|
if current_user_level < int(isUserExist[1]):
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : You are not allowed to edit this access')
|
|
return None
|
|
|
|
if current_user_level == int(isUserExist[1]) and current_user != user_to_edit:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} : You can't edit access of a user with same level")
|
|
return None
|
|
|
|
# Le user existe dans la base de données
|
|
data_to_update = {'user': user_to_edit, 'password': user_password, 'level': user_new_level}
|
|
sql_update = f"UPDATE {self.Config.table_admin} SET level = :level, password = :password WHERE user = :user"
|
|
exec_query = self.Base.db_execute_query(sql_update, data_to_update)
|
|
if exec_query.rowcount > 0:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_edit} has been modified with level {str(user_new_level)}')
|
|
else:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} : Impossible de modifier l'utilisateur {str(user_new_level)}")
|
|
|
|
except TypeError as te:
|
|
self.Base.logs.error(f"Type error : {te}")
|
|
except ValueError as ve:
|
|
self.Base.logs.error(f"Value Error : {ve}")
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : .editaccess [USER] [NEWPASSWORD] [NEWLEVEL]')
|
|
|
|
case 'delaccess':
|
|
# .delaccess [USER] [CONFIRMUSER]
|
|
user_to_del = cmd[1]
|
|
user_confirmation = cmd[2]
|
|
|
|
if user_to_del != user_confirmation:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Les user ne sont pas les mêmes, tu dois confirmer le user que tu veux supprimer')
|
|
self.Base.logs.warning(f':{dnickname} NOTICE {fromuser} : Les user ne sont pas les mêmes, tu dois confirmer le user que tu veux supprimer')
|
|
return None
|
|
|
|
if len(cmd) < 3:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : .delaccess [USER] [CONFIRMUSER]')
|
|
return None
|
|
|
|
get_admin = self.Admin.get_Admin(fromuser)
|
|
|
|
if get_admin is None:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no admin access')
|
|
return None
|
|
|
|
current_user = self.User.get_nickname(fromuser)
|
|
current_uid = self.User.get_uid(fromuser)
|
|
current_user_level = get_admin.level
|
|
|
|
# Rechercher le user dans la base de données.
|
|
mes_donnees = {'user': user_to_del}
|
|
query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user"
|
|
result = self.Base.db_execute_query(query, mes_donnees)
|
|
info_user = result.fetchone()
|
|
|
|
if not info_user is None:
|
|
level_user_to_del = info_user[1]
|
|
if current_user_level <= level_user_to_del:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : You are not allowed to delete this access')
|
|
self.Base.logs.warning(f':{dnickname} NOTICE {fromuser} : You are not allowed to delete this access')
|
|
return None
|
|
|
|
data_to_delete = {'user': user_to_del}
|
|
sql_delete = f"DELETE FROM {self.Config.table_admin} WHERE user = :user"
|
|
exec_query = self.Base.db_execute_query(sql_delete, data_to_delete)
|
|
if exec_query.rowcount > 0:
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_del} has been deleted !')
|
|
else:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} : Impossible de supprimer l'utilisateur.")
|
|
self.Base.logs.warning(f":{dnickname} NOTICE {fromuser} : Impossible de supprimer l'utilisateur.")
|
|
|
|
case 'help':
|
|
|
|
help = ''
|
|
count_level_definition = 0
|
|
get_admin = self.Admin.get_Admin(uid)
|
|
if not get_admin is None:
|
|
user_level = get_admin.level
|
|
else:
|
|
user_level = 0
|
|
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : **************** LIST DES COMMANDES *****************')
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : ')
|
|
for levDef in self.commands_level:
|
|
|
|
if int(user_level) >= int(count_level_definition):
|
|
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : **************** {self.Config.CONFIG_COLOR["noire"]}[ {self.Config.CONFIG_COLOR["verte"]}LEVEL {str(levDef)} {self.Config.CONFIG_COLOR["noire"]}] ****************')
|
|
count_commands = 0
|
|
help = ''
|
|
for comm in self.commands_level[count_level_definition]:
|
|
|
|
help += f"{comm.upper()}"
|
|
if int(count_commands) < len(self.commands_level[count_level_definition])-1:
|
|
help += ' | '
|
|
count_commands += 1
|
|
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : {help}')
|
|
|
|
count_level_definition += 1
|
|
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : **************** FIN DES COMMANDES *****************')
|
|
|
|
case 'load':
|
|
|
|
self.load_module(fromuser, str(cmd[1]))
|
|
|
|
case 'unload':
|
|
# unload mod_dktmb
|
|
try:
|
|
module_name = str(cmd[1]).lower() # Le nom du module. exemple: mod_defender
|
|
class_name = module_name.split('_')[1].capitalize() # Nom de la class. exemple: Defender
|
|
|
|
if class_name in self.loaded_classes:
|
|
self.loaded_classes[class_name].unload()
|
|
for level, command in self.loaded_classes[class_name].commands_level.items():
|
|
# Supprimer la commande de la variable commands
|
|
for c in self.loaded_classes[class_name].commands_level[level]:
|
|
self.commands.remove(c)
|
|
self.commands_level[level].remove(c)
|
|
|
|
del self.loaded_classes[class_name]
|
|
|
|
# Supprimer le module de la base de données
|
|
self.Base.db_delete_module(module_name)
|
|
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} supprimé")
|
|
except:
|
|
self.Base.logs.error(f"Something went wrong with a module you want to load")
|
|
|
|
case 'reload':
|
|
# reload mod_dktmb
|
|
try:
|
|
module_name = str(cmd[1]).lower() # ==> mod_defender
|
|
class_name = module_name.split('_')[1].capitalize() # ==> Defender
|
|
|
|
if 'mods.' + module_name in sys.modules:
|
|
self.Base.logs.info('Unload the module ...')
|
|
self.loaded_classes[class_name].unload()
|
|
self.Base.logs.info('Module Already Loaded ... reloading the module ...')
|
|
the_module = sys.modules['mods.' + module_name]
|
|
importlib.reload(the_module)
|
|
|
|
# Supprimer la class déja instancier
|
|
if class_name in self.loaded_classes:
|
|
# Supprimer les commandes déclarer dans la classe
|
|
for level, command in self.loaded_classes[class_name].commands_level.items():
|
|
# Supprimer la commande de la variable commands
|
|
for c in self.loaded_classes[class_name].commands_level[level]:
|
|
self.commands.remove(c)
|
|
self.commands_level[level].remove(c)
|
|
|
|
del self.loaded_classes[class_name]
|
|
|
|
my_class = getattr(the_module, class_name, None)
|
|
new_instance = my_class(self.ircObject)
|
|
self.loaded_classes[class_name] = new_instance
|
|
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} rechargé")
|
|
return False
|
|
else:
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} n'est pas chargé !")
|
|
|
|
except TypeError as te:
|
|
self.Base.logs.error(f"A TypeError raised: {te}")
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :A TypeError raised: {te}")
|
|
self.Base.db_delete_module(module_name)
|
|
except AttributeError as ae:
|
|
self.Base.logs.error(f"Missing Attribute: {ae}")
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Missing Attribute: {ae}")
|
|
self.Base.db_delete_module(module_name)
|
|
except KeyError as ke:
|
|
self.Base.logs.error(f"Key Error: {ke}")
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Key Error: {ke}")
|
|
self.Base.db_delete_module(module_name)
|
|
except Exception as e:
|
|
self.Base.logs.error(f"Something went wrong with a module you want to reload: {e}")
|
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Something went wrong with the module: {e}")
|
|
self.Base.db_delete_module(module_name)
|
|
|
|
case 'quit':
|
|
try:
|
|
reason = []
|
|
for i in range(1, len(cmd)):
|
|
reason.append(cmd[i])
|
|
final_reason = ' '.join(reason)
|
|
|
|
self.hb_active = False
|
|
self.Base.shutdown()
|
|
self.Base.execute_periodic_action()
|
|
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Arrêt du service {dnickname}')
|
|
self.send2socket(f':{self.Config.SERVEUR_LINK} SQUIT {self.Config.SERVEUR_LINK} :{final_reason}')
|
|
self.Base.logs.info(f'Arrêt du server {dnickname}')
|
|
self.RESTART = 0
|
|
self.signal = False
|
|
|
|
except IndexError as ie:
|
|
self.Base.logs.error(f'{ie}')
|
|
|
|
self.send2socket(f"QUIT Good bye")
|
|
|
|
case 'restart':
|
|
reason = []
|
|
for i in range(1, len(cmd)):
|
|
reason.append(cmd[i])
|
|
final_reason = ' '.join(reason)
|
|
|
|
self.User.UID_DB.clear() # Clear User Object
|
|
self.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
|
|
|
|
for class_name in self.loaded_classes:
|
|
self.loaded_classes[class_name].unload()
|
|
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Redémarrage du service {dnickname}')
|
|
self.send2socket(f':{self.Config.SERVEUR_LINK} SQUIT {self.Config.SERVEUR_LINK} :{final_reason}')
|
|
self.Base.logs.info(f'Redémarrage du server {dnickname}')
|
|
self.loaded_classes.clear()
|
|
self.RESTART = 1 # Set restart status to 1 saying that the service will restart
|
|
self.INIT = 1 # set init to 1 saying that the service will be re initiated
|
|
|
|
case 'show_modules':
|
|
|
|
self.Base.logs.debug(self.loaded_classes)
|
|
all_modules = self.Base.get_all_modules()
|
|
|
|
results = self.Base.db_execute_query(f'SELECT module_name FROM {self.Config.table_module}')
|
|
results = results.fetchall()
|
|
|
|
# if len(results) == 0:
|
|
# self.send2socket(f":{dnickname} NOTICE {fromuser} :There is no module loaded")
|
|
# return False
|
|
|
|
found = False
|
|
|
|
for module in all_modules:
|
|
for loaded_mod in results:
|
|
if module == loaded_mod[0]:
|
|
found = True
|
|
|
|
if found:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} :{module} - {self.Config.CONFIG_COLOR['verte']}Loaded{self.Config.CONFIG_COLOR['nogc']}")
|
|
else:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} :{module} - {self.Config.CONFIG_COLOR['rouge']}Not Loaded{self.Config.CONFIG_COLOR['nogc']}")
|
|
|
|
found = False
|
|
|
|
# for r in results:
|
|
# self.send2socket(f":{dnickname} NOTICE {fromuser} :{r[0]} - {self.Config.CONFIG_COLOR['verte']}Loaded{self.Config.CONFIG_COLOR['nogc']}")
|
|
|
|
case 'show_timers':
|
|
|
|
if self.Base.running_timers:
|
|
for the_timer in self.Base.running_timers:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} :>> {the_timer.getName()} - {the_timer.is_alive()}")
|
|
else:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} :Aucun timers en cours d'execution")
|
|
|
|
case 'show_threads':
|
|
|
|
for thread in self.Base.running_threads:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} :>> {thread.getName()} ({thread.is_alive()})")
|
|
|
|
case 'show_channels':
|
|
|
|
for chan in self.Channel.UID_CHANNEL_DB:
|
|
list_nicknames: list = []
|
|
for uid in chan.uids:
|
|
pattern = fr'[:|@|%|\+|~|\*]*'
|
|
parsed_UID = re.sub(pattern, '', uid)
|
|
list_nicknames.append(self.User.get_nickname(parsed_UID))
|
|
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} : Channel: {chan.name} - Users: {list_nicknames}")
|
|
|
|
case 'show_users':
|
|
for db_user in self.User.UID_DB:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} :UID : {db_user.uid} - isWebirc: {db_user.isWebirc} - Nickname: {db_user.nickname} - Connection: {db_user.connexion_datetime}")
|
|
|
|
case 'show_admins':
|
|
for db_admin in self.Admin.UID_ADMIN_DB:
|
|
self.send2socket(f":{dnickname} NOTICE {fromuser} :UID : {db_admin.uid} - Nickname: {db_admin.nickname} - Level: {db_admin.level} - Connection: {db_admin.connexion_datetime}")
|
|
|
|
case 'uptime':
|
|
uptime = self.get_defender_uptime()
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : {uptime}')
|
|
|
|
case 'copyright':
|
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Config.current_version} Developped by adator® and dktmb® #')
|
|
|
|
case 'checkversion':
|
|
|
|
self.Base.create_thread(
|
|
self.thread_check_for_new_version,
|
|
(fromuser, )
|
|
)
|
|
|
|
case _:
|
|
pass
|