41 Commits

Author SHA1 Message Date
adator
a7de16f7ad V5.2.4 multiple changes related to clones 2024-09-17 00:14:53 +02:00
adator
c1c0b480ce V5.2.3 If missing MOTD 2024-09-16 21:14:04 +02:00
adator
66ea492593 removing old code 2024-09-16 00:57:27 +02:00
adator
d459fd662f Changing hostname modification 2024-09-16 00:41:43 +02:00
adator
5d3a2b0e64 Changing Proxy_scan to PSUTIL_scan 2024-09-15 23:32:44 +02:00
adator
2f681db2d7 Adding Geoip to the UserModel 2024-09-15 23:29:32 +02:00
adator
7585db4f62 V5.2.2 2024-09-15 22:39:34 +02:00
adator
1984511db8 V5.2.1 2024-09-15 03:08:49 +02:00
adator
ce47739a93 update mode clone 2024-09-15 02:49:42 +02:00
adator
c7047ec3d6 V5.2.0 2024-09-15 02:03:38 +02:00
adator
eddba81cf0 remove mod_jsonrpc.py 2024-09-14 01:32:16 +02:00
adator
59e634951f V5.1.9 2024-09-14 01:25:13 +02:00
adator
37684eaede With unrealircd json rpc 2024-09-13 23:51:13 +02:00
adator
e6156fa301 V5.1.8 2024-09-08 00:42:18 +02:00
adator
58e3ebd287 V5.1.7 2024-09-03 00:19:13 +02:00
adator
322759c5ef V5.1.6 2024-09-01 22:15:24 +02:00
adator
3ba884216f Update vote kick commands 2024-09-01 18:55:30 +02:00
adator
2ce19ee877 mod_command update 2024-09-01 17:29:33 +02:00
adator
351fd6edaf adding Say command for clones 2024-09-01 16:39:49 +02:00
adator
d0c17d69de update mod_clone module 2024-09-01 15:54:23 +02:00
adator
eb9402dd8e update readme 2024-09-01 15:34:49 +02:00
adator
35d5e7a2b5 Update readme.md 2024-09-01 15:23:48 +02:00
adator
21a825c92d Remove install.py 2024-09-01 15:18:32 +02:00
adator
3fcfa0296d . 2024-09-01 14:40:28 +02:00
adator
bcf972d08b . 2024-09-01 14:34:57 +02:00
adator
1348ead6cd remove logging library 2024-09-01 14:33:41 +02:00
adator
f6ebab4780 . 2024-09-01 14:21:25 +02:00
adator
608ec57593 . 2024-09-01 13:59:26 +02:00
adator
f392f2fb2f . 2024-09-01 13:56:55 +02:00
adator
489e1e7b0a . 2024-09-01 13:47:18 +02:00
adator
3d79270ca0 . 2024-09-01 13:42:39 +02:00
adator
e60ada4260 Fix logging instance 2024-09-01 13:41:15 +02:00
adator
ccb9f307b4 Few changes see changelog 2024-09-01 13:39:02 +02:00
adator
2fc8f2d346 Fix issue unexpected argument 2024-09-01 13:14:51 +02:00
adator
e3ada04f2a daemon_reload cmd. 2024-09-01 13:09:47 +02:00
adator
6ba0551fee v5.1.5 2024-09-01 12:51:54 +02:00
adator
6142b4257f fix Installation 2024-08-29 01:36:12 +02:00
adator
ab15cce82b Fix Bug installation 2024-08-29 01:30:10 +02:00
adator
01dcc90d63 V5.1.0 2024-08-29 01:13:55 +02:00
adator
9cd089ee6e Fix some issues 2024-08-28 00:50:50 +02:00
adator
c635851d19 New version 2024-08-28 00:13:14 +02:00
15 changed files with 2750 additions and 1337 deletions

3
.gitignore vendored
View File

@@ -2,6 +2,7 @@
db/
logs/
__pycache__/
mods/mod_jsonrpc.py
configuration.json
install.log
*.log
test.py

View File

