V6.1.0 update the help command

This commit is contained in:
adator
2024-12-08 12:52:36 +01:00
parent a3edf48120
commit cb042a5411
15 changed files with 635 additions and 258 deletions

View File

@@ -1,5 +1,3 @@
from ast import parse
from http import server
import sys
import socket
import threading
@@ -11,8 +9,6 @@ import traceback
from ssl import SSLSocket
from datetime import datetime, timedelta
from typing import Union
from websockets import serve
from core.loader import Loader
from core.classes.protocol import Protocol
@@ -65,6 +61,9 @@ class Irc:
# Use Admin Instance
self.Admin = self.Loader.Admin
# Use Client Instance
self.Client = self.Loader.Client
# Use Channel Instance
self.Channel = self.Loader.Channel
@@ -80,27 +79,50 @@ class Irc:
# define first reputation score to 0
self.first_score: int = 0
# Define first IP connexion
self.first_connexion_ip: str = None
# Define the dict that will contain all loaded modules
self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés
# Global full module commands that contains level, module name, commands and description
self.module_commands: dict[int, dict[str, dict[str, str]]] = {}
# Global command list contains only the commands
self.module_commands_list: list[str] = []
self.build_command(0, 'core', 'help', 'This provide the help')
self.build_command(0, 'core', 'auth', 'Login to the IRC Service')
self.build_command(0, 'core', 'copyright', 'Give some information about the IRC Service')
self.build_command(0, 'core', 'uptime', 'Give you since when the service is connected')
self.build_command(0, 'core', 'firstauth', 'First authentication of the Service')
self.build_command(0, 'core', 'register', f'Register your nickname /msg {self.Config.SERVICE_NICKNAME} REGISTER <password> <email>')
self.build_command(0, 'core', 'identify', f'Identify yourself with your password /msg {self.Config.SERVICE_NICKNAME} IDENTIFY <account> <password>')
self.build_command(1, 'core', 'load', 'Load an existing module')
self.build_command(1, 'core', 'unload', 'Unload a module')
self.build_command(1, 'core', 'reload', 'Reload a module')
self.build_command(1, 'core', 'deauth', 'Deauth from the irc service')
self.build_command(1, 'core', 'checkversion', 'Check the version of the irc service')
self.build_command(2, 'core', 'show_modules', 'Display a list of loaded modules')
self.build_command(2, 'core', 'show_timers', 'Display active timers')
self.build_command(2, 'core', 'show_threads', 'Display active threads in the system')
self.build_command(2, 'core', 'show_channels', 'Display a list of active channels')
self.build_command(2, 'core', 'show_users', 'Display a list of connected users')
self.build_command(2, 'core', 'show_clients', 'Display a list of connected clients')
self.build_command(2, 'core', 'show_admins', 'Display a list of administrators')
self.build_command(2, 'core', 'show_configuration', 'Display the current configuration settings')
self.build_command(3, 'core', 'quit', 'Disconnect the bot or user from the server.')
self.build_command(3, 'core', 'restart', 'Restart the bot or service.')
self.build_command(3, 'core', 'addaccess', 'Add a user or entity to an access list with specific permissions.')
self.build_command(3, 'core', 'editaccess', 'Modify permissions for an existing user or entity in the access list.')
self.build_command(3, 'core', 'delaccess', 'Remove a user or entity from the access list.')
self.build_command(4, 'core', 'rehash', 'Reload the configuration file without restarting')
self.build_command(4, 'core', 'raw', 'Send a raw command directly to the IRC server')
# Define the IrcSocket object
self.IrcSocket:Union[socket.socket, SSLSocket] = None
# Liste des commandes internes du bot
self.commands_level = {
0: ['help', 'auth', 'copyright', 'uptime', 'firstauth'],
1: ['load','reload','unload', 'deauth', 'checkversion'],
2: ['show_modules', 'show_timers', 'show_threads', 'show_channels', 'show_users', 'show_admins', 'show_configuration'],
3: ['quit', 'restart','addaccess','editaccess', 'delaccess'],
4: ['rehash']
}
# l'ensemble des commandes.
self.commands = []
for level, commands in self.commands_level.items():
for command in self.commands_level[level]:
self.commands.append(command)
self.__create_table()
self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, ))
@@ -305,6 +327,71 @@ class Irc:
# FIN CONNEXION IRC #
##############################################
def build_command(self, level: int, module_name: str, command_name: str, command_description: str) -> None:
"""This method build the commands variable
Args:
level (int): The Level of the command
module_name (str): The module name
command_name (str): The command name
command_description (str): The description of the command
"""
self.module_commands.setdefault(level, {}).setdefault(module_name, {}).update({command_name: command_description})
self.module_commands_list.append(command_name)
return None
def generate_help_menu(self, nickname: str) -> None:
# Check if the nickname is an admin
admin_obj = self.Admin.get_Admin(nickname)
dnickname = self.Config.SERVICE_NICKNAME
color_bold = self.Config.COLORS.bold
color_nogc = self.Config.COLORS.nogc
color_blue = self.Config.COLORS.blue
color_black = self.Config.COLORS.black
color_underline = self.Config.COLORS.underline
current_level = 0
count = 0
if admin_obj is not None:
current_level = admin_obj.level
self.Protocol.send_notice(nick_from=dnickname,nick_to=nickname, msg=f" ***************** LISTE DES COMMANDES *****************")
for level, modules in self.module_commands.items():
if level > current_level:
break
if count > 0:
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=" ")
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=f"{color_blue}{color_bold}Level {level}:{color_nogc}")
for module_name, commands in modules.items():
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=f"{color_black} {color_underline}Module: {module_name}{color_nogc}")
for command, description in commands.items():
self.Protocol.send_notice(nick_from=dnickname, nick_to=nickname, msg=f" {command:<20}: {description}")
count += 1
self.Protocol.send_notice(nick_from=dnickname,nick_to=nickname,msg=f" ***************** FIN DES COMMANDES *****************")
return None
def is_cmd_allowed(self, nickname: str, command_name: str) -> bool:
admin_obj = self.Admin.get_Admin(nickname)
current_level = 0
if admin_obj is not None:
current_level = admin_obj.level
for level, modules in self.module_commands.items():
for module_name, commands in modules.items():
for command, description in commands.items():
if command.lower() == command_name.lower() and level <= current_level:
return True
return False
def __create_table(self):
"""## Create core tables
"""
@@ -504,12 +591,6 @@ class Irc:
if class_name in self.loaded_classes:
self.loaded_classes[class_name].unload()
for level, command in self.loaded_classes[class_name].commands_level.items():
# Supprimer la commande de la variable commands
for c in self.loaded_classes[class_name].commands_level[level]:
self.commands.remove(c)
self.commands_level[level].remove(c)
del self.loaded_classes[class_name]
# Supprimer le module de la base de données
@@ -540,13 +621,6 @@ class Irc:
# Supprimer la class déja instancier
if class_name in self.loaded_classes:
# Supprimer les commandes déclarer dans la classe
for level, command in self.loaded_classes[class_name].commands_level.items():
# Supprimer la commande de la variable commands
for c in self.loaded_classes[class_name].commands_level[level]:
self.commands.remove(c)
self.commands_level[level].remove(c)
del self.loaded_classes[class_name]
my_class = getattr(the_module, class_name, None)
@@ -680,31 +754,6 @@ class Irc:
self.Logs.info(response)
return response
def is_cmd_allowed(self, nickname:str, cmd:str) -> bool:
# Vérifier si le user est identifié et si il a les droits
is_command_allowed = False
uid = self.User.get_uid(nickname)
get_admin = self.Admin.get_Admin(uid)
if not get_admin is None:
admin_level = get_admin.level
for ref_level, ref_commands in self.commands_level.items():
# print(f"LevelNo: {ref_level} - {ref_commands} - {admin_level}")
if ref_level <= int(admin_level):
# print(f"LevelNo: {ref_level} - {ref_commands}")
if cmd in ref_commands:
is_command_allowed = True
else:
for ref_level, ref_commands in self.commands_level.items():
if ref_level == 0:
# print(f"LevelNo: {ref_level} - {ref_commands}")
if cmd in ref_commands:
is_command_allowed = True
return is_command_allowed
def logs(self, log_msg:str) -> None:
"""Log to database if you want
@@ -1182,36 +1231,85 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Impossible de supprimer l'utilisateur.")
self.Logs.warning(f":{dnickname} NOTICE {fromuser} : Impossible de supprimer l'utilisateur.")
case 'register':
# Register PASSWORD EMAIL
password = cmd[1]
email = cmd[2]
user_obj = self.User.get_User(fromuser)
if user_obj is None:
self.Logs.error(f"Nickname ({fromuser}) doesn't exist, it is impossible to register this nickname")
return None
# If the account already exist.
if self.Client.db_is_account_exist(fromuser):
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"Your account already exist, please try to login instead /msg {self.Config.SERVICE_NICKNAME} IDENTIFY <account> <password>"
)
return None
# If the account doesn't exist then insert into database
data_to_record = {
'createdOn': self.Base.get_datetime(), 'account': fromuser,
'nickname': user_obj.nickname, 'hostname': user_obj.hostname, 'vhost': user_obj.vhost, 'realname': user_obj.realname, 'email': email,
'password': self.Base.crypt_password(password=password), 'level': 0
}
insert_to_db = self.Base.db_execute_query(f"""
INSERT INTO {self.Config.TABLE_CLIENT}
(createdOn, account, nickname, hostname, vhost, realname, email, password, level)
VALUES
(:createdOn, :account, :nickname, :hostname, :vhost, :realname, :email, :password, :level)
""", data_to_record)
if insert_to_db.rowcount > 0:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"You have register your nickname successfully"
)
return None
case 'identify':
# Identify NICKNAME password
nickname = str(cmd[1])
encrypted_password = self.Base.crypt_password(cmd[2])
client_obj = self.Client.get_Client(nickname)
user_obj = self.User.get_User(fromuser)
if client_obj is not None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are already logged in")
return None
db_query = f"SELECT account FROM {self.Config.TABLE_CLIENT} WHERE nickname = :nickname AND password = :password"
db_param = {'nickname': nickname, 'password': encrypted_password}
exec_query = self.Base.db_execute_query(
db_query,
db_param
)
result_query = exec_query.fetchone()
if result_query:
account = result_query[0]
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are now logged in")
client = self.Loader.Definition.MClient(
uid=user_obj.uid, account=account, nickname=nickname,
username=user_obj.username, realname=user_obj.realname, hostname=user_obj.hostname, umodes=user_obj.umodes, vhost=user_obj.vhost,
isWebirc=user_obj.isWebirc, isWebsocket=user_obj.isWebsocket, remote_ip=user_obj.remote_ip, score_connexion=user_obj.score_connexion,
geoip=user_obj.geoip, connexion_datetime=user_obj.connexion_datetime
)
self.Client.insert(client)
self.Protocol.send_svs_mode(nickname=nickname, user_mode='+r')
else:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Wrong password or account")
return None
case 'help':
count_level_definition = 0
get_admin = self.Admin.get_Admin(uid)
if not get_admin is None:
user_level = get_admin.level
else:
user_level = 0
self.Protocol.send_notice(nick_from=dnickname,nick_to=fromuser,msg=f" ***************** LISTE DES COMMANDES *****************")
self.Protocol.send_notice(nick_from=dnickname,nick_to=fromuser,msg=f" ")
for levDef in self.commands_level:
if int(user_level) >= int(count_level_definition):
self.Protocol.send_notice(nick_from=dnickname,nick_to=fromuser,
msg=f" ***************** {self.Config.COLORS.nogc}[ {self.Config.COLORS.green}LEVEL {str(levDef)} {self.Config.COLORS.nogc}] *****************"
)
batch = 7
for i in range(0, len(self.commands_level[count_level_definition]), batch):
groupe = self.commands_level[count_level_definition][i:i + batch] # Extraire le groupe
batch_commands = ' | '.join(groupe)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" {batch_commands}")
self.Protocol.send_notice(nick_from=dnickname,nick_to=fromuser,msg=f" ")
count_level_definition += 1
self.Protocol.send_notice(nick_from=dnickname,nick_to=fromuser,msg=f" ***************** FIN DES COMMANDES *****************")
self.generate_help_menu(nickname=fromuser)
case 'load':
try:
@@ -1311,7 +1409,9 @@ class Irc:
mod_protocol = sys.modules['core.classes.protocol']
mod_base = sys.modules['core.base']
mod_config = sys.modules['core.classes.config']
mod_definition = sys.modules['core.definition']
importlib.reload(mod_definition)
importlib.reload(mod_config)
self.Config = self.Loader.ConfModule.Configuration().ConfigObject
self.Config.HSID = hsid
@@ -1439,6 +1539,16 @@ class Irc:
msg=f"UID : {db_user.uid} - isWebirc: {db_user.isWebirc} - isWebSocket: {db_user.isWebsocket} - Nickname: {db_user.nickname} - Connection: {db_user.connexion_datetime}"
)
case 'show_clients':
count_users = len(self.Client.CLIENT_DB)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Total Connected Clients: {count_users}")
for db_client in self.Client.CLIENT_DB:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"UID : {db_client.uid} - isWebirc: {db_client.isWebirc} - isWebSocket: {db_client.isWebsocket} - Nickname: {db_client.nickname} - Account: {db_client.account} - Connection: {db_client.connexion_datetime}"
)
case 'show_admins':
for db_admin in self.Admin.UID_ADMIN_DB:
@@ -1481,5 +1591,9 @@ class Irc:
(fromuser, )
)
case 'raw':
raw_command = ' '.join(cmd[1:])
self.Protocol.send_raw(raw_command)
case _:
pass