@@ -1,4 +1,9 @@
# IRC-DEFENDER
![Static Badge](https://img.shields.io/badge/UnrealIRCd-6.2.2%20or%20later-green)
![Static Badge](https://img.shields.io/badge/Python3-3.10%20or%20later-green)
![Dynamic JSON Badge](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fraw.githubusercontent.com%2Fadator85%2FIRC_DEFENDER_MODULES%2Fmain%2Fversion.json&query=version&label=Current%20Version)
![Static Badge](https://img.shields.io/badge/Maintained-Yes-green)
Defender est un service IRC basé sur la sécurité des réseaux IRC ( UnrealIRCD )
Il permet d'ajouter une sécurité supplémentaire pour vérifier les users connectés au réseau
en demandant aux user un code de validation.
@@ -9,9 +14,9 @@ Il permet aux opérateurs de gérer efficacement un canal, tout en offrant aux u
Kick: Expulser un utilisateur du canal.
Ban: Interdire définitivement l'accès au canal.
Unban: Lever une interdiction.
Op/Deop: Attribuer ou retirer les droits d'opérateur.
Op/Deop/Opall/Deopall: Attribuer ou retirer les droits d'opérateur.
Halfop/Dehalfop: Attribuer ou retirer les droits
Voice/Devoice: Attribuer ou retirer les droits de voix.
Voice/Devoice/VoiceAll/DevoiceAll: Attribuer ou retirer les droits de voix.
Système de quarantaine:
Mise en quarantaine: Isoler temporairement un utilisateur dans un canal privé.
@@ -25,16 +30,20 @@ Il permet aux opérateurs de gérer efficacement un canal, tout en offrant aux u
Prérequis:
- Système d'exploitation Linux (Windows non supporté)
- Droits d'administrateur (root) pour l'exécution du script
- Un server UnrealIRCD corréctement configuré
- Python version 3.10 ou supérieure
Bash:
$ git clone https://github.com/adator85/IRC_DEFENDER_MODULES.git
- Renommer le fichier exemple_configuration.json en configuration.json
- Configurer le fichier configuration.json
$ sudo python3 install.py
$ python3 main.py
Si votre configuration est bonne, votre service est censé etre connecté a votre réseau IRC
Pour Les prochains lancement de defender vous devez utiliser la commande suivante:
Bash:
$ systemctl --user [start | stop | restart | status] defender
# Installation manuelle:
Bash:
@@ -42,8 +51,10 @@ Si votre configuration est bonne, votre service est censé etre connecté a votr
$ cd IRC_DEFENDER_MODULES
$ python3 -m venv .pyenv
$ source .pyenv/bin/activate
- Créer un service nommé "Defender.service" pour votre service et placer le dans "/etc/systemd/system/"
$ sudo systemctl start Defender
(pyenv)$ pip install sqlalchemy, psutil, requests, faker
- Créer un service nommé "defender.service" pour votre service et placer le dans "/PATH/TO/USER/.config/systemd/user/"
- Si le dossier n'existe pas il faut les créer
$ sudo systemctl --user start defender
# Configuration
@@ -55,6 +66,7 @@ Si votre configuration est bonne, votre service est censé etre connecté a votr
SERVEUR_PASSWORD: Mot de passe d'enregistrement du service sur le serveur IRC.
SERVEUR_ID: Identifiant unique du service.
SERVEUR_SSL: Active la connexion SSL sécurisée au serveur IRC (true/false).
SERVICE (Service)
SERVICE_NAME: Nom du service IRC.
SERVICE_NICKNAME: Surnom utilisé par le service sur le serveur IRC.
@@ -67,35 +79,46 @@ Si votre configuration est bonne, votre service est censé etre connecté a votr
SERVICE_CMODES: Modes de canal appliqués aux canaux rejoints par le service.
SERVICE_UMODES: Modes utilisateur appliqués au service.
SERVICE_PREFIX: Caractère utilisé comme préfixe des commandes du service.
COMPTE (Compte)
OWNER: Nom d'utilisateur possédant les droits d'administration du service.
PASSWORD: Mot de passe de l'administrateur du service.
CANAUX (Canaux)
SALON_JAIL: Canal utilisé comme prison pour les utilisateurs sanctionnés.
SALON_JAIL_MODES: Modes appliqués au canal de prison.
SALON_LIBERER: Canal utilisé pour la libération des utilisateurs sanctionnés.
API (API)
API_TIMEOUT: Durée maximale d'attente d'une réponse de l'API en secondes.
SCANNER (Scanner)
PORTS_TO_SCAN: Liste des ports à scanner pour détecter des serveurs potentiellement malveillants.
SÉCURITÉ (Sécurité)
WHITELISTED_IP: Liste d'adresses IP autorisées à contourner certaines restrictions.
GLINE_DURATION: Durée de bannissement temporaire d'un utilisateur en minutes.
DEBUG (Debug)
DEBUG_LEVEL: Niveau de verbosité des messages de debug (plus grand est le nombre, plus il y a d'informations).
COULEURS (Couleurs)
CONFIG_COLOR: Dictionnaire contenant des codes de couleurs IRC pour un meilleur affichage des messages.
Modification de la configuration
Vous devez modifier le fichier config.json en remplaçant les valeurs par défaut avec vos propres informations. Assurez-vous de bien lire la description de chaque paramètre pour une configuration optimale du service.
Vous devez modifier le fichier configuration.json en remplaçant les valeurs par défaut avec vos propres informations. Assurez-vous de bien lire la description de chaque paramètre pour une configuration optimale du service.
Attention
# \\!/ Attention \\!/
Le mot de passe de l'administrateur et le mot de passe du service doivent être modifiés pour des raisons de sécurité.
Ne partagez pas vos informations de connexion au serveur IRC avec des tiers.
a votre premiere connexion vous devez tapez
Le mot de passe de l'administrateur et le mot de passe du service doivent être modifiés pour des raisons de sécurité.
Ne partagez pas vos informations de connexion au serveur IRC avec des tiers.
/msg [NomDuService] auth [nickname] [password]
-- Une fois identifié tapez la commande suivante
/msg [NomDuService] editaccess [nickname] [Nouveau-Password] 5
#Extension:
# Extension:
Le code est modulaire et conçu pour être facilement étendu. Vous pouvez ajouter de nouvelles commandes, de nouvelles fonctionnalités (mods/mod_test.py est un exemple pour bien demarrer la création de son module).
# Contributions:

View File

@@ -10,12 +10,15 @@ class User:
uid: str
nickname: str
username: str
realname: str
hostname: str
umodes: str
vhost: str
isWebirc: bool
isWebsocket: bool
remote_ip: str
score_connexion: int
geoip: str = None
connexion_datetime: datetime = field(default=datetime.now())
UID_DB: list[UserModel] = []
@@ -315,7 +318,7 @@ class Channel:
for user in newChan.uids:
record.uids.append(user)
# Supprimer les doublons
# Supprimer les doublons
del_duplicates = list(set(record.uids))
record.uids = del_duplicates
self.log.debug(f'Updating a new UID to the channel {record}')
@@ -402,3 +405,100 @@ class Channel:
self.log.debug(f'Search {name} -- result = {Channel}')
return Channel
class Clones:
@dataclass
class CloneModel:
alive: bool
nickname: str
username: str
realname: str
vhost: str = None
connected: bool = False
UID_CLONE_DB: list[CloneModel] = []
def __init__(self, Base: Base) -> None:
self.log = Base.logs
def insert(self, newCloneObject: CloneModel) -> bool:
"""Create new Clone object
Args:
newCloneObject (CloneModel): New CloneModel object
Returns:
bool: True if inserted
"""
result = False
exist = False
for record in self.UID_CLONE_DB:
if record.nickname == newCloneObject.nickname:
# If the user exist then return False and do not go further
exist = True
self.log.debug(f'{record.nickname} already exist')
return result
if not exist:
self.UID_CLONE_DB.append(newCloneObject)
result = True
self.log.debug(f'New Clone Object Created: ({newCloneObject})')
if not result:
self.log.critical(f'The Clone Object was not inserted {newCloneObject}')
return result
def delete(self, nickname: str) -> bool:
"""Delete the Clone Object starting from the nickname
Args:
nickname (str): nickname of the clone
Returns:
bool: True if deleted
"""
result = False
for record in self.UID_CLONE_DB:
if record.nickname == nickname:
# If the user exist then remove and return True and do not go further
self.UID_CLONE_DB.remove(record)
result = True
self.log.debug(f'The clone ({record.nickname}) has been deleted')
return result
if not result:
self.log.critical(f'The UID {nickname} was not deleted')
return result
def exists(self, nickname: str) -> bool:
"""Check if the nickname exist
Args:
nickname (str): Nickname of the clone
Returns:
bool: True if the nickname exist
"""
response = False
for cloneObject in self.UID_CLONE_DB:
if cloneObject.nickname == nickname:
response = True
return response
def kill(self, nickname:str) -> bool:
response = False
for cloneObject in self.UID_CLONE_DB:
if cloneObject.nickname == nickname:
cloneObject.alive = False # Kill the clone
response = True
return response

View File

@@ -1,6 +1,7 @@
import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, re
from typing import Union
from base64 import b64decode
import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, re, ast
from dataclasses import fields
from typing import Union, Literal
from base64 import b64decode, b64encode
from datetime import datetime
from sqlalchemy import create_engine, Engine, Connection, CursorResult
from sqlalchemy.sql import text
@@ -8,15 +9,11 @@ from core.loadConf import ConfigDataModel
class Base:
CORE_DB_PATH = 'core' + os.sep + 'db' + os.sep # Le dossier bases de données core
MODS_DB_PATH = 'mods' + os.sep + 'db' + os.sep # Le dossier bases de données des modules
PYTHON_MIN_VERSION = '3.10' # Version min de python
def __init__(self, Config: ConfigDataModel) -> None:
self.Config = Config # Assigner l'objet de configuration
self.init_log_system() # Demarrer le systeme de log
self.check_for_new_version(True) # Verifier si une nouvelle version est disponible
self.check_for_new_version(True) # Verifier si une nouvelle version est disponible
self.running_timers:list[threading.Timer] = [] # Liste des timers en cours
self.running_threads:list[threading.Thread] = [] # Liste des threads en cours
@@ -25,6 +22,7 @@ class Base:
self.lock = threading.RLock() # Création du lock
self.install: bool = False # Initialisation de la variable d'installation
self.engine, self.cursor = self.db_init() # Initialisation de la connexion a la base de données
self.__create_db() # Initialisation de la base de données
@@ -47,7 +45,7 @@ class Base:
def __get_latest_defender_version(self) -> None:
try:
self.logs.debug(f'Looking for a new version available on Github')
print(f'===> Looking for a new version available on Github')
# print(f'===> Looking for a new version available on Github')
token = ''
json_url = f'https://raw.githubusercontent.com/adator85/IRC_DEFENDER_MODULES/main/version.json'
headers = {
@@ -158,7 +156,7 @@ class Base:
encoding='UTF-8',
format='%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(funcName)s - %(message)s')
self.logs.info('#################### STARTING INTERCEPTOR HQ ####################')
self.logs.info('#################### STARTING DEFENDER ####################')
return None
@@ -190,8 +188,8 @@ class Base:
Returns:
bool: True si le module existe déja dans la base de données sinon False
"""
query = f"SELECT id FROM {self.Config.table_module} WHERE module = :module"
mes_donnes = {'module': module_name}
query = f"SELECT id FROM {self.Config.table_module} WHERE module_name = :module_name"
mes_donnes = {'module_name': module_name}
results = self.db_execute_query(query, mes_donnes)
if results.fetchall():
@@ -199,7 +197,7 @@ class Base:
else:
return False
def db_record_module(self, user_cmd:str, module_name:str) -> None:
def db_record_module(self, user_cmd:str, module_name:str, isdefault:int = 0) -> None:
"""Enregistre les modules dans la base de données
Args:
@@ -208,10 +206,9 @@ class Base:
if not self.db_isModuleExist(module_name):
self.logs.debug(f"Le module {module_name} n'existe pas alors ont le créer")
insert_cmd_query = f"INSERT INTO {self.Config.table_module} (datetime, user, module) VALUES (:datetime, :user, :module)"
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'module': module_name}
insert_cmd_query = f"INSERT INTO {self.Config.table_module} (datetime, user, module_name, isdefault) VALUES (:datetime, :user, :module_name, :isdefault)"
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'module_name': module_name, 'isdefault': isdefault}
self.db_execute_query(insert_cmd_query, mes_donnees)
# self.db_close_session(self.session)
else:
self.logs.debug(f"Le module {module_name} existe déja dans la base de données")
@@ -221,14 +218,180 @@ class Base:
"""Supprime les modules de la base de données
Args:
cmd (str): le module a enregistrer
cmd (str): le module a supprimer
"""
insert_cmd_query = f"DELETE FROM {self.Config.table_module} WHERE module = :module"
mes_donnees = {'module': module_name}
insert_cmd_query = f"DELETE FROM {self.Config.table_module} WHERE module_name = :module_name"
mes_donnees = {'module_name': module_name}
self.db_execute_query(insert_cmd_query, mes_donnees)
return False
def db_sync_core_config(self, module_name: str, dataclassObj: object) -> bool:
"""Sync module local parameters with the database
if new module then local param will be stored in the database
if old module then db param will be moved to the local dataclassObj
if new local param it will be stored in the database
if local param was removed then it will also be removed from the database
Args:
module_name (str): The module name ex. mod_defender
dataclassObj (object): The Dataclass object
Returns:
bool: _description_
"""
try:
response = True
current_date = self.get_datetime()
core_table = self.Config.table_config
# Add local parameters to DB
for field in fields(dataclassObj):
param_key = field.name
param_value = str(getattr(dataclassObj, field.name))
param_to_search = {'module_name': module_name, 'param_key': param_key}
search_query = f'''SELECT id FROM {core_table} WHERE module_name = :module_name AND param_key = :param_key'''
excecute_search_query = self.db_execute_query(search_query, param_to_search)
result_search_query = excecute_search_query.fetchone()
if result_search_query is None:
# If param and module_name doesn't exist create the record
param_to_insert = {'datetime': current_date,'module_name': module_name,
'param_key': param_key,'param_value': param_value
}
insert_query = f'''INSERT INTO {core_table} (datetime, module_name, param_key, param_value)
VALUES (:datetime, :module_name, :param_key, :param_value)
'''
execution = self.db_execute_query(insert_query, param_to_insert)
if execution.rowcount > 0:
self.logs.debug(f'New parameter added to the database: {param_key} --> {param_value}')
# Delete from DB unused parameter
query_select = f"SELECT module_name, param_key, param_value FROM {core_table} WHERE module_name = :module_name"
parameter = {'module_name': module_name}
execute_query_select = self.db_execute_query(query_select, parameter)
result_query_select = execute_query_select.fetchall()
for result in result_query_select:
db_mod_name, db_param_key, db_param_value = result
if not hasattr(dataclassObj, db_param_key):
mes_donnees = {'param_key': db_param_key, 'module_name': db_mod_name}
execute_delete = self.db_execute_query(f'DELETE FROM {core_table} WHERE module_name = :module_name and param_key = :param_key', mes_donnees)
row_affected = execute_delete.rowcount
if row_affected > 0:
self.logs.debug(f'A parameter has been deleted from the database: {db_param_key} --> {db_param_value} | Mod: {db_mod_name}')
# Sync local variable with Database
query = f"SELECT param_key, param_value FROM {core_table} WHERE module_name = :module_name"
parameter = {'module_name': module_name}
response = self.db_execute_query(query, parameter)
result = response.fetchall()
for param, value in result:
if type(getattr(dataclassObj, param)) == list:
value = ast.literal_eval(value)
setattr(dataclassObj, param, self.int_if_possible(value))
return response
except AttributeError as attrerr:
self.logs.error(f'Attribute Error: {attrerr}')
except Exception as err:
self.logs.error(err)
return False
def db_update_core_config(self, module_name:str, dataclassObj: object, param_key:str, param_value: str) -> bool:
core_table = self.Config.table_config
# Check if the param exist
if not hasattr(dataclassObj, param_key):
self.logs.error(f"Le parametre {param_key} n'existe pas dans la variable global")
return False
mes_donnees = {'module_name': module_name, 'param_key': param_key, 'param_value': param_value}
search_param_query = f"SELECT id FROM {core_table} WHERE module_name = :module_name AND param_key = :param_key"
result = self.db_execute_query(search_param_query, mes_donnees)
isParamExist = result.fetchone()
if not isParamExist is None:
mes_donnees = {'datetime': self.get_datetime(),
'module_name': module_name,
'param_key': param_key,
'param_value': param_value
}
query = f'''UPDATE {core_table} SET datetime = :datetime, param_value = :param_value WHERE module_name = :module_name AND param_key = :param_key'''
update = self.db_execute_query(query, mes_donnees)
updated_rows = update.rowcount
if updated_rows > 0:
setattr(dataclassObj, param_key, self.int_if_possible(param_value))
self.logs.debug(f'Parameter updated : {param_key} - {param_value} | Module: {module_name}')
else:
self.logs.error(f'Parameter NOT updated : {param_key} - {param_value} | Module: {module_name}')
else:
self.logs.error(f'Parameter and Module do not exist: Param ({param_key}) - Value ({param_value}) | Module ({module_name})')
self.logs.debug(dataclassObj)
return True
def db_query_channel(self, action: Literal['add','del'], module_name: str, channel_name: str) -> bool:
"""You can add a channel or delete a channel.
Args:
action (Literal['add','del']): Action on the database
module_name (str): The module name (mod_test)
channel_name (str): The channel name (With #)
Returns:
bool: True if action done
"""
try:
channel_name = channel_name.lower() if self.Is_Channel(channel_name) else None
core_table = 'core_channel'
if not channel_name:
self.logs.warn(f'The channel [{channel_name}] is not correct')
return False
match action:
case 'add':
mes_donnees = {'module_name': module_name, 'channel_name': channel_name}
response = self.db_execute_query(f"SELECT id FROM {core_table} WHERE module_name = :module_name AND channel_name = :channel_name", mes_donnees)
isChannelExist = response.fetchone()
if isChannelExist is None:
mes_donnees = {'datetime': self.get_datetime(), 'channel_name': channel_name, 'module_name': module_name}
insert = self.db_execute_query(f"INSERT INTO {core_table} (datetime, channel_name, module_name) VALUES (:datetime, :channel_name, :module_name)", mes_donnees)
if insert.rowcount:
self.logs.debug(f'New channel added: channel={channel_name} / module_name={module_name}')
return True
else:
return False
pass
case 'del':
mes_donnes = {'channel_name': channel_name, 'module_name': module_name}
response = self.db_execute_query(f"DELETE FROM {core_table} WHERE channel_name = :channel_name AND module_name = :module_name", mes_donnes)
if response.rowcount > 0:
self.logs.debug(f'Channel deleted: channel={channel_name} / module: {module_name}')
return True
else:
return False
case _:
return False
except Exception as err:
self.logs.error(err)
def db_create_first_admin(self) -> None:
user = self.db_execute_query(f"SELECT id FROM {self.Config.table_admin}")
@@ -266,7 +429,14 @@ class Base:
except AssertionError as ae:
self.logs.error(f'Assertion Error -> {ae}')
def create_thread(self, func:object, func_args: tuple = (), run_once:bool = False) -> None:
def create_thread(self, func:object, func_args: tuple = (), run_once:bool = False, daemon: bool = True) -> None:
"""Create a new thread and store it into running_threads variable
Args:
func (object): The method/function you want to execute via this thread
func_args (tuple, optional): Arguments of the function/method. Defaults to ().
run_once (bool, optional): If you want to ensure that this method/function run once. Defaults to False.
"""
try:
func_name = func.__name__
@@ -275,11 +445,7 @@ class Base:
if thread.getName() == func_name:
return None
# if func_name in self.running_threads:
# print(f"HeartBeat is running")
# return None
th = threading.Thread(target=func, args=func_args, name=str(func_name), daemon=True)
th = threading.Thread(target=func, args=func_args, name=str(func_name), daemon=daemon)
th.start()
self.running_threads.append(th)
@@ -367,6 +533,7 @@ class Base:
full_path_db = self.Config.db_path + self.Config.db_name
if not os.path.exists(db_directory):
self.install = True
os.makedirs(db_directory)
engine = create_engine(f'sqlite:///{full_path_db}.db', echo=False)
@@ -376,14 +543,23 @@ class Base:
def __create_db(self) -> None:
table_logs = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_log} (
table_core_log = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_log} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_commande} (
table_core_config = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_config} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
module_name TEXT,
param_key TEXT,
param_value TEXT
)
'''
table_core_log_command = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_commande} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
user TEXT,
@@ -391,15 +567,24 @@ class Base:
)
'''
table_modules = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_module} (
table_core_module = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_module} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
user TEXT,
module TEXT
module_name TEXT,
isdefault INTEGER
)
'''
table_core_channel = '''CREATE TABLE IF NOT EXISTS core_channel (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
module_name TEXT,
channel_name TEXT
)
'''
table_admins = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_admin} (
table_core_admin = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_admin} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
createdOn TEXT,
user TEXT,
@@ -410,10 +595,17 @@ class Base:
)
'''
self.db_execute_query(table_logs)
self.db_execute_query(table_cmds)
self.db_execute_query(table_modules)
self.db_execute_query(table_admins)
self.db_execute_query(table_core_log)
self.db_execute_query(table_core_log_command)
self.db_execute_query(table_core_module)
self.db_execute_query(table_core_admin)
self.db_execute_query(table_core_channel)
self.db_execute_query(table_core_config)
if self.install:
self.db_record_module('sys', 'mod_command', 1)
self.db_record_module('sys', 'mod_defender', 1)
self.install = False
return None
@@ -490,6 +682,17 @@ class Base:
self.logs.critical(f'This remote ip is not valid : {ve}')
return None
# def encode_ip(self, remote_ip_address: str) -> Union[str, None]:
# binary_ip = b64encode()
# try:
# decoded_ip = ipaddress.ip_address(binary_ip)
# return decoded_ip.exploded
# except ValueError as ve:
# self.logs.critical(f'This remote ip is not valid : {ve}')
# return None
def get_random(self, lenght:int) -> str:
"""
Retourn une chaîne aléatoire en fonction de la longueur spécifiée.
@@ -519,7 +722,7 @@ class Base:
self.periodic_func.clear()
def clean_uid(self, uid:str) -> str:
"""Clean UID by removing @ / % / + / Owner / and *
"""Clean UID by removing @ / % / + / ~ / * / :
Args:
uid (str): The UID to clean
@@ -528,7 +731,7 @@ class Base:
str: Clean UID without any sign
"""
pattern = fr'[@|%|\+|~|\*]*'
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = re.sub(pattern, '', uid)
return parsed_UID
@@ -542,11 +745,19 @@ class Base:
Returns:
bool: True if the string is a channel / False if this is not a channel
"""
try:
if channelToCheck is None:
return False
pattern = fr'^#'
isChannel = re.findall(pattern, channelToCheck)
pattern = fr'^#'
isChannel = re.findall(pattern, channelToCheck)
if not isChannel:
return False
else:
return True
if not isChannel:
return False
else:
return True
except TypeError as te:
self.logs.error(f'TypeError: [{channelToCheck}] - {te}')
except Exception as err:
self.logs.error(f'Error Not defined: {err}')

249
core/connection.py Normal file
View File

@@ -0,0 +1,249 @@
import socket, ssl
from ssl import SSLSocket
from core.loadConf import Config
from core.Model import Clones
from core.base import Base
from typing import Union
class Connection:
def __init__(self, server_port: int, nickname: str, username: str, realname: str, channels:list[str], CloneObject: Clones, ssl:bool = False) -> None:
self.Config = Config().ConfigObject
self.Base = Base(self.Config)
self.IrcSocket: Union[socket.socket, SSLSocket] = None
self.nickname = nickname
self.username = username
self.realname = realname
self.clone_chanlog = self.Config.SALON_CLONES
self.channels:list[str] = channels
self.CHARSET = ['utf-8', 'iso-8859-1']
self.Clones = CloneObject
self.signal: bool = True
for clone in self.Clones.UID_CLONE_DB:
if clone.nickname == nickname:
self.currentCloneObject = clone
self.create_socket(self.Config.SERVEUR_IP, self.Config.SERVEUR_HOSTNAME, server_port, ssl)
self.send_connection_information_to_server(self.IrcSocket)
self.connect()
def create_socket(self, server_ip: str, server_hostname: str, server_port: int, ssl: bool = False) -> bool:
try:
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
connexion_information = (server_ip, server_port)
if ssl:
# Créer un object ssl
ssl_context = self.__ssl_context()
ssl_connexion = ssl_context.wrap_socket(soc, server_hostname=server_hostname)
ssl_connexion.connect(connexion_information)
self.IrcSocket:SSLSocket = ssl_connexion
self.SSL_VERSION = self.IrcSocket.version()
self.Base.logs.debug(f'> Connexion en mode SSL : Version = {self.SSL_VERSION}')
else:
soc.connect(connexion_information)
self.IrcSocket:socket.socket = soc
self.Base.logs.debug(f'> Connexion en mode normal')
return True
except ssl.SSLEOFError as soe:
self.Base.logs.critical(f"SSLEOFError __create_socket: {soe} - {soc.fileno()}")
return False
except ssl.SSLError as se:
self.Base.logs.critical(f"SSLError __create_socket: {se} - {soc.fileno()}")
return False
except OSError as oe:
self.Base.logs.critical(f"OSError __create_socket: {oe} - {soc.fileno()}")
return False
except AttributeError as ae:
self.Base.logs.critical(f"AttributeError __create_socket: {ae} - {soc.fileno()}")
return False
def send2socket(self, send_message:str, disconnect: bool = False) -> None:
"""Envoit les commandes à envoyer au serveur.
Args:
string (Str): contient la commande à envoyer au serveur.
"""
try:
with self.Base.lock:
self.IrcSocket.send(f"{send_message}\r\n".encode(self.CHARSET[0]))
self.Base.logs.debug(f'<<{self.currentCloneObject.nickname}>>: {send_message}')
except UnicodeDecodeError:
self.Base.logs.error(f'Decode Error try iso-8859-1 - message: {send_message}')
self.IrcSocket.send(f"{send_message}\r\n".encode(self.CHARSET[1],'replace'))
except UnicodeEncodeError:
self.Base.logs.error(f'Encode Error try iso-8859-1 - message: {send_message}')
self.IrcSocket.send(f"{send_message}\r\n".encode(self.CHARSET[1],'replace'))
except AssertionError as ae:
self.Base.logs.warning(f'Assertion Error {ae} - message: {send_message}')
except ssl.SSLEOFError as soe:
self.Base.logs.error(f"SSLEOFError: {soe} - {send_message}")
except ssl.SSLError as se:
self.Base.logs.error(f"SSLError: {se} - {send_message}")
except OSError as oe:
self.Base.logs.error(f"OSError: {oe} - {send_message}")
def send_connection_information_to_server(self, writer:Union[socket.socket, SSLSocket]) -> None:
"""Créer le link et envoyer les informations nécessaires pour la
connexion au serveur.
Args:
writer (StreamWriter): permet l'envoi des informations au serveur.
"""
try:
nickname = self.nickname
username = self.username
realname = self.realname
# Envoyer un message d'identification
writer.send(f"USER {nickname} {username} {username} {nickname} {username} :{username}\r\n".encode('utf-8'))
writer.send(f"USER {username} {username} {username} :{realname}\r\n".encode('utf-8'))
writer.send(f"NICK {nickname}\r\n".encode('utf-8'))
self.Base.logs.debug('Link information sent to the server')
return None
except AttributeError as ae:
self.Base.logs.critical(f'{ae}')
def connect(self):
try:
while self.signal:
try:
# 4072 max what the socket can grab
buffer_size = self.IrcSocket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
data_in_bytes = self.IrcSocket.recv(buffer_size)
data = data_in_bytes.splitlines(True)
count_bytes = len(data_in_bytes)
while count_bytes > 4070:
# If the received message is > 4070 then loop and add the value to the variable
new_data = self.IrcSocket.recv(buffer_size)
data_in_bytes += new_data
count_bytes = len(new_data)
data = data_in_bytes.splitlines(True)
if not data:
# If no data then quit the loop
break
self.parser(data)
except ssl.SSLEOFError as soe:
self.Base.logs.error(f"SSLEOFError __connect_to_irc: {soe} - {data}")
self.signal = False
except ssl.SSLError as se:
self.Base.logs.error(f"SSLError __connect_to_irc: {se} - {data}")
self.signal = False
except OSError as oe:
self.Base.logs.error(f"OSError __connect_to_irc: {oe} - {data}")
self.signal = False
except AssertionError as ae:
self.Base.logs.error(f'Assertion error : {ae}')
except ValueError as ve:
self.Base.logs.error(f'Value Error : {ve}')
except ssl.SSLEOFError as soe:
self.Base.logs.error(f"OS Error __connect_to_irc: {soe}")
except AttributeError as atte:
self.Base.logs.critical(f"{atte}")
except Exception as e:
self.Base.logs.error(f"Exception: {e}")
finally:
self.IrcSocket.shutdown(socket.SHUT_WR)
self.IrcSocket.shutdown(socket.SHUT_RD)
self.Base.logs.info(f"<<{self.currentCloneObject.nickname}>> Clone Disconnected ...")
# self.IrcSocket.close()
def parser(self, cmd:list[bytes]):
try:
for data in cmd:
response = data.decode(self.CHARSET[0]).split()
current_clone_nickname = self.currentCloneObject.nickname
# print(response)
match response[0]:
case 'PING':
pong = str(response[1]).replace(':','')
self.send2socket(f"PONG :{pong}")
return None
case 'ERROR':
error_value = str(response[1]).replace(':','')
if error_value == 'Closing':
self.Base.logs.info(f"<<{self.currentCloneObject.nickname}>> {response} ...")
# self.signal = False
match response[1]:
case '376':
# End of MOTD
self.currentCloneObject.connected = True
for channel in self.channels:
self.send2socket(f"JOIN {channel}")
self.send2socket(f"JOIN {self.clone_chanlog}")
return None
case '422':
# Missing MOTD
self.currentCloneObject.connected = True
for channel in self.channels:
self.send2socket(f"JOIN {channel}")
self.send2socket(f"JOIN {self.clone_chanlog}")
return None
case 'PRIVMSG':
self.Base.logs.debug(f'<<{self.currentCloneObject.nickname}>> Response: {response}')
self.Base.logs.debug(f'<<{self.currentCloneObject.nickname}>> Alive: {self.currentCloneObject.alive}')
fullname = str(response[0]).replace(':', '')
nickname = fullname.split('!')[0].replace(':','')
if response[2] == current_clone_nickname:
message = []
for i in range(3, len(response)):
message.append(response[i])
final_message = ' '.join(message)
self.send2socket(f"PRIVMSG {self.clone_chanlog} :{fullname} => {final_message[1:]}")
if nickname == self.Config.SERVICE_NICKNAME:
command = str(response[3]).replace(':','')
if command == 'KILL':
self.send2socket(f'QUIT :Thanks and goodbye', disconnect=True)
if command == 'JOIN':
channel_to_join = str(response[4])
self.send2socket(f"JOIN {channel_to_join}")
if command == 'SAY':
clone_channel = str(response[4])
message = []
for i in range(5, len(response)):
message.append(response[i])
final_message = ' '.join(message)
self.send2socket(f"PRIVMSG {clone_channel} :{final_message}")
except UnicodeEncodeError:
for data in cmd:
response = data.decode(self.CHARSET[1],'replace').split()
except UnicodeDecodeError:
for data in cmd:
response = data.decode(self.CHARSET[1],'replace').split()
except AssertionError as ae:
self.Base.logs.error(f"Assertion error : {ae}")
def __ssl_context(self) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
self.Base.logs.debug(f'SSLContext initiated with verified mode {ctx.verify_mode}')
return ctx

View File

@@ -1,64 +1,158 @@
from importlib.util import find_spec
from subprocess import check_call, run, CalledProcessError
from dataclasses import dataclass
from subprocess import check_call, run, CalledProcessError, PIPE
from platform import python_version, python_version_tuple
from sys import exit
import os
class Install:
@dataclass
class CoreConfig:
install_log_file: str
unix_systemd_folder: str
service_file_name: str
service_cmd_executable: list
service_cmd_daemon_reload: list
defender_main_executable: str
python_min_version: str
python_current_version_tuple: tuple[str, str, str]
python_current_version: str
defender_install_folder: str
venv_folder: str
venv_cmd_installation: list
venv_cmd_requirements: list
venv_pip_executable: str
venv_python_executable: str
def __init__(self) -> None:
self.PYTHON_MIN_VERSION = '3.10'
self.venv_folder_name = '.pyenv'
self.cmd_venv_command = ['python3', '-m', 'venv', self.venv_folder_name]
self.module_to_install = ['sqlalchemy','psutil','requests']
self.set_configuration()
if not self.checkPythonVersion():
if not self.check_python_version():
# Tester si c'est la bonne version de python
exit("Python Version Error")
else:
if self.skip_install:
return None
# Sinon tester les dependances python et les installer avec pip
self.checkDependencies()
if self.do_install():
self.install_dependencies()
self.create_service_file()
self.print_final_message()
return None
def checkPythonVersion(self) -> bool:
def set_configuration(self):
self.skip_install = False
defender_install_folder = os.getcwd()
venv_folder = '.pyenv'
unix_user_home_directory = os.path.expanduser("~")
unix_systemd_folder = os.path.join(unix_user_home_directory, '.config', 'systemd', 'user')
defender_main_executable = os.path.join(defender_install_folder, 'main.py')
self.config = self.CoreConfig(
install_log_file='install.log',
unix_systemd_folder=unix_systemd_folder,
service_file_name='defender.service',
service_cmd_executable=['systemctl', '--user', 'start', 'defender'],
service_cmd_daemon_reload=['systemctl', '--user', 'daemon-reload'],
defender_main_executable=defender_main_executable,
python_min_version='3.10',
python_current_version_tuple=python_version_tuple(),
python_current_version=python_version(),
defender_install_folder=defender_install_folder,
venv_folder=venv_folder,
venv_cmd_installation=['python3', '-m', 'venv', venv_folder],
venv_cmd_requirements=['sqlalchemy','psutil','requests','faker'],
venv_pip_executable=f'{os.path.join(defender_install_folder, venv_folder, "bin")}{os.sep}pip',
venv_python_executable=f'{os.path.join(defender_install_folder, venv_folder, "bin")}{os.sep}python'
)
# Exclude Windows OS
if os.name == 'nt':
#print('/!\\ Skip installation /!\\')
self.skip_install = True
else:
if self.is_root():
self.skip_install = True
def is_root(self) -> bool:
if os.geteuid() != 0:
print('User without privileges ==> PASS')
return False
elif os.geteuid() == 0:
print('/!\\ Do not use root to install Defender /!\\')
exit("Do not use root to install Defender")
return True
def do_install(self) -> bool:
full_service_file_path = os.path.join(self.config.unix_systemd_folder, self.config.service_file_name)
if not os.path.exists(full_service_file_path):
print(f'/!\\ Service file does not exist /!\\')
return True
# Check if virtual env exist
if not os.path.exists(f'{os.path.join(self.config.defender_install_folder, self.config.venv_folder)}'):
self.run_subprocess(self.config.venv_cmd_installation)
print(f'/!\\ Virtual env does not exist run the install /!\\')
return True
def run_subprocess(self, command:list) -> None:
print(f'> {command}')
try:
check_call(command)
print("The command completed successfully.")
except CalledProcessError as e:
print(f"The command failed with the return code: {e.returncode}")
print(f"Try to install dependencies ...")
exit(5)
def check_python_version(self) -> bool:
"""Test si la version de python est autorisée ou non
Returns:
bool: True si la version de python est autorisé sinon False
"""
# Current system version
sys_major, sys_minor, sys_patch = python_version_tuple()
sys_major, sys_minor, sys_patch = self.config.python_current_version_tuple
# min python version required
python_required_version = self.PYTHON_MIN_VERSION.split('.')
python_required_version = self.config.python_min_version.split('.')
min_major, min_minor = tuple((python_required_version[0], python_required_version[1]))
if int(sys_major) < int(min_major):
print(f"## Your python version must be greather than or equal to {self.PYTHON_MIN_VERSION} ##")
print(f"## Your python version must be greather than or equal to {self.config.python_min_version} ##")
return False
elif (int(sys_major) <= int(min_major)) and (int(sys_minor) < int(min_minor)):
print(f"## Your python version must be greather than or equal to {self.PYTHON_MIN_VERSION} ##")
print(f"## Your python version must be greather than or equal to {self.config.python_min_version} ##")
return False
print(f"===> Version of python : {python_version()} ==> OK")
print(f"> Version of python : {self.config.python_current_version} ==> OK")
return True
def run_subprocess(self, command:list) -> None:
def check_package(self, package_name) -> bool:
print(command)
try:
check_call(command)
print("La commande s'est terminée avec succès.")
except CalledProcessError as e:
print(f"La commande a échoué avec le code de retour :{e.returncode}")
print(f"Try to install dependencies ...")
exit(5)
# Run a command in the virtual environment's Python to check if the package is installed
run([self.config.venv_python_executable, '-c', f'import {package_name}'], check=True, stdout=PIPE, stderr=PIPE)
return True
except CalledProcessError as cpe:
print(cpe)
return False
def checkDependencies(self) -> None:
def install_dependencies(self) -> None:
"""### Verifie les dépendances si elles sont installées
- Test si les modules sont installés
- Met a jour pip
@@ -67,38 +161,82 @@ class Install:
do_install = False
# Check if virtual env exist
if not os.path.exists(f'{self.venv_folder_name}'):
self.run_subprocess(self.cmd_venv_command)
if not os.path.exists(f'{os.path.join(self.config.defender_install_folder, self.config.venv_folder)}'):
self.run_subprocess(self.config.venv_cmd_installation)
do_install = True
for module in self.module_to_install:
if find_spec(module) is None:
for module in self.config.venv_cmd_requirements:
if not self.check_package(module):
do_install = True
if not do_install:
return None
print("===> Vider le cache de pip")
check_call(['pip','cache','purge'])
self.run_subprocess([self.config.venv_pip_executable, 'cache', 'purge'])
print("===> Verifier si pip est a jour")
check_call(['python', '-m', 'pip', 'install', '--upgrade', 'pip'])
self.run_subprocess([self.config.venv_python_executable, '-m', 'pip', 'install', '--upgrade', 'pip'])
if find_spec('greenlet') is None:
check_call(['pip','install', '--only-binary', ':all:', 'greenlet'])
if not self.check_package('greenlet'):
self.run_subprocess([self.config.venv_pip_executable, 'install', '--only-binary', ':all:', 'greenlet'])
print('====> Module Greenlet installé')
for module in self.module_to_install:
if find_spec(module) is None:
for module in self.config.venv_cmd_requirements:
if not self.check_package(module):
print("### Trying to install missing python packages ###")
check_call(['pip','install', module])
self.run_subprocess([self.config.venv_pip_executable, 'install', module])
print(f"====> Module {module} installé")
else:
print(f"==> {module} already installed")
print(f"#"*12)
def create_service_file(self) -> None:
full_service_file_path = os.path.join(self.config.unix_systemd_folder, self.config.service_file_name)
if os.path.exists(full_service_file_path):
print(f'/!\\ Service file already exist /!\\')
self.run_subprocess(self.config.service_cmd_executable)
return None
contain = f'''[Unit]
Description=Defender IRC Service
[Service]
ExecStart={self.config.venv_python_executable} {self.config.defender_main_executable}
WorkingDirectory={self.config.defender_install_folder}
SyslogIdentifier=Defender
Restart=on-failure
[Install]
WantedBy=default.target
'''
# Check if user systemd is available (.config/systemd/user/)
if not os.path.exists(self.config.unix_systemd_folder):
self.run_subprocess(['mkdir', '-p', self.config.unix_systemd_folder])
with open(full_service_file_path, 'w+') as servicefile:
servicefile.write(contain)
servicefile.close()
print(f'Service file generated with current configuration')
print(f'Running Defender IRC Service ...')
self.run_subprocess(self.config.service_cmd_daemon_reload)
self.run_subprocess(self.config.service_cmd_executable)
else:
with open(full_service_file_path, 'w+') as servicefile:
servicefile.write(contain)
servicefile.close()
print(f'Service file generated with current configuration')
print(f'Running Defender IRC Service ...')
self.run_subprocess(self.config.service_cmd_daemon_reload)
self.run_subprocess(self.config.service_cmd_executable)
def print_final_message(self) -> None:
print(f"#"*24)
print("Installation complete ...")
print("You must change environment using the command below")
print(f"source {self.venv_folder_name}{os.sep}bin{os.sep}activate")
print(f"#"*12)
exit(1)
print("If the configuration is correct, then you must see your service connected to your irc server")
print(f"If any issue, you can see the log file for debug {self.config.defender_install_folder}{os.sep}logs{os.sep}defender.log")
print(f"#"*24)
exit(1)

View File

@@ -1,9 +1,9 @@
import ssl, re, importlib, sys, time, threading, socket
import ssl, re, importlib, sys, time, threading, socket, traceback
from ssl import SSLSocket
from datetime import datetime, timedelta
from typing import Union
from typing import Union, Literal
from core.loadConf import Config
from core.Model import User, Admin, Channel
from core.Model import User, Admin, Channel, Clones
from core.base import Base
class Irc:
@@ -12,7 +12,6 @@ class Irc:
self.defender_connexion_datetime = datetime.now() # Date et heure de la premiere connexion de Defender
self.first_score: int = 100
self.db_chan = [] # Definir la variable qui contiendra la liste des salons
self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés
self.beat = 30 # Lancer toutes les 30 secondes des actions de nettoyages
self.hb_active = True # Heartbeat active
@@ -22,6 +21,8 @@ class Irc:
self.INIT = 1 # Variable d'intialisation | 1 -> indique si le programme est en cours d'initialisation
self.RESTART = 0 # Variable pour le redemarrage du bot | 0 -> indique que le programme n'es pas en cours de redemarrage
self.CHARSET = ['utf-8', 'iso-8859-1'] # Charset utiliser pour décoder/encoder les messages
"""0: utf-8 | 1: iso-8859-1"""
self.SSL_VERSION = None # Version SSL
self.Config = Config().ConfigObject
@@ -44,6 +45,9 @@ class Irc:
self.User = User(self.Base)
self.Admin = Admin(self.Base)
self.Channel = Channel(self.Base)
self.Clones = Clones(self.Base)
self.__create_table()
self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, ))
##############################################
@@ -90,7 +94,7 @@ class Irc:
except OSError as oe:
self.Base.logs.critical(f"OSError __create_socket: {oe} - {soc.fileno()}")
except AttributeError as ae:
self.Base.logs.critical(f"OSError __create_socket: {oe} - {soc.fileno()}")
self.Base.logs.critical(f"AttributeError __create_socket: {ae} - {soc.fileno()}")
def __ssl_context(self) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
@@ -106,6 +110,7 @@ class Irc:
self.ircObject = ircInstance # créer une copie de l'instance Irc
self.__link(self.IrcSocket) # établir la connexion au serveur IRC
self.signal = True # Une variable pour initier la boucle infinie
self.__join_saved_channels() # Join existing channels
self.load_existing_modules() # Charger les modules existant dans la base de données
while self.signal:
@@ -126,6 +131,7 @@ class Irc:
self.__create_socket()
self.__link(self.IrcSocket)
self.__join_saved_channels()
self.load_existing_modules()
self.RESTART = 0
@@ -158,17 +164,19 @@ class Irc:
self.IrcSocket.shutdown(socket.SHUT_RDWR)
self.IrcSocket.close()
self.Base.logs.info("--> Fermeture de Defender ...")
sys.exit(0)
except AssertionError as ae:
self.Base.logs.error(f'Assertion error : {ae}')
self.Base.logs.error(f'AssertionError: {ae}')
except ValueError as ve:
self.Base.logs.error(f'Value Error : {ve}')
self.Base.logs.error(f'ValueError: {ve}')
except ssl.SSLEOFError as soe:
self.Base.logs.error(f"OS Error __connect_to_irc: {soe}")
self.Base.logs.error(f"SSLEOFError: {soe}")
except AttributeError as atte:
self.Base.logs.critical(f"{atte}")
# except Exception as e:
# self.debug(f"Exception: {e}")
self.Base.logs.critical(f"AttributeError: {atte}")
except Exception as e:
self.Base.logs.critical(f"Exception: {e}")
self.Base.logs.critical(traceback.print_exc())
def __link(self, writer:Union[socket.socket, SSLSocket]) -> None:
"""Créer le link et envoyer les informations nécessaires pour la
@@ -196,18 +204,23 @@ class Irc:
version = self.Config.current_version
unixtime = self.Base.get_unixtime()
charset = self.CHARSET[0]
# Envoyer un message d'identification
writer.send(f":{sid} PASS :{password}\r\n".encode('utf-8'))
writer.send(f":{sid} PROTOCTL NICKv2 VHP UMODE2 NICKIP SJOIN SJOIN2 SJ3 NOQUIT TKLEXT MLOCK SID MTAGS\r\n".encode('utf-8'))
writer.send(f":{sid} PROTOCTL EAUTH={link},,,{service_name}-v{version}\r\n".encode('utf-8'))
writer.send(f":{sid} PROTOCTL SID={sid}\r\n".encode('utf-8'))
writer.send(f":{sid} SERVER {link} 1 :{info}\r\n".encode('utf-8'))
writer.send(f":{sid} {nickname} :Reserved for services\r\n".encode('utf-8'))
writer.send(f":{sid} UID {nickname} 1 {unixtime} {username} {host} {service_id} * {smodes} * * * :{realname}\r\n".encode('utf-8'))
writer.send(f":{sid} SJOIN {unixtime} {chan} + :{service_id}\r\n".encode('utf-8'))
writer.send(f":{sid} MODE {chan} +{cmodes}\r\n".encode('utf-8'))
writer.send(f":{service_id} SAMODE {chan} +{umodes} {nickname}\r\n".encode('utf-8'))
writer.send(f":{sid} PASS :{password}\r\n".encode(charset))
writer.send(f":{sid} PROTOCTL SID NOQUIT NICKv2 SJOIN SJ3 NICKIP TKLEXT2 NEXTBANS CLK EXTSWHOIS MLOCK MTAGS\r\n".encode(charset))
# writer.send(f":{sid} PROTOCTL NICKv2 VHP UMODE2 NICKIP SJOIN SJOIN2 SJ3 NOQUIT TKLEXT MLOCK SID MTAGS\r\n".encode(charset))
writer.send(f":{sid} PROTOCTL EAUTH={link},,,{service_name}-v{version}\r\n".encode(charset))
writer.send(f":{sid} PROTOCTL SID={sid}\r\n".encode(charset))
writer.send(f":{sid} SERVER {link} 1 :{info}\r\n".encode(charset))
writer.send(f":{sid} {nickname} :Reserved for services\r\n".encode(charset))
#writer.send(f":{sid} UID {nickname} 1 {unixtime} {username} {host} {service_id} * {smodes} * * * :{realname}\r\n".encode(charset))
writer.send(f":{sid} UID {nickname} 1 {unixtime} {username} {host} {service_id} * {smodes} * * fwAAAQ== :{realname}\r\n".encode(charset))
writer.send(f":{sid} SJOIN {unixtime} {chan} + :{service_id}\r\n".encode(charset))
writer.send(f":{sid} TKL + Q * {nickname} {host} 0 {unixtime} :Reserved for services\r\n".encode(charset))
writer.send(f":{service_id} MODE {chan} +{cmodes}\r\n".encode(charset))
writer.send(f":{service_id} MODE {chan} +{umodes} {service_id}\r\n".encode(charset))
self.Base.logs.debug('Link information sent to the server')
@@ -215,6 +228,19 @@ class Irc:
except AttributeError as ae:
self.Base.logs.critical(f'{ae}')
def __join_saved_channels(self) -> None:
"""## Joining saved channels"""
core_table = self.Config.table_channel
query = f'''SELECT distinct channel_name FROM {core_table}'''
exec_query = self.Base.db_execute_query(query)
result_query = exec_query.fetchall()
if result_query:
for chan_name in result_query:
chan = chan_name[0]
self.send2socket(f":{self.Config.SERVEUR_ID} SJOIN {self.Base.get_unixtime()} {chan} + :{self.Config.SERVICE_ID}")
def send2socket(self, send_message:str) -> None:
"""Envoit les commandes à envoyer au serveur.
@@ -249,14 +275,20 @@ class Irc:
response = data.decode(self.CHARSET[0]).split()
self.cmd(response)
except UnicodeEncodeError:
except UnicodeEncodeError as ue:
for data in responses:
response = data.decode(self.CHARSET[1],'replace').split()
self.cmd(response)
except UnicodeDecodeError:
self.Base.logs.error(f'UnicodeEncodeError: {ue}')
self.Base.logs.error(response)
except UnicodeDecodeError as ud:
for data in responses:
response = data.decode(self.CHARSET[1],'replace').split()
self.cmd(response)
self.Base.logs.error(f'UnicodeDecodeError: {ud}')
self.Base.logs.error(response)
except AssertionError as ae:
self.Base.logs.error(f"Assertion error : {ae}")
@@ -268,13 +300,18 @@ class Irc:
# FIN CONNEXION IRC #
##############################################
def __create_table(self):
"""## Create core tables
"""
pass
def load_existing_modules(self) -> None:
"""Charge les modules qui existe déja dans la base de données
Returns:
None: Aucun retour requis, elle charge puis c'est tout
"""
result = self.Base.db_execute_query(f"SELECT module FROM {self.Config.table_module}")
result = self.Base.db_execute_query(f"SELECT module_name FROM {self.Config.table_module}")
for r in result.fetchall():
self.load_module('sys', r[0], True)
@@ -416,6 +453,7 @@ class Irc:
except ModuleNotFoundError as moduleNotFound:
self.Base.logs.error(f"MODULE_NOT_FOUND: {moduleNotFound}")
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :[ {self.Config.CONFIG_COLOR['rouge']}MODULE_NOT_FOUND{self.Config.CONFIG_COLOR['noire']} ]: {moduleNotFound}")
self.Base.db_delete_module(module_name)
except Exception as e:
self.Base.logs.error(f"Something went wrong with a module you want to load : {e}")
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :[ {self.Config.CONFIG_COLOR['rouge']}ERROR{self.Config.CONFIG_COLOR['noire']} ]: {e}")
@@ -546,7 +584,6 @@ class Irc:
return None
def thread_check_for_new_version(self, fromuser: str) -> None:
dnickname = self.Config.SERVICE_NICKNAME
if self.Base.check_for_new_version(True):
@@ -554,38 +591,42 @@ class Irc:
self.send2socket(f':{dnickname} NOTICE {fromuser} : Please run (git pull origin main) in the current folder')
else:
self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender')
return None
def cmd(self, data:list) -> None:
def cmd(self, data: list[str]) -> None:
"""Parse server response
Args:
data (list[str]): Server response splitted in a list
"""
try:
original_response: list[str] = data.copy()
cmd_to_send:list[str] = data.copy()
cmd = data.copy()
interm_response: list[str] = data.copy()
"""This the original without first value"""
cmd_to_debug = data.copy()
cmd_to_debug.pop(0)
interm_response.pop(0)
if len(cmd) == 0 or len(cmd) == 1:
self.Base.logs.warning(f'Size ({str(len(cmd))}) - {cmd}')
if len(original_response) == 0 or len(original_response) == 1:
self.Base.logs.warning(f'Size ({str(len(original_response))}) - {original_response}')
return False
# self.debug(cmd_to_debug)
if len(data) == 7:
if data[2] == 'PRIVMSG' and data[4] == ':auth':
data_copy = data.copy()
if len(original_response) == 7:
if original_response[2] == 'PRIVMSG' and original_response[4] == ':auth':
data_copy = original_response.copy()
data_copy[6] = '**********'
self.Base.logs.debug(data_copy)
else:
self.Base.logs.debug(data)
self.Base.logs.debug(original_response)
else:
self.Base.logs.debug(data)
self.Base.logs.debug(original_response)
match cmd[0]:
match original_response[0]:
case 'PING':
# Sending PONG response to the serveur
pong = str(cmd[1]).replace(':','')
pong = str(original_response[1]).replace(':','')
self.send2socket(f"PONG :{pong}")
return None
@@ -594,19 +635,19 @@ class Irc:
# 'PREFIX=(qaohv)~&@%+', 'SID=001', 'MLOCK', 'TS=1703793941', 'EXTSWHOIS']
# GET SERVER ID HOST
if len(cmd) > 5:
if '=' in cmd[5]:
serveur_hosting_id = str(cmd[5]).split('=')
if len(original_response) > 5:
if '=' in original_response[5]:
serveur_hosting_id = str(original_response[5]).split('=')
self.HSID = serveur_hosting_id[1]
return False
case _:
pass
if len(cmd) < 2:
if len(original_response) < 2:
return False
match cmd[1]:
match original_response[1]:
case 'SLOG':
# self.Base.scan_ports(cmd[7])
@@ -617,20 +658,18 @@ class Irc:
case 'REPUTATION':
# :001 REPUTATION 91.168.141.239 118
try:
# if self.Config.ABUSEIPDB == 1:
# self.Base.create_thread(self.abuseipdb_scan, (cmd[2], ))
self.first_connexion_ip = cmd[2]
self.first_connexion_ip = original_response[2]
self.first_score = 0
if str(cmd[3]).find('*') != -1:
if str(original_response[3]).find('*') != -1:
# If * available, it means that an ircop changed the repurtation score
# means also that the user exist will try to update all users with same IP
self.first_score = int(str(cmd[3]).replace('*',''))
self.first_score = int(str(original_response[3]).replace('*',''))
for user in self.User.UID_DB:
if user.remote_ip == self.first_connexion_ip:
user.score_connexion = self.first_score
else:
self.first_score = int(cmd[3])
self.first_score = int(original_response[3])
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
@@ -653,7 +692,7 @@ class Irc:
case 'EOS':
hsid = str(cmd[0]).replace(':','')
hsid = str(original_response[0]).replace(':','')
if hsid == self.HSID:
if self.INIT == 1:
current_version = self.Config.current_version
@@ -663,8 +702,6 @@ class Irc:
else:
version = f'{current_version}'
self.send2socket(f"MODE {self.Config.SERVICE_NICKNAME} +B")
self.send2socket(f"JOIN {self.Config.SERVICE_CHANLOG}")
print(f"################### DEFENDER ###################")
print(f"# SERVICE CONNECTE ")
print(f"# SERVEUR : {self.Config.SERVEUR_IP} ")
@@ -696,15 +733,15 @@ class Irc:
case _:
pass
if len(cmd) < 3:
if len(original_response) < 3:
return False
match cmd[2]:
match original_response[2]:
case 'QUIT':
# :001N1WD7L QUIT :Quit: free_znc_1
cmd.pop(0)
uid_who_quit = str(cmd[0]).replace(':', '')
uid_who_quit = str(interm_response[0]).replace(':', '')
self.User.delete(uid_who_quit)
self.Channel.delete_user_from_all_channel(uid_who_quit)
@@ -716,10 +753,8 @@ class Irc:
# ['@unrealircd.org/geoip=FR;unrealircd.org/', ':001OOU2H3', 'NICK', 'WebIrc', '1703795844']
# Changement de nickname
# Supprimer la premiere valeur de la liste
cmd.pop(0)
uid = str(cmd[0]).replace(':','')
newnickname = cmd[2]
uid = str(interm_response[0]).replace(':','')
newnickname = interm_response[2]
self.User.update(uid, newnickname)
case 'MODE':
@@ -734,27 +769,27 @@ class Irc:
# ':001T6VU3F', '001JGWB2K', '@11ZAAAAAB',
# '001F16WGR', '001X9YMGQ', '*+001DYPFGP', '@00BAAAAAJ', '001AAGOG9', '001FMFVG8', '001DAEEG7',
# '&~G:unknown-users', '"~G:websocket-users', '"~G:known-users', '"~G:webirc-users']
cmd.pop(0)
channel = str(cmd[3]).lower()
len_cmd = len(cmd)
channel = str(interm_response[3]).lower()
len_cmd = len(interm_response)
list_users:list = []
occurence = 0
start_boucle = 0
# Trouver le premier user
for i in range(len_cmd):
s: list = re.findall(fr':', cmd[i])
s: list = re.findall(fr':', interm_response[i])
if s:
occurence += 1
if occurence == 2:
start_boucle = i
# Boucle qui va ajouter l'ensemble des users (UID)
for i in range(start_boucle, len(cmd)):
parsed_UID = str(cmd[i])
for i in range(start_boucle, len(interm_response)):
parsed_UID = str(interm_response[i])
# pattern = fr'[:|@|%|\+|~|\*]*'
pattern = fr':'
parsed_UID = re.sub(pattern, '', parsed_UID)
# pattern = fr':'
# parsed_UID = re.sub(pattern, '', parsed_UID)
clean_uid = self.Base.clean_uid(parsed_UID)
if len(clean_uid) == 9:
list_users.append(parsed_UID)
@@ -769,60 +804,88 @@ class Irc:
case 'PART':
# ['@unrealircd.org/geoip=FR;unrealircd.org/userhost=50d6492c@80.214.73.44;unrealircd.org/userip=50d6492c@80.214.73.44;msgid=YSIPB9q4PcRu0EVfC9ci7y-/mZT0+Gj5FLiDSZshH5NCw;time=2024-08-15T15:35:53.772Z',
# ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris']
uid = str(cmd[1]).replace(':','')
channel = str(cmd[3]).lower()
self.Channel.delete_user_from_channel(channel, uid)
try:
uid = str(interm_response[0]).replace(':','')
channel = str(interm_response[2]).lower()
self.Channel.delete_user_from_channel(channel, uid)
pass
except IndexError as ie:
self.Base.logs.error(f'Index Error: {ie}')
case 'UID':
# ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601',
# ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net',
# '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...']
if 'webirc' in cmd[0]:
isWebirc = True
else:
isWebirc = False
try:
# ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601',
# ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net',
# '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...']
uid = str(cmd[8])
nickname = str(cmd[3])
username = str(cmd[6])
hostname = str(cmd[7])
umodes = str(cmd[10])
vhost = str(cmd[11])
if not 'S' in umodes:
remote_ip = self.Base.decode_ip(str(cmd[13]))
else:
remote_ip = '127.0.0.1'
isWebirc = True if 'webirc' in original_response[0] else False
isWebsocket = True if 'websocket' in original_response[0] else False
score_connexion = self.first_score
uid = str(original_response[8])
nickname = str(original_response[3])
username = str(original_response[6])
hostname = str(original_response[7])
umodes = str(original_response[10])
vhost = str(original_response[11])
self.User.insert(
self.User.UserModel(
uid=uid,
nickname=nickname,
username=username,
hostname=hostname,
umodes=umodes,
vhost=vhost,
isWebirc=isWebirc,
remote_ip=remote_ip,
score_connexion=score_connexion,
connexion_datetime=datetime.now()
if not 'S' in umodes:
remote_ip = self.Base.decode_ip(str(original_response[13]))
else:
remote_ip = '127.0.0.1'
# extract realname
realname_list = []
for i in range(14, len(original_response)):
realname_list.append(original_response[i])
realname = ' '.join(realname_list)[1:]
# Extract Geoip information
pattern = r'^.*geoip=cc=(\S{2}).*$'
geoip_match = re.match(pattern, original_response[0])
if geoip_match:
geoip = geoip_match.group(1)
else:
geoip = None
score_connexion = self.first_score
self.User.insert(
self.User.UserModel(
uid=uid,
nickname=nickname,
username=username,
realname=realname,
hostname=hostname,
umodes=umodes,
vhost=vhost,
isWebirc=isWebirc,
isWebsocket=isWebsocket,
remote_ip=remote_ip,
geoip=geoip,
score_connexion=score_connexion,
connexion_datetime=datetime.now()
)
)
)
for classe_name, classe_object in self.loaded_classes.items():
classe_object.cmd(cmd_to_send)
for classe_name, classe_object in self.loaded_classes.items():
classe_object.cmd(original_response)
except Exception as err:
self.Base.logs.error(f'General Error: {err}')
case 'PRIVMSG':
try:
# Supprimer la premiere valeur
cmd.pop(0)
cmd = interm_response.copy()
get_uid_or_nickname = str(cmd[0].replace(':',''))
user_trigger = self.User.get_nickname(get_uid_or_nickname)
dnickname = self.Config.SERVICE_NICKNAME
if len(cmd) == 6:
if cmd[1] == 'PRIVMSG' and str(cmd[3]).replace('.','') == ':auth':
if cmd[1] == 'PRIVMSG' and str(cmd[3]).replace(self.Config.SERVICE_PREFIX,'') == ':auth':
cmd_copy = cmd.copy()
cmd_copy[5] = '**********'
self.Base.logs.info(cmd_copy)
@@ -830,10 +893,6 @@ class Irc:
self.Base.logs.info(cmd)
else:
self.Base.logs.info(f'{cmd}')
# user_trigger = get_user.split('!')[0]
# user_trigger = self.get_nickname(get_uid_or_nickname)
user_trigger = self.User.get_nickname(get_uid_or_nickname)
dnickname = self.Config.SERVICE_NICKNAME
pattern = fr'(:\{self.Config.SERVICE_PREFIX})(.*)$'
hcmds = re.search(pattern, ' '.join(cmd)) # va matcher avec tout les caractéres aprés le .
@@ -850,7 +909,8 @@ class Irc:
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(user_trigger, cmd_to_send)
self._hcmds(user_trigger, arg, cmd)
fromchannel = str(cmd[2]).lower() if self.Base.Is_Channel(cmd[2]) else None
self._hcmds(user_trigger, fromchannel, arg, cmd)
if cmd[2] == self.Config.SERVICE_ID:
pattern = fr'^:.*?:(.*)$'
@@ -882,13 +942,17 @@ class Irc:
return False
if not arg[0].lower() in self.commands:
self.debug(f"This command {arg[0]} is not available")
self.Base.logs.debug(f"This command {arg[0]} sent by {user_trigger} is not available")
return False
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(self.User.get_nickname(user_trigger), cmd_to_send)
self.Base.log_cmd(user_trigger, cmd_to_send)
self._hcmds(user_trigger, arg, cmd)
fromchannel = None
if len(arg) >= 2:
fromchannel = str(arg[1]).lower() if self.Base.Is_Channel(arg[1]) else None
self._hcmds(user_trigger, fromchannel, arg, cmd)
except IndexError as io:
self.Base.logs.error(f'{io}')
@@ -896,15 +960,26 @@ class Irc:
case _:
pass
if cmd[2] != 'UID':
if original_response[2] != 'UID':
# Envoyer la commande aux classes dynamiquement chargées
for classe_name, classe_object in self.loaded_classes.items():
classe_object.cmd(cmd_to_send)
classe_object.cmd(original_response)
except IndexError as ie:
self.Base.logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
self.Base.logs.error(f"{ie} / {original_response} / length {str(len(original_response))}")
def _hcmds(self, user: str, cmd:list, fullcmd: list = []) -> None:
def _hcmds(self, user: str, channel: Union[str, None], cmd: list, fullcmd: list = []) -> None:
"""_summary_
Args:
user (str): The user who sent the query
channel (Union[str, None]): If the command contain the channel
cmd (list): The defender cmd
fullcmd (list, optional): The full list of the cmd coming from PRIVMS. Defaults to [].
Returns:
None: Nothing to return
"""
fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande
uid = self.User.get_uid(fromuser) # Récuperer le uid de l'utilisateur
@@ -925,7 +1000,7 @@ class Irc:
# Envoyer la commande aux classes dynamiquement chargées
if command != 'notallowed':
for classe_name, classe_object in self.loaded_classes.items():
classe_object._hcmds(user, cmd, fullcmd)
classe_object._hcmds(user, channel, cmd, fullcmd)
match command:
@@ -1229,9 +1304,6 @@ class Irc:
reason.append(cmd[i])
final_reason = ' '.join(reason)
# self.db_uid.clear() #Vider UID_DB
# self.db_chan = [] #Vider les salons
self.User.UID_DB.clear() # Clear User Object
self.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
@@ -1250,13 +1322,9 @@ class Irc:
self.Base.logs.debug(self.loaded_classes)
all_modules = self.Base.get_all_modules()
results = self.Base.db_execute_query(f'SELECT module FROM {self.Config.table_module}')
results = self.Base.db_execute_query(f'SELECT module_name FROM {self.Config.table_module}')
results = results.fetchall()
# if len(results) == 0:
# self.send2socket(f":{dnickname} NOTICE {fromuser} :There is no module loaded")
# return False
found = False
for module in all_modules:
@@ -1271,23 +1339,18 @@ class Irc:
found = False
# for r in results:
# self.send2socket(f":{dnickname} NOTICE {fromuser} :{r[0]} - {self.Config.CONFIG_COLOR['verte']}Loaded{self.Config.CONFIG_COLOR['nogc']}")
case 'show_timers':
if self.Base.running_timers:
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :{self.Base.running_timers}")
for the_timer in self.Base.running_timers:
self.send2socket(f":{dnickname} NOTICE {fromuser} :>> {the_timer.getName()} - {the_timer.is_alive()}")
else:
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :Aucun timers en cours d'execution")
self.send2socket(f":{dnickname} NOTICE {fromuser} :Aucun timers en cours d'execution")
case 'show_threads':
running_thread_name:list = []
for thread in self.Base.running_threads:
running_thread_name.append(f"{thread.getName()} ({thread.is_alive()})")
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :{str(running_thread_name)}")
self.send2socket(f":{dnickname} NOTICE {fromuser} :>> {thread.getName()} ({thread.is_alive()})")
case 'show_channels':
@@ -1302,7 +1365,7 @@ class Irc:
case 'show_users':
for db_user in self.User.UID_DB:
self.send2socket(f":{dnickname} NOTICE {fromuser} :UID : {db_user.uid} - isWebirc: {db_user.isWebirc} - Nickname: {db_user.nickname} - Connection: {db_user.connexion_datetime}")
self.send2socket(f":{dnickname} NOTICE {fromuser} :UID : {db_user.uid} - isWebirc: {db_user.isWebirc} - isWebSocket: {db_user.isWebsocket} - Nickname: {db_user.nickname} - Connection: {db_user.connexion_datetime}")
case 'show_admins':
for db_admin in self.Admin.UID_ADMIN_DB:

View File

@@ -1,6 +1,6 @@
import json
import json, sys
from os import sep
from typing import Union
from typing import Union, Literal
from dataclasses import dataclass, field
##########################################
@@ -11,56 +11,131 @@ from dataclasses import dataclass, field
class ConfigDataModel:
SERVEUR_IP: str
SERVEUR_HOSTNAME: str # Le hostname du serveur IRC
SERVEUR_LINK: str # Host attendu par votre IRCd (ex. dans votre link block pour Unrealircd)
SERVEUR_PORT: int # Port du link
SERVEUR_PASSWORD: str # Mot de passe du link (Privilégiez argon2 sur Unrealircd)
SERVEUR_ID: str # SID (identification) du bot en tant que Services
SERVEUR_SSL: bool # Activer la connexion SSL
"""Server public IP (could be 127.0.0.1 localhost)"""
SERVICE_NAME: str # Le nom du service
SERVICE_NICKNAME: str # Nick du bot sur IRC
SERVICE_REALNAME: str # Realname du bot
SERVICE_USERNAME: str # Ident du bot
SERVICE_HOST: str # Host du bot
SERVICE_INFO: str # swhois du bot
SERVICE_CHANLOG: str # Salon des logs et autres messages issus du bot
SERVICE_SMODES: str # Mode du service
SERVICE_CMODES: str # Mode du salon (#ChanLog) que le bot appliquera à son entrée
SERVICE_UMODES: str # Mode que le bot pourra se donner à sa connexion au salon chanlog
SERVICE_PREFIX: str # Prefix pour envoyer les commandes au bot
SERVICE_ID: str = field(init=False) # L'identifiant du service
SERVEUR_HOSTNAME: str
"""IRC Server Hostname (your.hostname.extension)"""
OWNER: str # Identifiant du compte admin
PASSWORD: str # Mot de passe du compte admin
SERVEUR_LINK: str
"""The link hostname (should be the same as your unrealircd link block)"""
SALON_JAIL: str # Salon pot de miel
SALON_JAIL_MODES: str # Mode du salon pot de miel
SALON_LIBERER: str # Le salon ou sera envoyé l'utilisateur clean
SERVEUR_PORT: int
"""Server port as configured in your unrealircd link block"""
API_TIMEOUT: int # Timeout des api's
SERVEUR_PASSWORD: str
"""Your link password"""
PORTS_TO_SCAN: list # Liste des ports a scanné pour une detection de proxy
WHITELISTED_IP: list # IP a ne pas scanner
GLINE_DURATION: str # La durée du gline
SERVEUR_ID: str
"""Service identification could be Z01 should be unique"""
DEBUG_LEVEL: int # Le niveau des logs DEBUG 10 | INFO 20 | WARNING 30 | ERROR 40 | CRITICAL 50
SERVEUR_SSL: bool
"""Activate SSL connexion"""
SERVICE_NAME: str
"""Service name (Ex. Defender)"""
SERVICE_NICKNAME: str
"""Nickname of the service (Ex. Defender)"""
SERVICE_REALNAME: str
"""Realname of the service"""
SERVICE_USERNAME: str
"""Username of the service"""
SERVICE_HOST: str
"""The service hostname"""
SERVICE_INFO: str
"""Swhois of the service"""
SERVICE_CHANLOG: str
"""The channel used by the service (ex. #services)"""
SERVICE_SMODES: str
"""The service mode (ex. +ioqBS)"""
SERVICE_CMODES: str
"""The mode of the log channel (ex. ntsO)"""
SERVICE_UMODES: str
"""The mode of the service when joining chanlog (ex. o, the service will be operator in the chanlog)"""
SERVICE_PREFIX: str
"""The default prefix to communicate with the service"""
SERVICE_ID: str = field(init=False)
"""The service unique ID"""
OWNER: str
"""The nickname of the admin of the service"""
PASSWORD: str
"""The password of the admin of the service"""
SALON_JAIL: str
"""The JAIL channel (ex. #jail)"""
SALON_JAIL_MODES: str
"""The jail channel modes (ex. sS)"""
SALON_LIBERER: str
"""Channel where the nickname will be released"""
SALON_CLONES: str
"""Channel to host clones"""
API_TIMEOUT: int
"""Default api timeout in second"""
PORTS_TO_SCAN: list
"""List of ports to scan available for proxy_scan in the mod_defender module"""
WHITELISTED_IP: list
"""List of remote IP to don't scan"""
GLINE_DURATION: str
"""Gline duration"""
DEBUG_LEVEL:Literal[10, 20, 30, 40, 50]
"""Logs level: DEBUG 10 | INFO 20 | WARNING 30 | ERROR 40 | CRITICAL 50"""
CONFIG_COLOR: dict[str, str]
table_admin: str
"""Admin table"""
table_commande: str
"""Core command table"""
table_log: str
"""Core log table"""
table_module: str
"""Core module table"""
table_config: str
"""Core configuration table"""
table_channel: str
"""Core channel table"""
current_version: str
"""Current version of Defender"""
latest_version: str
"""The Latest version fetched from github"""
db_name: str
"""The database name"""
db_path: str
"""The database path"""
def __post_init__(self):
# Initialiser SERVICE_ID après la création de l'objet
self.SERVICE_ID:str = f"{self.SERVEUR_ID}AAAAAB"
"""The service ID which is SERVEUR_ID and AAAAAB"""
class Config:
@@ -70,16 +145,21 @@ class Config:
return None
def __load_json_service_configuration(self):
try:
conf_filename = f'core{sep}configuration.json'
with open(conf_filename, 'r') as configuration_data:
configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data)
conf_filename = f'core{sep}configuration.json'
with open(conf_filename, 'r') as configuration_data:
configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data)
for key, value in configuration['CONFIG_COLOR'].items():
configuration['CONFIG_COLOR'][key] = str(value).encode('utf-8').decode('unicode_escape')
return configuration
except FileNotFoundError as fe:
print(f'FileNotFound: {fe}')
print('Configuration file not found please create core/configuration.json')
sys.exit(0)
for key, value in configuration['CONFIG_COLOR'].items():
configuration['CONFIG_COLOR'][key] = str(value).encode('utf-8').decode('unicode_escape')
return configuration
def __load_service_configuration(self) -> ConfigDataModel:
import_config = self.__load_json_service_configuration()
@@ -107,16 +187,19 @@ class Config:
SALON_JAIL=import_config["SALON_JAIL"],
SALON_JAIL_MODES=import_config["SALON_JAIL_MODES"],
SALON_LIBERER=import_config["SALON_LIBERER"],
SALON_CLONES=import_config["SALON_CLONES"],
API_TIMEOUT=import_config["API_TIMEOUT"],
PORTS_TO_SCAN=import_config["PORTS_TO_SCAN"],
WHITELISTED_IP=import_config["WHITELISTED_IP"],
GLINE_DURATION=import_config["GLINE_DURATION"],
DEBUG_LEVEL=import_config["DEBUG_LEVEL"],
CONFIG_COLOR=import_config["CONFIG_COLOR"],
table_admin='sys_admins',
table_commande='sys_commandes',
table_log='sys_logs',
table_module='sys_modules',
table_admin='core_admin',
table_commande='core_command',
table_log='core_log',
table_module='core_module',
table_config='core_config',
table_channel='core_channel',
current_version='',
latest_version='',
db_name='defender',

View File

@@ -1,275 +0,0 @@
from subprocess import check_call, run, CalledProcessError, PIPE
from platform import python_version, python_version_tuple, system
from sys import exit
import os, logging, shutil
try:
import pwd
except ModuleNotFoundError as err:
print(err)
class Install:
def __init__(self) -> None:
# Python required version
self.python_min_version = '3.10'
self.log_file = 'install.log'
self.ServiceName = 'Defender'
self.venv_name = '.pyenv'
self.venv_dependencies: list[str] = ['sqlalchemy','psutil','requests']
self.install_folder = os.getcwd()
self.osname = os.name
self.system_name = system()
self.cmd_linux_requirements: list[str] = ['apt', 'install', '-y', 'python3', 'python3-pip', 'python3-venv']
self.venv_pip_full_path = os.path.join(self.venv_name, f'bin{os.sep}pip')
self.venv_python_full_path = os.path.join(self.venv_name, f'bin{os.sep}python')
self.systemd_folder = '/etc/systemd/system/'
# Init log system
self.init_log_system()
# Exclude Windows OS
if self.osname == 'nt':
print('/!\\ Windows OS is not supported by this automatic installation /!\\')
self.Logs.critical('/!\\ Windows OS is not supported by this automatic install /!\\')
print(self.system_name)
exit(5)
if not self.is_root():
exit(5)
# Get the current user
self.system_username: str = input(f'What is the user ro run defender with ? [{os.getlogin()}] : ')
if str(self.system_username).strip() == '':
self.system_username = os.getlogin()
self.get_user_information(self.system_username)
self.Logs.debug(f'The user selected is: {self.system_username}')
self.Logs.debug(f'Operating system: {self.osname}')
# Install linux dependencies
self.install_linux_dependencies()
# Check python version
self.check_python_version()
# Create systemd service file
self.create_service_file()
# Check if Env Exist | install environment | Install python dependencies
self.check_venv()
# Create and start service
if self.osname != 'nt':
self.run_subprocess(['systemctl','daemon-reload'])
self.run_subprocess(['systemctl','start', self.ServiceName])
self.run_subprocess(['systemctl','status', self.ServiceName])
# Clean the Installation
self.clean_installation()
return None
def is_installed(self) -> bool:
is_installed = False
# Check logs folder
if os.path.exists('logs'):
is_installed = True
# Check db folder
if os.path.exists('db'):
is_installed = True
return is_installed
def is_root(self) -> bool:
if os.geteuid() != 0:
print('/!\\ user must run install.py as root /!\\')
self.Logs.critical('/!\\ user must run install.py as root /!\\')
return False
elif os.geteuid() == 0:
return True
def get_user_information(self, system_user: str) -> None:
try:
username: tuple = pwd.getpwnam(system_user)
self.system_uid = username.pw_uid
self.system_gid = username.pw_gid
return None
except KeyError as ke:
self.Logs.critical(f"This user [{system_user}] doesn't exist: {ke}")
print(f"This user [{system_user}] doesn't exist: {ke}")
exit(5)
def init_log_system(self) -> None:
# Init logs object
self.Logs = logging
self.Logs.basicConfig(level=logging.DEBUG,
filename=self.log_file,
encoding='UTF-8',
format='%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(funcName)s - %(message)s')
self.Logs.debug('#################### STARTING INSTALLATION ####################')
return None
def clean_installation(self) -> None:
# Chown the Python Env to non user privilege
self.run_subprocess(['chown','-R', f'{self.system_username}:{self.system_username}',
f'{os.path.join(self.install_folder, self.venv_name)}'
]
)
# Chown the installation log file
self.run_subprocess(['chown','-R', f'{self.system_username}:{self.system_username}',
f'{os.path.join(self.install_folder, self.log_file)}'
]
)
return None
def run_subprocess(self, command:list) -> None:
try:
run_command = check_call(command)
self.Logs.debug(f'{command} - {run_command}')
print(f'{command} - {run_command}')
except CalledProcessError as e:
print(f"Command failed :{e.returncode}")
self.Logs.critical(f"Command failed :{e.returncode}")
exit(5)
def check_python_version(self) -> bool:
"""Test si la version de python est autorisée ou non
Returns:
bool: True si la version de python est autorisé sinon False
"""
self.Logs.debug(f'The current python version is: {python_version()}')
# Current system version
sys_major, sys_minor, sys_patch = python_version_tuple()
# min python version required
python_required_version = self.PYTHON_MIN_VERSION.split('.')
min_major, min_minor = tuple((python_required_version[0], python_required_version[1]))
if int(sys_major) < int(min_major):
print(f"## Your python version must be greather than or equal to {self.PYTHON_MIN_VERSION} ##")
self.Logs.critical(f'Your python version must be greather than or equal to {self.python_min_version}')
return False
elif (int(sys_major) <= int(min_major)) and (int(sys_minor) < int(min_minor)):
print(f"## Your python version must be greather than or equal to {self.PYTHON_MIN_VERSION} ##")
self.Logs.critical(f'Your python version must be greather than or equal to {self.python_min_version}')
return False
print(f"===> Version of python : {python_version()} ==> OK")
self.Logs.debug(f'Version of python : {python_version()} ==> OK')
return True
def check_packages(self, package_name) -> bool:
try:
# Run a command in the virtual environment's Python to check if the package is installed
run([self.venv_python_full_path, '-c', f'import {package_name}'], check=True, stdout=PIPE, stderr=PIPE)
return True
except CalledProcessError:
return False
def check_venv(self) -> bool:
if os.path.exists(self.venv_name):
# Installer les dependances
self.install_dependencies()
return True
else:
self.run_subprocess(['python3', '-m', 'venv', self.venv_name])
self.Logs.debug(f'Python Virtual env installed {self.venv_name}')
print(f'Python Virtual env installed {self.venv_name}')
self.install_dependencies()
return False
def create_service_file(self) -> None:
if self.systemd_folder is None:
# If Windows, do not install systemd
return None
if os.path.exists(f'{self.systemd_folder}{os.sep}{self.ServiceName}.service'):
print(f'/!\\ Service already created in the system /!\\')
self.Logs.warning('/!\\ Service already created in the system /!\\')
print(f'The service file will be regenerated')
self.Logs.warning('The service file will be regenerated')
contain = f'''[Unit]
Description={self.ServiceName} IRC Service
[Service]
User={self.system_username}
ExecStart={os.path.join(self.install_folder, self.venv_python_full_path)} {os.path.join(self.install_folder, 'main.py')}
WorkingDirectory={self.install_folder}
SyslogIdentifier={self.ServiceName}
Restart=on-failure
[Install]
WantedBy=multi-user.target
'''
with open(f'{self.ServiceName}.service.generated', 'w+') as servicefile:
servicefile.write(contain)
servicefile.close()
print('Service file generated with current configuration')
self.Logs.debug('Service file generated with current configuration')
source = f'{self.install_folder}{os.sep}{self.ServiceName}.service.generated'
self.run_subprocess(['chown','-R', f'{self.system_username}:{self.system_username}', source])
destination = f'{self.systemd_folder}'
shutil.copy(source, destination)
os.rename(f'{self.systemd_folder}{os.sep}{self.ServiceName}.service.generated', f'{self.systemd_folder}{os.sep}{self.ServiceName}.service')
print(f'Service file moved to systemd folder {self.systemd_folder}')
self.Logs.debug(f'Service file moved to systemd folder {self.systemd_folder}')
def install_linux_dependencies(self) -> None:
self.run_subprocess(self.cmd_linux_requirements)
return None
def install_dependencies(self) -> None:
try:
self.run_subprocess([self.venv_pip_full_path, 'cache', 'purge'])
self.run_subprocess([self.venv_python_full_path, '-m', 'pip', 'install', '--upgrade', 'pip'])
if self.check_packages('greenlet') is None:
self.run_subprocess(
[self.venv_pip_full_path, 'install', '--only-binary', ':all:', 'greenlet']
)
for module in self.venv_dependencies:
if not self.check_packages(module):
### Trying to install missing python packages ###
self.run_subprocess([self.venv_pip_full_path, 'install', module])
else:
self.Logs.debug(f'{module} already installed')
print(f"==> {module} already installed")
except CalledProcessError as cpe:
self.Logs.critical(f'{cpe}')
Install()

340
mods/mod_clone.py Normal file
View File

@@ -0,0 +1,340 @@
from dataclasses import dataclass, fields, field
import random, faker, time
from datetime import datetime
from typing import Union
from core.irc import Irc
from core.connection import Connection
class Clone():
@dataclass
class ModConfModel:
clone_nicknames: list[str]
def __init__(self, ircInstance:Irc) -> None:
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module (Mandatory)
self.Irc = ircInstance
# Add Global Configuration to the module (Mandatory)
self.Config = ircInstance.Config
# Add Base object to the module (Mandatory)
self.Base = ircInstance.Base
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Base.logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User
# Add Channel object to the module (Mandatory)
self.Channel = ircInstance.Channel
self.Clone = ircInstance.Clones
# Créer les nouvelles commandes du module
self.commands_level = {
1: ['clone']
}
# Init the module (Mandatory)
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Enrigstrer les nouvelles commandes dans le code
self.__set_commands(self.commands_level)
# Créer les tables necessaire a votre module (ce n'es pas obligatoire)
self.__create_tables()
# Load module configuration (Mandatory)
self.__load_module_configuration()
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
Returns:
None: Aucun retour n'es attendu
"""
table_channel = '''CREATE TABLE IF NOT EXISTS clone_list (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
nickname TEXT,
username TEXT
)
'''
self.Base.db_execute_query(table_channel)
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
try:
# Variable qui va contenir les options de configuration du module Defender
self.ModConfig = self.ModConfModel(
clone_nicknames=[]
)
# Sync the configuration with core configuration (Mandatory)
# self.Base.db_sync_core_config(self.module_name, self.ModConfig)
return None
except TypeError as te:
self.Logs.critical(te)
def unload(self) -> None:
"""Cette methode sera executée a chaque désactivation ou
rechargement de module
"""
# kill all clones before unload
for clone in self.ModConfig.clone_nicknames:
self.Irc.send2socket(f':{self.Config.SERVICE_NICKNAME} PRIVMSG {clone} :KILL')
return None
def thread_change_hostname(self):
fake = faker.Faker('en_GB')
for clone in self.Clone.UID_CLONE_DB:
if not clone.vhost is None:
continue
rand_1 = fake.random_elements(['A','B','C','D','E','F','0','1','2','3','4','5','6','7','8','9'], unique=True, length=8)
rand_2 = fake.random_elements(['A','B','C','D','E','F','0','1','2','3','4','5','6','7','8','9'], unique=True, length=8)
rand_3 = fake.random_elements(['A','B','C','D','E','F','0','1','2','3','4','5','6','7','8','9'], unique=True, length=8)
rand_ip = ''.join(rand_1) + '.' + ''.join(rand_2) + '.' + ''.join(rand_3) + '.IP'
found = False
while not found:
if clone.connected:
self.Irc.send2socket(f':{self.Config.SERVICE_NICKNAME} CHGHOST {clone.nickname} {rand_ip}')
found = True
clone.vhost = rand_ip
break
def thread_create_clones(self, nickname: str, username: str, realname: str, channels: list, server_port: int, ssl: bool) -> None:
Connection(server_port=server_port, nickname=nickname, username=username, realname=realname, channels=channels, CloneObject=self.Clone, ssl=ssl)
return None
def thread_join_channels(self, channel_name: str, wait: float, clone_name:str = None):
self.Irc.send2socket(f':{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Clones start to join {channel_name} with {wait} secondes frequency')
if clone_name is None:
for clone in self.Clone.UID_CLONE_DB:
time.sleep(wait)
self.Irc.send2socket(f':{self.Config.SERVICE_NICKNAME} PRIVMSG {clone.nickname} :JOIN {channel_name}')
else:
for clone in self.Clone.UID_CLONE_DB:
if clone_name == clone.nickname:
time.sleep(wait)
self.Irc.send2socket(f':{self.Config.SERVICE_NICKNAME} PRIVMSG {clone.nickname} :JOIN {channel_name}')
def generate_names(self) -> tuple[str, str, str]:
try:
fake = faker.Faker('en_GB')
nickname = fake.first_name()
# username = fake.last_name()
# Generate Username
chaine = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
new_username = fake.random_sample(chaine, 9)
username = ''.join(new_username)
# Create realname XX F|M Department
gender = fake.random_choices(['F','M'], 1)
gender = ''.join(gender)
age = random.randint(20, 60)
fake_fr = faker.Faker(['fr_FR', 'en_GB'])
department = fake_fr.department_name()
realname = f'{age} {gender} {department}'
if self.Clone.exists(nickname=nickname):
caracteres = '0123456789'
randomize = ''.join(random.choice(caracteres) for _ in range(2))
nickname = nickname + str(randomize)
self.Clone.insert(
self.Clone.CloneModel(alive=True, nickname=nickname, username=username, realname=realname)
)
else:
self.Clone.insert(
self.Clone.CloneModel(alive=True, nickname=nickname, username=username, realname=realname)
)
return (nickname, username, realname)
except AttributeError as ae:
self.Logs.error(f'Attribute Error : {ae}')
except Exception as err:
self.Logs.error(err)
def cmd(self, data:list) -> None:
service_id = self.Config.SERVICE_ID # Defender serveur id
cmd = list(data).copy()
if len(cmd) < 2:
return None
match cmd[1]:
case 'REPUTATION':
pass
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
try:
command = str(cmd[0]).lower()
fromuser = user
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
match command:
case 'clone':
if len(cmd) == 1:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone connect 6')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone kill [all | nickname]')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone join [all | nickname] #channel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone list')
option = str(cmd[1]).lower()
match option:
case 'connect':
try:
number_of_clones = int(cmd[2])
for i in range(number_of_clones):
nickname, username, realname = self.generate_names()
self.Base.create_thread(
self.thread_create_clones,
(nickname, username, realname, [], 6697, True)
)
self.Base.create_thread(
self.thread_change_hostname
)
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :{str(number_of_clones)} clones joined the network')
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone connect [number of clone you want to connect]')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Exemple /msg {dnickname} clone connect 6')
case 'kill':
try:
# clone kill [all | nickname]
clone_name = str(cmd[2])
clone_to_kill: list[str] = []
if clone_name.lower() == 'all':
for clone in self.Clone.UID_CLONE_DB:
self.Irc.send2socket(f':{dnickname} PRIVMSG {clone.nickname} :KILL')
clone_to_kill.append(clone.nickname)
clone.alive = False
for clone_nickname in clone_to_kill:
self.Clone.delete(clone_nickname)
del clone_to_kill
else:
if self.Clone.exists(clone_name):
self.Irc.send2socket(f':{dnickname} PRIVMSG {clone_name} :KILL')
self.Clone.kill(clone_name)
self.Clone.delete(clone_name)
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone kill all')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone kill clone_nickname')
case 'join':
try:
# clone join [all | nickname] #channel
clone_name = str(cmd[2])
clone_channel_to_join = str(cmd[3])
if clone_name.lower() == 'all':
self.Base.create_thread(self.thread_join_channels, (clone_channel_to_join, 2))
else:
self.Base.create_thread(self.thread_join_channels, (clone_channel_to_join, 2, clone_name))
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone join all #channel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone join clone_nickname #channel')
case 'list':
try:
for clone_name in self.Clone.UID_CLONE_DB:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :>> Nickname: {clone_name.nickname} | Username: {clone_name.username}')
except Exception as err:
self.Logs.error(f'{err}')
case 'say':
try:
# clone say clone_nickname #channel message
clone_name = str(cmd[2])
clone_channel = str(cmd[3]) if self.Base.Is_Channel(str(cmd[3])) else None
message = []
for i in range(4, len(cmd)):
message.append(cmd[i])
final_message = ' '.join(message)
if clone_channel is None or not self.Clone.exists(clone_name):
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone say [clone_nickname] #channel message')
return None
if self.Clone.exists(clone_name):
self.Irc.send2socket(f':{dnickname} PRIVMSG {clone_name} :SAY {clone_channel} {final_message}')
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone say [clone_nickname] #channel message')
case _:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone connect 6')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone kill [all | nickname]')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone join [all | nickname] #channel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone say [clone_nickname] #channel [message]')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} clone list')
except IndexError as ie:
self.Logs.error(f'Index Error: {ie}')
except Exception as err:
self.Logs.error(f'Index Error: {err}')

644
mods/mod_command.py Normal file
View File

@@ -0,0 +1,644 @@
from dataclasses import dataclass, fields
from core.irc import Irc
class Command():
@dataclass
class ModConfModel:
"""The Model containing the module parameters
"""
pass
def __init__(self, ircInstance:Irc) -> None:
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module (Mandatory)
self.Irc = ircInstance
# Add Global Configuration to the module (Mandatory)
self.Config = ircInstance.Config
# Add Base object to the module (Mandatory)
self.Base = ircInstance.Base
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Base.logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User
# Add Channel object to the module (Mandatory)
self.Channel = ircInstance.Channel
# Create module commands (Mandatory)
self.commands_level = {
1: ['join', 'part'],
2: ['owner', 'deowner', 'op', 'deop', 'halfop', 'dehalfop', 'voice',
'devoice', 'opall', 'deopall', 'devoiceall', 'voiceall', 'ban',
'unban','kick', 'kickban', 'umode', 'svsjoin', 'svspart', 'svsnick']
}
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Insert module commands into the core one (Mandatory)
self.__set_commands(self.commands_level)
# Create you own tables (Mandatory)
self.__create_tables()
# Load module configuration and sync with core one (Mandatory)
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
Returns:
None: Aucun retour n'es attendu
"""
table_logs = '''CREATE TABLE IF NOT EXISTS test_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
self.Base.db_execute_query(table_logs)
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
try:
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel()
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
return None
except TypeError as te:
self.Logs.critical(te)
def __update_configuration(self, param_key: str, param_value: str):
"""Update the local and core configuration
Args:
param_key (str): The parameter key
param_value (str): The parameter value
"""
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def unload(self) -> None:
return None
def add_defender_channel(self, channel:str) -> bool:
"""Cette fonction ajoute les salons de join de Defender
Args:
channel (str): le salon à enregistrer.
"""
mes_donnees = {'channel': channel}
response = self.Base.db_execute_query("SELECT id FROM def_channels WHERE channel = :channel", mes_donnees)
isChannelExist = response.fetchone()
if isChannelExist is None:
mes_donnees = {'datetime': self.Base.get_datetime(), 'channel': channel}
insert = self.Base.db_execute_query(f"INSERT INTO def_channels (datetime, channel) VALUES (:datetime, :channel)", mes_donnees)
if insert.rowcount >=0:
return True
else:
return False
else:
return False
def delete_defender_channel(self, channel:str) -> bool:
"""Cette fonction supprime les salons de join de Defender
Args:
channel (str): le salon à enregistrer.
"""
mes_donnes = {'channel': channel}
response = self.Base.db_execute_query("DELETE FROM def_channels WHERE channel = :channel", mes_donnes)
affected_row = response.rowcount
if affected_row > 0:
return True
else:
return False
def cmd(self, data:list) -> None:
return None
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
service_id = self.Config.SERVICE_ID
dchanlog = self.Config.SERVICE_CHANLOG
fromuser = user
fromchannel = channel
match command:
case 'deopall':
try:
self.Irc.send2socket(f":{service_id} SVSMODE {fromchannel} -o")
except IndexError as e:
self.Logs.warning(f'_hcmd OP: {str(e)}')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'devoiceall':
try:
self.Irc.send2socket(f":{service_id} SVSMODE {fromchannel} -v")
except IndexError as e:
self.Logs.warning(f'_hcmd OP: {str(e)}')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'voiceall':
try:
chan_info = self.Channel.get_Channel(fromchannel)
set_mode = 'v'
mode:str = ''
users:str = ''
uids_split = [chan_info.uids[i:i + 6] for i in range(0, len(chan_info.uids), 6)]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +{set_mode} {dnickname}")
for uid in uids_split:
for i in range(0, len(uid)):
mode += set_mode
users += f'{self.User.get_nickname(self.Base.clean_uid(uid[i]))} '
if i == len(uid) - 1:
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +{mode} {users}")
mode = ''
users = ''
except IndexError as e:
self.Logs.warning(f'_hcmd OP: {str(e)}')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'opall':
try:
chan_info = self.Channel.get_Channel(fromchannel)
set_mode = 'o'
mode:str = ''
users:str = ''
uids_split = [chan_info.uids[i:i + 6] for i in range(0, len(chan_info.uids), 6)]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +{set_mode} {dnickname}")
for uid in uids_split:
for i in range(0, len(uid)):
mode += set_mode
users += f'{self.User.get_nickname(self.Base.clean_uid(uid[i]))} '
if i == len(uid) - 1:
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +{mode} {users}")
mode = ''
users = ''
except IndexError as e:
self.Logs.warning(f'_hcmd OP: {str(e)}')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'op':
# /mode #channel +o user
# .op #channel user
# /msg dnickname op #channel user
# [':adator', 'PRIVMSG', '#services', ':.o', '#services', 'dktmb']
try:
if fromchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} op [#SALON] [NICKNAME]')
return False
if len(cmd) == 1:
self.Irc.send2socket(f":{dnickname} MODE {fromchannel} +o {fromuser}")
return True
# deop nickname
if len(cmd) == 2:
nickname = cmd[1]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +o {nickname}")
return True
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +o {nickname}")
except IndexError as e:
self.Logs.warning(f'_hcmd OP: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} op [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'deop':
# /mode #channel -o user
# .deop #channel user
try:
if fromchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} deop [#SALON] [NICKNAME]')
return False
if len(cmd) == 1:
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -o {fromuser}")
return True
# deop nickname
if len(cmd) == 2:
nickname = cmd[1]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -o {nickname}")
return True
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -o {nickname}")
except IndexError as e:
self.Logs.warning(f'_hcmd DEOP: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} deop [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'owner':
# /mode #channel +q user
# .owner #channel user
try:
if fromchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} owner [#SALON] [NICKNAME]')
return False
if len(cmd) == 1:
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +q {fromuser}")
return True
# owner nickname
if len(cmd) == 2:
nickname = cmd[1]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +q {nickname}")
return True
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +q {nickname}")
except IndexError as e:
self.Logs.warning(f'_hcmd OWNER: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} owner [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'deowner':
# /mode #channel -q user
# .deowner #channel user
try:
if fromchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} deowner [#SALON] [NICKNAME]')
return False
if len(cmd) == 1:
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -q {fromuser}")
return True
# deowner nickname
if len(cmd) == 2:
nickname = cmd[1]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -q {nickname}")
return True
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -q {nickname}")
except IndexError as e:
self.Logs.warning(f'_hcmd DEOWNER: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} deowner [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'halfop':
# /mode #channel +h user
# .halfop #channel user
try:
if fromchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} halfop [#SALON] [NICKNAME]')
return False
if len(cmd) == 1:
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +h {fromuser}")
return True
# deop nickname
if len(cmd) == 2:
nickname = cmd[1]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +h {nickname}")
return True
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +h {nickname}")
except IndexError as e:
self.Logs.warning(f'_hcmd halfop: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} halfop [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'dehalfop':
# /mode #channel -h user
# .dehalfop #channel user
try:
if fromchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} dehalfop [#SALON] [NICKNAME]')
return False
if len(cmd) == 1:
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -h {fromuser}")
return True
# dehalfop nickname
if len(cmd) == 2:
nickname = cmd[1]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -h {nickname}")
return True
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -h {nickname}")
except IndexError as e:
self.Logs.warning(f'_hcmd DEHALFOP: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} dehalfop [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'voice':
# /mode #channel +v user
# .voice #channel user
try:
if fromchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} voice [#SALON] [NICKNAME]')
return False
if len(cmd) == 1:
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +v {fromuser}")
return True
# voice nickname
if len(cmd) == 2:
nickname = cmd[1]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +v {nickname}")
return True
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} +v {nickname}")
except IndexError as e:
self.Logs.warning(f'_hcmd VOICE: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} voice [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'devoice':
# /mode #channel -v user
# .devoice #channel user
try:
if fromchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} devoice [#SALON] [NICKNAME]')
return False
if len(cmd) == 1:
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -v {fromuser}")
return True
# dehalfop nickname
if len(cmd) == 2:
nickname = cmd[1]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -v {nickname}")
return True
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {fromchannel} -v {nickname}")
except IndexError as e:
self.Logs.warning(f'_hcmd DEVOICE: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} devoice [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'ban':
# .ban #channel nickname
try:
sentchannel = str(cmd[1]) if self.Base.Is_Channel(cmd[1]) else None
if sentchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} ban [#SALON] [NICKNAME]')
return False
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {sentchannel} +b {nickname}!*@*")
self.Logs.debug(f'{fromuser} has banned {nickname} from {sentchannel}')
except IndexError as e:
self.Logs.warning(f'_hcmd BAN: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} ban [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'unban':
# .unban #channel nickname
try:
sentchannel = str(cmd[1]) if self.Base.Is_Channel(cmd[1]) else None
if sentchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} ban [#SALON] [NICKNAME]')
return False
nickname = cmd[2]
self.Irc.send2socket(f":{service_id} MODE {sentchannel} -b {nickname}!*@*")
self.Logs.debug(f'{fromuser} has unbanned {nickname} from {sentchannel}')
except IndexError as e:
self.Logs.warning(f'_hcmd UNBAN: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} unban [#SALON] [NICKNAME]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'kick':
# .kick #channel nickname reason
try:
sentchannel = str(cmd[1]) if self.Base.Is_Channel(cmd[1]) else None
if sentchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} ban [#SALON] [NICKNAME]')
return False
nickname = cmd[2]
reason = []
for i in range(3, len(cmd)):
reason.append(cmd[i])
final_reason = ' '.join(reason)
self.Irc.send2socket(f":{service_id} KICK {sentchannel} {nickname} {final_reason}")
self.Logs.debug(f'{fromuser} has kicked {nickname} from {sentchannel} : {final_reason}')
except IndexError as e:
self.Logs.warning(f'_hcmd KICK: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} kick [#SALON] [NICKNAME] [REASON]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'kickban':
# .kickban #channel nickname reason
try:
sentchannel = str(cmd[1]) if self.Base.Is_Channel(cmd[1]) else None
if sentchannel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} ban [#SALON] [NICKNAME]')
return False
nickname = cmd[2]
reason = []
for i in range(3, len(cmd)):
reason.append(cmd[i])
final_reason = ' '.join(reason)
self.Irc.send2socket(f":{service_id} KICK {sentchannel} {nickname} {final_reason}")
self.Irc.send2socket(f":{service_id} MODE {sentchannel} +b {nickname}!*@*")
self.Logs.debug(f'{fromuser} has kicked and banned {nickname} from {sentchannel} : {final_reason}')
except IndexError as e:
self.Logs.warning(f'_hcmd KICKBAN: {str(e)}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : Right command : /msg {dnickname} kickban [#SALON] [NICKNAME] [REASON]')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'join':
try:
sent_channel = str(cmd[1]) if self.Base.Is_Channel(cmd[1]) else None
if sent_channel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :{self.Config.SERVICE_PREFIX}JOIN #channel')
return False
self.Irc.send2socket(f':{service_id} JOIN {sent_channel}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {dnickname} JOINED {sent_channel}')
self.Base.db_query_channel('add', self.module_name, sent_channel)
except IndexError as ie:
self.Logs.error(f'{ie}')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'part':
try:
sent_channel = str(cmd[1]) if self.Base.Is_Channel(cmd[1]) else None
if sent_channel is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :{self.Config.SERVICE_PREFIX}PART #channel')
return False
if sent_channel == dchanlog:
self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : {dnickname} CAN'T LEFT {sent_channel} AS IT IS LOG CHANNEL")
return False
self.Irc.send2socket(f':{service_id} PART {sent_channel}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : {dnickname} LEFT {sent_channel}')
self.Base.db_query_channel('del', self.module_name, sent_channel)
except IndexError as ie:
self.Logs.error(f'{ie}')
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'umode':
try:
# .umode nickname +mode
nickname = str(cmd[1])
umode = str(cmd[2])
self.Irc.send2socket(f':{dnickname} SVSMODE {nickname} {umode}')
except KeyError as ke:
self.Base.logs.error(ke)
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'svsjoin':
try:
# .svsjoin nickname #channel
nickname = str(cmd[1])
channel = str(cmd[2])
if len(cmd) != 3:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : /msg {dnickname} SVSJOIN nickname #channel')
return None
self.Irc.send2socket(f':{self.Config.SERVEUR_ID} SVSJOIN {nickname} {channel}')
except KeyError as ke:
self.Base.logs.error(ke)
except Exception as err:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : /msg {dnickname} SVSJOIN nickname #channel')
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'svspart':
try:
# .svspart nickname #channel
nickname = str(cmd[1])
channel = str(cmd[2])
if len(cmd) != 3:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : /msg {dnickname} SVSPART nickname #channel')
return None
self.Irc.send2socket(f':{self.Config.SERVEUR_ID} SVSPART {nickname} {channel}')
except KeyError as ke:
self.Base.logs.error(ke)
except Exception as err:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : /msg {dnickname} SVSPART nickname #channel')
self.Logs.warning(f'Unknown Error: {str(err)}')
case 'svsnick':
try:
# .svsnick nickname newnickname
nickname = str(cmd[1])
newnickname = str(cmd[2])
unixtime = self.Base.get_unixtime()
if self.User.get_nickname(nickname) is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : This nickname do not exist')
return None
if len(cmd) != 3:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : /msg {dnickname} SVSNICK nickname newnickname')
return None
self.Irc.send2socket(f':{self.Config.SERVEUR_ID} SVSNICK {nickname} {newnickname} {unixtime}')
except KeyError as ke:
self.Base.logs.error(ke)
except Exception as err:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : /msg {dnickname} SVSNICK nickname newnickname')
self.Logs.warning(f'Unknown Error: {str(err)}')

File diff suppressed because it is too large Load Diff

View File

@@ -1,53 +1,64 @@
from dataclasses import dataclass, fields
from core.irc import Irc
# Le module crée devra réspecter quelques conditions
# 1. Importer le module de configuration
# 2. Le nom de class devra toujours s'appeler comme le module exemple => nom de class Dktmb | nom du module mod_dktmb
# 3. la fonction __init__ devra toujours avoir les parametres suivant (self, irc:object)
# 1 . Créer la variable Irc dans le module
# 2 . Récuperer la configuration dans une variable
# 3 . Définir et enregistrer les nouvelles commandes
# 4. une fonction _hcmds(self, user:str, cmd: list) devra toujours etre crée.
class Test():
@dataclass
class ModConfModel:
"""The Model containing the module parameters
"""
param_exemple1: str
param_exemple2: int
def __init__(self, ircInstance:Irc) -> None:
# Add Irc Object to the module
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module (Mandatory)
self.Irc = ircInstance
# Add Global Configuration to the module
# Add Global Configuration to the module (Mandatory)
self.Config = ircInstance.Config
# Add Base object to the module
# Add Base object to the module (Mandatory)
self.Base = ircInstance.Base
# Add logs object to the module
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Base.logs
# Add User object to the module
# Add User object to the module (Mandatory)
self.User = ircInstance.User
# Add Channel object to the module
# Add Channel object to the module (Mandatory)
self.Channel = ircInstance.Channel
# Créer les nouvelles commandes du module
# Create module commands (Mandatory)
self.commands_level = {
0: ['test'],
1: ['test_level_1']
0: ['test-command'],
1: ['test_level_1'],
2: ['test_level_2'],
3: ['test_level_3']
}
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.__class__.__name__} loaded ...')
self.Logs.debug(f'Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Insert module commands into the core one (Mandatory)
self.__set_commands(self.commands_level)
# Create you own tables (Mandatory)
self.__create_tables()
# Load module configuration and sync with core one (Mandatory)
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
@@ -78,31 +89,67 @@ class Test():
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
)
'''
self.Base.db_execute_query(table_logs)
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
try:
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel(param_exemple1='param value 1', param_exemple2=1)
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
return None
except TypeError as te:
self.Logs.critical(te)
def __update_configuration(self, param_key: str, param_value: str):
"""Update the local and core configuration
Args:
param_key (str): The parameter key
param_value (str): The parameter value
"""
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def unload(self) -> None:
return None
def cmd(self, data:list) -> None:
return None
def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None:
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
fromuser = user
fromchannel = str(channel) if not channel is None else None
match command:
case 'test':
case 'test-command':
try:
self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : test command ready ...")
self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : This is a notice to the sender ...")
self.Irc.send2socket(f":{dnickname} PRIVMSG {fromuser} : This is private message to the sender ...")
if not fromchannel is None:
self.Irc.send2socket(f":{dnickname} PRIVMSG {fromchannel} : This is channel message to the sender ...")
# How to update your module configuration
self.__update_configuration('param_exemple2', 7)
# Log if you want the result
self.Logs.debug(f"Test logs ready")
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")
except Exception as err:
self.Logs.error(f"Unknown Error: {err}")

View File

@@ -20,10 +20,14 @@ class Votekick():
voter_users: list
vote_for: int
vote_against: int
VOTE_CHANNEL_DB:list[VoteChannelModel] = []
def __init__(self, ircInstance:Irc) -> None:
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module
self.Irc = ircInstance
@@ -44,18 +48,20 @@ class Votekick():
# Créer les nouvelles commandes du module
self.commands_level = {
0: ['vote_for', 'vote_against'],
1: ['activate', 'deactivate', 'submit', 'vote_stat', 'vote_verdict', 'vote_cancel']
0: ['vote']
}
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.__class__.__name__} loaded ...')
self.Logs.debug(f'Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Add admin object to retrieve admin users
self.Admin = self.Irc.Admin
self.__set_commands(self.commands_level)
self.__create_tables()
self.join_saved_channels()
@@ -143,7 +149,7 @@ class Votekick():
if not found:
self.VOTE_CHANNEL_DB.append(ChannelObject)
self.Logs.debug(f"The channel has been added {ChannelObject}")
self.db_add_vote_channel(ChannelObject.channel_name)
# self.db_add_vote_channel(ChannelObject.channel_name)
return result
@@ -188,7 +194,9 @@ class Votekick():
def join_saved_channels(self) -> None:
result = self.Base.db_execute_query("SELECT id, channel FROM votekick_channel")
param = {'module_name': self.module_name}
result = self.Base.db_execute_query(f"SELECT id, channel_name FROM {self.Config.table_channel} WHERE module_name = :module_name", param)
channels = result.fetchall()
unixtime = self.Base.get_unixtime()
@@ -214,15 +222,18 @@ class Votekick():
dnickname = self.Config.SERVICE_NICKNAME
if not self.is_vote_ongoing(channel):
return None
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :The user {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} will be kicked from this channel')
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :User {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll be kicked from the channel')
self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
self.Channel.delete_user_from_channel(channel, self.User.get_uid(target_user))
elif chan.vote_for <= chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This user will stay on this channel')
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :User {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll remain in the channel')
# Init the system
if self.init_vote_system(channel):
@@ -241,201 +252,244 @@ class Votekick():
return None
def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None:
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
# cmd is the command starting from the user command
# full cmd is sending the entire server response
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
fromuser = user
if len(fullcmd) >= 3:
fromchannel = str(fullcmd[2]).lower() if self.Base.Is_Channel(str(fullcmd[2]).lower()) else None
else:
fromchannel = None
if len(cmd) >= 2:
sentchannel = str(cmd[1]).lower() if self.Base.Is_Channel(str(cmd[1]).lower()) else None
else:
sentchannel = None
if not fromchannel is None:
channel = fromchannel
elif not sentchannel is None:
channel = sentchannel
else:
channel = None
fromchannel = channel
match command:
case 'vote':
option = str(cmd[1]).lower()
case 'vote_cancel':
try:
if channel is None:
self.Logs.error(f"The channel is not known, defender can't cancel the vote")
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :You need to specify the channel => /msg {dnickname} vote_cancel #channel')
if len(command) == 1:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote activate #channel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote deactivate #channel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote +')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote -')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote cancel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote status')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote submit nickname')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote verdict')
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
self.init_vote_system(channel)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote system re-initiated')
match option:
except IndexError as ke:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote_cancel #channel')
self.Logs.error(f'Index Error: {ke}')
case 'activate':
try:
# vote activate #channel
if self.Admin.get_Admin(fromuser) is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Your are not allowed to execute this command')
return None
case 'vote_for':
try:
# vote_for
channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
else:
chan.vote_for += 1
chan.voter_users.append(fromuser)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
sentchannel = str(cmd[2]).lower() if self.Base.Is_Channel(str(cmd[2]).lower()) else None
if sentchannel is None:
self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} :The correct command is {self.Config.SERVICE_PREFIX}{command} {option} #CHANNEL")
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
except IndexError as ie:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote_cancel #channel')
self.Logs.error(f'Index Error: {ie}')
self.insert_vote_channel(
self.VoteChannelModel(
channel_name=sentchannel,
target_user='',
voter_users=[],
vote_for=0,
vote_against=0
)
)
case 'vote_against':
try:
# vote_against
channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
else:
chan.vote_against += 1
chan.voter_users.append(fromuser)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
self.Base.db_query_channel('add', self.module_name, sentchannel)
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
self.Irc.send2socket(f":{dnickname} JOIN {sentchannel}")
self.Irc.send2socket(f":{dnickname} SAMODE {sentchannel} +o {dnickname}")
self.Irc.send2socket(f":{dnickname} PRIVMSG {sentchannel} :You can now use !submit <nickname> to decide if he will stay or not on this channel ")
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} {command} {option} #channel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Exemple /msg {dnickname} {command} {option} #welcome')
case 'vote_stat':
try:
# channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Channel: {chan.channel_name} | Target: {self.User.get_nickname(chan.target_user)} | For: {chan.vote_for} | Against: {chan.vote_against} | Number of voters: {str(len(chan.voter_users))}')
case 'deactivate':
try:
# vote deactivate #channel
if self.Admin.get_Admin(fromuser) is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Your are not allowed to execute this command')
return None
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
sentchannel = str(cmd[2]).lower() if self.Base.Is_Channel(str(cmd[2]).lower()) else None
if sentchannel is None:
self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} :The correct command is {self.Config.SERVICE_PREFIX}{command} {option} #CHANNEL")
case 'vote_verdict':
try:
# channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :The user {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} will be kicked from this channel')
self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
elif chan.vote_for <= chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This user will stay on this channel')
# Init the system
if self.init_vote_system(channel):
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :System vote re initiated')
self.Irc.send2socket(f":{dnickname} SAMODE {sentchannel} -o {dnickname}")
self.Irc.send2socket(f":{dnickname} PART {sentchannel}")
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == sentchannel:
self.VOTE_CHANNEL_DB.remove(chan)
self.Base.db_query_channel('del', self.module_name, chan.channel_name)
case 'submit':
# submit nickname
try:
nickname_submitted = cmd[1]
# channel = str(fullcmd[2]).lower()
uid_submitted = self.User.get_uid(nickname_submitted)
user_submitted = self.User.get_User(nickname_submitted)
self.Logs.debug(f"The Channel {sentchannel} has been deactivated from the vote system")
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} {command} {option} #channel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Exemple /msg {dnickname} {command} {option} #welcome')
# check if there is an ongoing vote
if self.is_vote_ongoing(channel):
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
ongoing_user = self.User.get_nickname(vote.target_user)
case '+':
try:
# vote +
channel = fromchannel
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
else:
chan.vote_for += 1
chan.voter_users.append(fromuser)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} {command} {option}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Exemple /msg {dnickname} {command} {option}')
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :There is an ongoing vote on {ongoing_user}')
return False
case '-':
try:
# vote -
channel = fromchannel
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
else:
chan.vote_against += 1
chan.voter_users.append(fromuser)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} {command} {option}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Exemple /msg {dnickname} {command} {option}')
# check if the user exist
if user_submitted is None:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> do not exist')
return False
case 'cancel':
try:
# vote cancel
if self.Admin.get_Admin(fromuser) is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Your are not allowed to execute this command')
return None
uid_cleaned = self.Base.clean_uid(uid_submitted)
ChannelInfo = self.Channel.get_Channel(channel)
if channel is None:
self.Logs.error(f"The channel is not known, defender can't cancel the vote")
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :You need to specify the channel => /msg {dnickname} vote_cancel #channel')
clean_uids_in_channel: list = []
for uid in ChannelInfo.uids:
clean_uids_in_channel.append(self.Base.clean_uid(uid))
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
self.init_vote_system(channel)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote system re-initiated')
if not uid_cleaned in clean_uids_in_channel:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> is not available in this channel')
return False
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} {command} {option}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Exemple /msg {dnickname} {command} {option}')
# check if Ircop or Service or Bot
pattern = fr'[o|B|S]'
operator_user = re.findall(pattern, user_submitted.umodes)
if operator_user:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You cant vote for this user ! he/she is protected')
return False
case 'status':
try:
# vote status
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Channel: {chan.channel_name} | Target: {self.User.get_nickname(chan.target_user)} | For: {chan.vote_for} | Against: {chan.vote_against} | Number of voters: {str(len(chan.voter_users))}')
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
chan.target_user = self.User.get_uid(nickname_submitted)
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} {command} {option}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Exemple /msg {dnickname} {command} {option}')
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :{nickname_submitted} has been targeted for a vote')
case 'submit':
try:
# vote submit nickname
if self.Admin.get_Admin(fromuser) is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Your are not allowed to execute this command')
return None
self.Base.create_timer(60, self.timer_vote_verdict, (channel, ))
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This vote will end after 60 secondes')
nickname_submitted = cmd[2]
uid_submitted = self.User.get_uid(nickname_submitted)
user_submitted = self.User.get_User(nickname_submitted)
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
except TypeError as te:
self.Logs.error(te)
# check if there is an ongoing vote
if self.is_vote_ongoing(channel):
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
ongoing_user = self.User.get_nickname(vote.target_user)
case 'activate':
try:
# activate #channel
# channel = str(cmd[1]).lower()
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :There is an ongoing vote on {ongoing_user}')
return False
self.insert_vote_channel(
self.VoteChannelModel(
channel_name=channel,
target_user='',
voter_users=[],
vote_for=0,
vote_against=0
)
)
# check if the user exist
if user_submitted is None:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> do not exist')
return False
self.Irc.send2socket(f":{dnickname} JOIN {channel}")
self.Irc.send2socket(f":{dnickname} SAMODE {channel} +o {dnickname}")
self.Irc.send2socket(f":{dnickname} PRIVMSG {channel} :You can now use !submit <nickname> to decide if he will stay or not on this channel ")
uid_cleaned = self.Base.clean_uid(uid_submitted)
ChannelInfo = self.Channel.get_Channel(channel)
if ChannelInfo is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :This channel [{channel}] do not exist in the Channel Object')
return False
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")
clean_uids_in_channel: list = []
for uid in ChannelInfo.uids:
clean_uids_in_channel.append(self.Base.clean_uid(uid))
case 'deactivate':
try:
# deactivate #channel
# channel = str(cmd[1]).lower()
if not uid_cleaned in clean_uids_in_channel:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> is not available in this channel')
return False
self.Irc.send2socket(f":{dnickname} SAMODE {channel} -o {dnickname}")
self.Irc.send2socket(f":{dnickname} PART {channel}")
# check if Ircop or Service or Bot
pattern = fr'[o|B|S]'
operator_user = re.findall(pattern, user_submitted.umodes)
if operator_user:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You cant vote for this user ! he/she is protected')
return False
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
self.VOTE_CHANNEL_DB.remove(chan)
self.db_delete_vote_channel(chan.channel_name)
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
chan.target_user = self.User.get_uid(nickname_submitted)
self.Logs.debug(f"Test logs ready")
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :{nickname_submitted} has been targeted for a vote')
self.Base.create_timer(60, self.timer_vote_verdict, (channel, ))
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This vote will end after 60 secondes')
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} {command} {option} nickname')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Exemple /msg {dnickname} {command} {option} adator')
case 'verdict':
try:
# vote verdict
if self.Admin.get_Admin(fromuser) is None:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Your are not allowed to execute this command')
return None
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :User {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll be kicked from the channel')
self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
elif chan.vote_for <= chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :User {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} has {chan.vote_against} votes against and {chan.vote_for} votes for. For this reason, it\'ll remain in the channel')
# Init the system
if self.init_vote_system(channel):
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :System vote re initiated')
except Exception as err:
self.Logs.error(f'{err}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} {command} {option}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :Exemple /msg {dnickname} {command} {option}')
case _:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote activate #channel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote deactivate #channel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote +')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote -')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote cancel')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote status')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote submit nickname')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote verdict')

View File

@@ -1,3 +1,3 @@
{
"version": "5.0.7"
"version": "5.2.4"
}