24 Commits

Author SHA1 Message Date
adator
f0853e3afb Merge pull request #21 from adator85/dev
New Installation file created for unix system
2024-08-22 01:02:00 +02:00
adator
88b9b056ca New Installation file created for unix system 2024-08-22 01:01:21 +02:00
adator
6dade09257 Merge pull request #20 from adator85/dev
README Update
2024-08-21 00:50:31 +02:00
adator
d7fab2d701 README Update
Version Update
base.py:
    - Adding timeout variable to github connexion
    - Adding get_all_module method to retrieve all modules in mods/ folder
irc.py:
    - Adapt show_module command
mod_defender.py:
    - Update operator command and use only normal command (owner, deowner, op, deop, halfop, dehalfop, voice, devoice, kick, kickban, ban)
    - Channel variable is coming now from the command but also from the system
2024-08-21 00:43:20 +02:00
adator
9533b010b2 Merge pull request #19 from adator85/dev
V5.0.4 - Delete a user when a user has been kicked
2024-08-20 02:24:37 +02:00
adator
dbfc04a936 V5.0.4 - Delete a user when a user has been kicked 2024-08-20 02:24:11 +02:00
adator
824db73590 Merge pull request #18 from adator85/dev
Delete channel mode information
2024-08-20 02:14:31 +02:00
adator
bfb449f804 Delete channel mode information 2024-08-20 02:13:54 +02:00
adator
96bf4b6f80 Merge pull request #17 from adator85/dev
Fix channel update
2024-08-20 02:08:09 +02:00
adator
8c772f5882 Fix channel update 2024-08-20 02:07:21 +02:00
adator
922336363e Merge pull request #16 from adator85/dev
Dev
2024-08-20 01:56:04 +02:00
adator
e5ceada997 Merge pull request #15 from adator85/mac
Mac
2024-08-20 01:51:43 +02:00
adator
e4781614f4 V5.0.1 2024-08-20 01:50:08 +02:00
adator85
5ba4e27e3d update mod_votekick.py 2024-08-19 00:27:45 +02:00
adator85
eda0edb92a Adding a method to test a channel 2024-08-17 13:46:24 +02:00
adator85
deae79db57 Version 5 with dataclasses and new vote system 2024-08-17 13:32:11 +02:00
adator85
ab593b0ae6 . 2024-08-09 02:35:53 +02:00
adator85
e4a0c530a3 Create DataClasses 2024-08-09 02:08:33 +02:00
adator
b0325d4db5 Merge pull request #14 from adator85/dev
V4.1.0 change configuration systeme
2024-08-04 00:37:43 +02:00
adator
b9e4878764 V4.1.0 change configuration systeme 2024-08-04 00:37:03 +02:00
adator
22cec8a0ef Merge pull request #13 from adator85/dev
patch V4.0.4
2024-08-03 21:14:53 +02:00
adator
1837edf1c2 patch V4.0.4 2024-08-03 21:14:28 +02:00
adator
62b10313a4 Merge pull request #12 from adator85/dev
Patch V4.0.3
2024-08-03 21:05:56 +02:00
adator
c369d86a22 Patch V4.0.3 2024-08-03 21:04:38 +02:00
15 changed files with 2643 additions and 759 deletions

7
.gitignore vendored Normal file
View File

@@ -0,0 +1,7 @@
.pyenv/
db/
logs/
__pycache__/
configuration.json
install.log
test.py

140
README.md
View File

@@ -2,70 +2,104 @@
Defender est un service IRC basé sur la sécurité des réseaux IRC ( UnrealIRCD )
Il permet d'ajouter une sécurité supplémentaire pour vérifier les users connectés au réseau
en demandant aux user un code de validation.
Il permet aux opérateurs de gérer efficacement un canal, tout en offrant aux utilisateurs des outils d'interaction et de décision collective.
Pré-requis :
# Fonctionnalités principales
Commandes opérateurs complètes:
Kick: Expulser un utilisateur du canal.
Ban: Interdire définitivement l'accès au canal.
Unban: Lever une interdiction.
Op/Deop: Attribuer ou retirer les droits d'opérateur.
Halfop/Dehalfop: Attribuer ou retirer les droits
Voice/Devoice: Attribuer ou retirer les droits de voix.
- Python version >= 3.10
- Pip de python installé sur la machine
- Python librairies psutil & sqlalchemy & requests
- IRC Serveur Version >= UnrealIRCd-6.1.2.2
Système de quarantaine:
Mise en quarantaine: Isoler temporairement un utilisateur dans un canal privé.
Libération: Permettre à un utilisateur de quitter la quarantaine en entrant un code spécifique.
Lancement de Defender :
Système de vote:
Kick: Les utilisateurs peuvent voter pour expulser un membre du canal.
Autres actions: Possibilité d'étendre le système de vote à d'autres actions (ban, etc.).
- Installer les librairies python : psutil & sqlalchemy & requests
- pip3 install psutil sqlalchemy requests ou pip install psutil sqlalchemy requests
- Ne pas lancer Defender en tant que root
- Créer plutot un service qui lancera Defender en tant qu'utilisateur non root
- Un fichier PID sera crée.
# Installation automatique sur une machine Debian/Ubuntu
# TO DO LIST
Prérequis:
- Système d'exploitation Linux (Windows non supporté)
- Droits d'administrateur (root) pour l'exécution du script
- Python version 3.10 ou supérieure
- Optimiser le systeme de réputation:
- lorsque les users ce connectent, Ils entrent dans un salon puis une fraction de seconde le service les bans
Bash
$ git clone https://github.com/adator85/IRC_DEFENDER_MODULES.git
- Renommer le fichier exemple_configuration.json en configuration.json
- Configurer le fichier configuration.json
$ sudo python3 install.py
# VERSION 1
Si votre configuration est bonne, votre service est censé etre connecté a votre réseau IRC
[02.01.2024]
- Rajout de l'activation de la commande flood
- Les deux variables RESTART et INIT ont été déplacées vers le module Irc
- Nouvelle class Install:
- Le programme va vérifier si les 3 librairies sont installées (SQLAlchemy & requests & psutil)
- Une fois la vérification, il va mêtre a jour pip puis installera les dépendances
# Installation manuelle:
Bash
$ git clone https://github.com/adator85/IRC_DEFENDER_MODULES.git
$ cd IRC_DEFENDER_MODULES
$ python3 -m venv .pyenv
$ source .pyenv/bin/activate
- Créer un service nommé "Defender.service" pour votre service et placer le dans "/etc/systemd/system/"
$ sudo systemctl start Defender
[28.12.2023]
- Changement de méthode pour récuperer la version actuelle de python
- Ajout de la réponse a une PING de la part d'un utilisateur
- Installation automatique des packages sqlalchemy, requests et psutil
# Configuration
# BUG FIX
SERVEUR (Serveur)
SERVEUR_IP: Adresse IP du serveur IRC à rejoindre.
SERVEUR_HOSTNAME: Nom d'hôte du serveur IRC à rejoindre (optionnel).
SERVEUR_LINK: Lien vers le serveur IRC (optionnel).
SERVEUR_PORT: Port de connexion au serveur IRC.
SERVEUR_PASSWORD: Mot de passe d'enregistrement du service sur le serveur IRC.
SERVEUR_ID: Identifiant unique du service.
SERVEUR_SSL: Active la connexion SSL sécurisée au serveur IRC (true/false).
SERVICE (Service)
SERVICE_NAME: Nom du service IRC.
SERVICE_NICKNAME: Surnom utilisé par le service sur le serveur IRC.
SERVICE_REALNAME: Nom réel du service affiché sur le serveur IRC.
SERVICE_USERNAME: Nom d'utilisateur utilisé par le service pour se connecter au serveur IRC.
SERVICE_HOST: Nom d'hôte du service affiché sur le serveur IRC (optionnel).
SERVICE_INFO: Description du service.
SERVICE_CHANLOG: Canal utilisé pour la journalisation des actions du service.
SERVICE_SMODES: Modes serveur appliqués aux canaux rejoints par le service.
SERVICE_CMODES: Modes de canal appliqués aux canaux rejoints par le service.
SERVICE_UMODES: Modes utilisateur appliqués au service.
SERVICE_PREFIX: Caractère utilisé comme préfixe des commandes du service.
COMPTE (Compte)
OWNER: Nom d'utilisateur possédant les droits d'administration du service.
PASSWORD: Mot de passe de l'administrateur du service.
CANAUX (Canaux)
SALON_JAIL: Canal utilisé comme prison pour les utilisateurs sanctionnés.
SALON_JAIL_MODES: Modes appliqués au canal de prison.
SALON_LIBERER: Canal utilisé pour la libération des utilisateurs sanctionnés.
API (API)
API_TIMEOUT: Durée maximale d'attente d'une réponse de l'API en secondes.
SCANNER (Scanner)
PORTS_TO_SCAN: Liste des ports à scanner pour détecter des serveurs potentiellement malveillants.
SÉCURITÉ (Sécurité)
WHITELISTED_IP: Liste d'adresses IP autorisées à contourner certaines restrictions.
GLINE_DURATION: Durée de bannissement temporaire d'un utilisateur en minutes.
DEBUG (Debug)
DEBUG_LEVEL: Niveau de verbosité des messages de debug (plus grand est le nombre, plus il y a d'informations).
COULEURS (Couleurs)
CONFIG_COLOR: Dictionnaire contenant des codes de couleurs IRC pour un meilleur affichage des messages.
[29.12.2023]
- Correction des messages de receptions trop longs > 4070 caractéres;
- la méthode boucle et incrémente la réponse tant que le nombre de caractére reçu est supérieur a 4072
- Rajout du protocol MTAGS a la connexion du service
- Impact majeur dans la lecture des messages reçu du serveur ( PRIVMSG, SLOGS, UID, QUIT, NICK, PONG, SJOIN)
Modification de la configuration
# ALREADY IMPLEMENTED
Vous devez modifier le fichier config.json en remplaçant les valeurs par défaut avec vos propres informations. Assurez-vous de bien lire la description de chaque paramètre pour une configuration optimale du service.
- Connexion en tant que service
- Gestion des messages reçus/envoyés par le serveur
- Gestion des caractéres spéciaux
- Gestion des logs (salon, fichiers et console)
- Mode debug : gestion des logs coté console
- Création du systeme de gestion de commandes
- Defender reconnait les commandes qui commence par le suffix définit dans la configuration
- Defender reconnait aussi reconnaitre les commandes qui viennent de /msg Defender [commande]
- Identifications
- Systéme d'identification [OK]
- Systéme de changement d'information [OK]
- Suppression d'un admin
- Systéme de groupe d'accés [OK]
Attention
Reputation security
- Activation ou désaction du systéme --> OK | .reputation ON/off
- Le user sera en mesure de changer la limite de la réputation --> OK | .reputation set limit 120
- Defender devra envoyer l'utilisateur dans un salon définit dans la configuration --> OK
- Defender bannira l'utilisateur de la totalité des salons, il le bannira aussi lorsqu'il souhaitera accéder a de nouveau salon --> OK
- Defender devra envoyer un message du type "Merci de taper cette comande /msg {nomdudefender} code {un code générer aléatoirement} --> OK
- Defender devra reconnaitre le code --> OK
- Defender devra liberer l'utilisateur et l'envoyer vers un salon définit dans la configuration --> OK
Le mot de passe de l'administrateur et le mot de passe du service doivent être modifiés pour des raisons de sécurité.
Ne partagez pas vos informations de connexion au serveur IRC avec des tiers.
#Extension:
Le code est modulaire et conçu pour être facilement étendu. Vous pouvez ajouter de nouvelles commandes, de nouvelles fonctionnalités (mods/mod_test.py est un exemple pour bien demarrer la création de son module).
# Contributions:
Les contributions sont les bienvenues ! N'hésitez pas à ouvrir des issues ou des pull requests.
# Avertissement:
Ce bot est fourni "tel quel" sans aucune garantie. Utilisez-le à vos risques et périls.

374
core/Model.py Normal file
View File

@@ -0,0 +1,374 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Union
from core.base import Base
class User:
@dataclass
class UserModel:
uid: str
nickname: str
username: str
hostname: str
umodes: str
vhost: str
isWebirc: bool
remote_ip: str
score_connexion: int
connexion_datetime: datetime = field(default=datetime.now())
UID_DB: list[UserModel] = []
def __init__(self, Base: Base) -> None:
self.log = Base.logs
pass
def insert(self, newUser: UserModel) -> bool:
"""Insert a new User object
Args:
newUser (UserModel): New userModel object
Returns:
bool: True if inserted
"""
result = False
exist = False
for record in self.UID_DB:
if record.uid == newUser.uid:
# If the user exist then return False and do not go further
exist = True
self.log.debug(f'{record.uid} already exist')
return result
if not exist:
self.UID_DB.append(newUser)
result = True
self.log.debug(f'New User Created: ({newUser})')
if not result:
self.log.critical(f'The User Object was not inserted {newUser}')
return result
def update(self, uid: str, newNickname: str) -> bool:
"""Update the nickname starting from the UID
Args:
uid (str): UID of the user
newNickname (str): New nickname
Returns:
bool: True if updated
"""
result = False
for record in self.UID_DB:
if record.uid == uid:
# If the user exist then update and return True and do not go further
record.nickname = newNickname
result = True
self.log.debug(f'UID ({record.uid}) has been updated with new nickname {newNickname}')
return result
if not result:
self.log.critical(f'The new nickname {newNickname} was not updated, uid = {uid}')
return result
def delete(self, uid: str) -> bool:
"""Delete the User starting from the UID
Args:
uid (str): UID of the user
Returns:
bool: True if deleted
"""
result = False
for record in self.UID_DB:
if record.uid == uid:
# If the user exist then remove and return True and do not go further
self.UID_DB.remove(record)
result = True
self.log.debug(f'UID ({record.uid}) has been deleted')
return result
if not result:
self.log.critical(f'The UID {uid} was not deleted')
return result
def get_User(self, uidornickname: str) -> Union[UserModel, None]:
"""Get The User Object model
Args:
uidornickname (str): UID or Nickname
Returns:
UserModel|None: The UserModel Object | None
"""
User = None
for record in self.UID_DB:
if record.uid == uidornickname:
User = record
elif record.nickname == uidornickname:
User = record
self.log.debug(f'Search {uidornickname} -- result = {User}')
return User
def get_uid(self, uidornickname:str) -> Union[str, None]:
"""Get the UID of the user starting from the UID or the Nickname
Args:
uidornickname (str): UID or Nickname
Returns:
str|None: Return the UID
"""
uid = None
for record in self.UID_DB:
if record.uid == uidornickname:
uid = record.uid
if record.nickname == uidornickname:
uid = record.uid
self.log.debug(f'The UID that you are looking for {uidornickname} has been found {uid}')
return uid
def get_nickname(self, uidornickname:str) -> Union[str, None]:
"""Get the Nickname starting from UID or the nickname
Args:
uidornickname (str): UID or Nickname of the user
Returns:
str|None: the nickname
"""
nickname = None
for record in self.UID_DB:
if record.nickname == uidornickname:
nickname = record.nickname
if record.uid == uidornickname:
nickname = record.nickname
self.log.debug(f'The value to check {uidornickname} -> {nickname}')
return nickname
class Admin:
@dataclass
class AdminModel:
uid: str
nickname: str
username: str
hostname: str
umodes: str
vhost: str
level: int
connexion_datetime: datetime = field(default=datetime.now())
UID_ADMIN_DB: list[AdminModel] = []
def __init__(self, Base: Base) -> None:
self.log = Base.logs
pass
def insert(self, newAdmin: AdminModel) -> bool:
result = False
exist = False
for record in self.UID_ADMIN_DB:
if record.uid == newAdmin.uid:
# If the admin exist then return False and do not go further
exist = True
self.log.debug(f'{record.uid} already exist')
return result
if not exist:
self.UID_ADMIN_DB.append(newAdmin)
result = True
self.log.debug(f'UID ({newAdmin.uid}) has been created')
if not result:
self.log.critical(f'The User Object was not inserted {newAdmin}')
return result
def update(self, uid: str, newNickname: str) -> bool:
result = False
for record in self.UID_ADMIN_DB:
if record.uid == uid:
# If the admin exist, update and do not go further
record.nickname = newNickname
result = True
self.log.debug(f'UID ({record.uid}) has been updated with new nickname {newNickname}')
return result
if not result:
self.log.critical(f'The new nickname {newNickname} was not updated, uid = {uid}')
return result
def delete(self, uid: str) -> bool:
result = False
for record in self.UID_ADMIN_DB:
if record.uid == uid:
# If the admin exist, delete and do not go further
self.UID_ADMIN_DB.remove(record)
result = True
self.log.debug(f'UID ({record.uid}) has been created')
return result
if not result:
self.log.critical(f'The UID {uid} was not deleted')
return result
def get_Admin(self, uidornickname: str) -> Union[AdminModel, None]:
Admin = None
for record in self.UID_ADMIN_DB:
if record.uid == uidornickname:
Admin = record
elif record.nickname == uidornickname:
Admin = record
self.log.debug(f'Search {uidornickname} -- result = {Admin}')
return Admin
def get_uid(self, uidornickname:str) -> Union[str, None]:
uid = None
for record in self.UID_ADMIN_DB:
if record.uid == uidornickname:
uid = record.uid
if record.nickname == uidornickname:
uid = record.uid
self.log.debug(f'The UID that you are looking for {uidornickname} has been found {uid}')
return uid
def get_nickname(self, uidornickname:str) -> Union[str, None]:
nickname = None
for record in self.UID_ADMIN_DB:
if record.nickname == uidornickname:
nickname = record.nickname
if record.uid == uidornickname:
nickname = record.nickname
self.log.debug(f'The value {uidornickname} -- {nickname}')
return nickname
class Channel:
@dataclass
class ChannelModel:
name: str
uids: list
UID_CHANNEL_DB: list[ChannelModel] = []
def __init__(self, Base: Base) -> None:
self.log = Base.logs
self.Base = Base
pass
def insert(self, newChan: ChannelModel) -> bool:
"""This method will insert a new channel and if the channel exist it will update the user list (uids)
Args:
newChan (ChannelModel): The channel model object
Returns:
bool: True if new channel, False if channel exist (However UID could be updated)
"""
result = False
exist = False
for record in self.UID_CHANNEL_DB:
if record.name == newChan.name:
# If the channel exist, update the user list and do not go further
exist = True
self.log.debug(f'{record.name} already exist')
for user in newChan.uids:
record.uids.append(user)
# Supprimer les doublons
del_duplicates = list(set(record.uids))
record.uids = del_duplicates
self.log.debug(f'Updating a new UID to the channel {record}')
return result
if not exist:
# If the channel don't exist, then create it
self.UID_CHANNEL_DB.append(newChan)
result = True
self.log.debug(f'New Channel Created: ({newChan})')
if not result:
self.log.critical(f'The Channel Object was not inserted {newChan}')
return result
def delete(self, name: str) -> bool:
result = False
for record in self.UID_CHANNEL_DB:
if record.name == name:
# If the channel exist, then remove it and return True.
# As soon as the channel found, return True and stop the loop
self.UID_CHANNEL_DB.remove(record)
result = True
self.log.debug(f'Channel ({record.name}) has been created')
return result
if not result:
self.log.critical(f'The Channel {name} was not deleted')
return result
def delete_user_from_channel(self, chan_name: str, uid:str) -> bool:
try:
result = False
for record in self.UID_CHANNEL_DB:
if record.name == chan_name:
for user_id in record.uids:
if self.Base.clean_uid(user_id) == uid:
record.uids.remove(user_id)
self.log.debug(f'The UID {uid} has been removed, here is the new object: {record}')
result = True
for record in self.UID_CHANNEL_DB:
if not record.uids:
self.UID_CHANNEL_DB.remove(record)
self.log.debug(f'The Channel {record.name} has been removed, here is the new object: {record}')
return result
except ValueError as ve:
self.log.error(f'{ve}')
def get_Channel(self, name: str) -> Union[ChannelModel, None]:
Channel = None
for record in self.UID_CHANNEL_DB:
if record.name == name:
Channel = record
self.log.debug(f'Search {name} -- result = {Channel}')
return Channel

View File

@@ -1,31 +1,22 @@
import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, sys
import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, re
from typing import Union
from base64 import b64decode
from datetime import datetime
from sqlalchemy import create_engine, Engine, Connection, CursorResult
from sqlalchemy.sql import text
from core.configuration import Config
from core.loadConf import ConfigDataModel
class Base:
CORE_DB_PATH = 'core' + os.sep + 'db' + os.sep # Le dossier bases de données core
MODS_DB_PATH = 'mods' + os.sep + 'db' + os.sep # Le dossier bases de données des modules
PYTHON_MIN_VERSION = '3.10' # Version min de python
DB_SCHEMA:list[str] = {
'admins': 'sys_admins',
'commandes': 'sys_commandes',
'logs': 'sys_logs',
'modules': 'sys_modules'
}
DEFENDER_VERSION = '' # MAJOR.MINOR.BATCH
LATEST_DEFENDER_VERSION = '' # Latest Version of Defender in git
DEFENDER_DB_PATH = 'db' + os.sep # Séparateur en fonction de l'OS
DEFENDER_DB_NAME = 'defender' # Le nom de la base de données principale
def __init__(self, Config: Config) -> None:
def __init__(self, Config: ConfigDataModel) -> None:
self.Config = Config # Assigner l'objet de configuration
self.init_log_system() # Demarrer le systeme de log
self.check_for_new_version() # Verifier si une nouvelle version est disponible
self.check_for_new_version(True) # Verifier si une nouvelle version est disponible
self.running_timers:list[threading.Timer] = [] # Liste des timers en cours
self.running_threads:list[threading.Thread] = [] # Liste des threads en cours
@@ -48,12 +39,15 @@ class Base:
with open(version_filename, 'r') as version_data:
current_version:dict[str, str] = json.load(version_data)
self.DEFENDER_VERSION = current_version["version"]
# self.DEFENDER_VERSION = current_version["version"]
self.Config.current_version = current_version['version']
return None
def __get_latest_defender_version(self) -> None:
try:
self.logs.debug(f'Looking for a new version available on Github')
print(f'===> Looking for a new version available on Github')
token = ''
json_url = f'https://raw.githubusercontent.com/adator85/IRC_DEFENDER_MODULES/main/version.json'
headers = {
@@ -62,13 +56,14 @@ class Base:
}
if token == '':
response = requests.get(json_url)
response = requests.get(json_url, timeout=self.Config.API_TIMEOUT)
else:
response = requests.get(json_url, headers=headers)
response = requests.get(json_url, headers=headers, timeout=self.Config.API_TIMEOUT)
response.raise_for_status() # Vérifie si la requête a réussi
json_response:dict = response.json()
self.LATEST_DEFENDER_VERSION = json_response["version"]
# self.LATEST_DEFENDER_VERSION = json_response["version"]
self.Config.latest_version = json_response['version']
return None
except requests.HTTPError as err:
@@ -76,16 +71,20 @@ class Base:
except:
self.logs.warning(f'Github not available to fetch latest version')
def check_for_new_version(self) -> bool:
def check_for_new_version(self, online:bool) -> bool:
try:
self.logs.debug(f'Checking for a new service version')
# Assigner la version actuelle de Defender
self.__set_current_defender_version()
self.__set_current_defender_version()
# Récuperer la dernier version disponible dans github
self.__get_latest_defender_version()
if online:
self.logs.debug(f'Retrieve the latest version from Github')
self.__get_latest_defender_version()
isNewVersion = False
latest_version = self.LATEST_DEFENDER_VERSION
current_version = self.DEFENDER_VERSION
latest_version = self.Config.latest_version
current_version = self.Config.current_version
curr_major , curr_minor, curr_patch = current_version.split('.')
last_major, last_minor, last_patch = latest_version.split('.')
@@ -121,6 +120,16 @@ class Base:
currentdate = datetime.now().strftime('%d-%m-%Y %H:%M:%S')
return currentdate
def get_all_modules(self) -> list:
all_files = os.listdir('mods/')
all_modules: list = []
for module in all_files:
if module.endswith('.py') and not module == '__init__.py':
all_modules.append(module.replace('.py', '').lower())
return all_modules
def create_log(self, log_message: str) -> None:
"""Enregiste les logs
@@ -130,7 +139,7 @@ class Base:
Returns:
None: Aucun retour
"""
sql_insert = f"INSERT INTO {self.DB_SCHEMA['logs']} (datetime, server_msg) VALUES (:datetime, :server_msg)"
sql_insert = f"INSERT INTO {self.Config.table_log} (datetime, server_msg) VALUES (:datetime, :server_msg)"
mes_donnees = {'datetime': str(self.get_datetime()),'server_msg': f'{log_message}'}
self.db_execute_query(sql_insert, mes_donnees)
@@ -166,7 +175,7 @@ class Base:
cmd_list[2] = '*******'
cmd = ' '.join(cmd_list)
insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['commandes']} (datetime, user, commande) VALUES (:datetime, :user, :commande)"
insert_cmd_query = f"INSERT INTO {self.Config.table_commande} (datetime, user, commande) VALUES (:datetime, :user, :commande)"
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'commande': cmd}
self.db_execute_query(insert_cmd_query, mes_donnees)
@@ -181,7 +190,7 @@ class Base:
Returns:
bool: True si le module existe déja dans la base de données sinon False
"""
query = f"SELECT id FROM {self.DB_SCHEMA['modules']} WHERE module = :module"
query = f"SELECT id FROM {self.Config.table_module} WHERE module = :module"
mes_donnes = {'module': module_name}
results = self.db_execute_query(query, mes_donnes)
@@ -199,7 +208,7 @@ class Base:
if not self.db_isModuleExist(module_name):
self.logs.debug(f"Le module {module_name} n'existe pas alors ont le créer")
insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['modules']} (datetime, user, module) VALUES (:datetime, :user, :module)"
insert_cmd_query = f"INSERT INTO {self.Config.table_module} (datetime, user, module) VALUES (:datetime, :user, :module)"
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'module': module_name}
self.db_execute_query(insert_cmd_query, mes_donnees)
# self.db_close_session(self.session)
@@ -214,7 +223,7 @@ class Base:
Args:
cmd (str): le module a enregistrer
"""
insert_cmd_query = f"DELETE FROM {self.DB_SCHEMA['modules']} WHERE module = :module"
insert_cmd_query = f"DELETE FROM {self.Config.table_module} WHERE module = :module"
mes_donnees = {'module': module_name}
self.db_execute_query(insert_cmd_query, mes_donnees)
@@ -222,14 +231,20 @@ class Base:
def db_create_first_admin(self) -> None:
user = self.db_execute_query(f"SELECT id FROM {self.DB_SCHEMA['admins']}")
user = self.db_execute_query(f"SELECT id FROM {self.Config.table_admin}")
if not user.fetchall():
admin = self.Config.OWNER
password = self.crypt_password(self.Config.PASSWORD)
mes_donnees = {'createdOn': self.get_datetime(), 'user': admin, 'password': password, 'hostname': '*', 'vhost': '*', 'level': 5}
mes_donnees = {'createdOn': self.get_datetime(),
'user': admin,
'password': password,
'hostname': '*',
'vhost': '*',
'level': 5
}
self.db_execute_query(f"""
INSERT INTO {self.DB_SCHEMA['admins']}
INSERT INTO {self.Config.table_admin}
(createdOn, user, password, hostname, vhost, level)
VALUES
(:createdOn, :user, :password, :hostname, :vhost, :level)"""
@@ -297,7 +312,7 @@ class Base:
if thread.getName() != 'heartbeat':
if not thread.is_alive():
self.running_threads.remove(thread)
self.logs.debug(f"Thread {str(thread.getName())} {str(thread.native_id)} removed")
self.logs.info(f"Thread {str(thread.getName())} {str(thread.native_id)} removed")
# print(threading.enumerate())
except AssertionError as ae:
@@ -348,8 +363,8 @@ class Base:
def db_init(self) -> tuple[Engine, Connection]:
db_directory = self.DEFENDER_DB_PATH
full_path_db = self.DEFENDER_DB_PATH + self.DEFENDER_DB_NAME
db_directory = self.Config.db_path
full_path_db = self.Config.db_path + self.Config.db_name
if not os.path.exists(db_directory):
os.makedirs(db_directory)
@@ -361,14 +376,14 @@ class Base:
def __create_db(self) -> None:
table_logs = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['logs']} (
table_logs = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_log} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['commandes']} (
table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_commande} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
user TEXT,
@@ -376,7 +391,7 @@ class Base:
)
'''
table_modules = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['modules']} (
table_modules = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_module} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
user TEXT,
@@ -384,7 +399,7 @@ class Base:
)
'''
table_admins = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['admins']} (
table_admins = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_admin} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
createdOn TEXT,
user TEXT,
@@ -464,6 +479,17 @@ class Base:
except ValueError:
return False
def decode_ip(self, ip_b64encoded: str) -> Union[str, None]:
binary_ip = b64decode(ip_b64encoded)
try:
decoded_ip = ipaddress.ip_address(binary_ip)
return decoded_ip.exploded
except ValueError as ve:
self.logs.critical(f'This remote ip is not valid : {ve}')
return None
def get_random(self, lenght:int) -> str:
"""
Retourn une chaîne aléatoire en fonction de la longueur spécifiée.
@@ -491,3 +517,36 @@ class Base:
# Vider le dictionnaire de fonction
self.periodic_func.clear()
def clean_uid(self, uid:str) -> str:
"""Clean UID by removing @ / % / + / Owner / and *
Args:
uid (str): The UID to clean
Returns:
str: Clean UID without any sign
"""
pattern = fr'[@|%|\+|~|\*]*'
parsed_UID = re.sub(pattern, '', uid)
return parsed_UID
def Is_Channel(self, channelToCheck: str) -> bool:
"""Check if the string has the # caractere and return True if this is a channel
Args:
channelToCheck (str): The string to test if it is a channel or not
Returns:
bool: True if the string is a channel / False if this is not a channel
"""
pattern = fr'^#'
isChannel = re.findall(pattern, channelToCheck)
if not isChannel:
return False
else:
return True

274
core/dataClass.py Normal file
View File

@@ -0,0 +1,274 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Union
class User:
@dataclass
class UserDataClass:
uid: str
nickname: str
username: str
hostname: str
umodes: str
vhost: str
isWebirc: bool
connexion_datetime: datetime = field(default=datetime.now())
UID_DB:list[UserDataClass] = []
def __init__(self) -> None:
pass
def insert(self, user: UserDataClass) -> bool:
"""Insert new user
Args:
user (UserDataClass): The User dataclass
Returns:
bool: True if the record has been created
"""
exists = False
inserted = False
for record in self.UID_DB:
if record.uid == user.uid:
exists = True
print(f'{user.uid} already exist')
if not exists:
self.UID_DB.append(user)
print(f'New record with uid: {user.uid}')
inserted = True
return inserted
def update(self, uid: str, newnickname: str) -> bool:
"""Updating a single record with a new nickname
Args:
uid (str): the uid of the user
newnickname (str): the new nickname
Returns:
bool: True if the record has been updated
"""
status = False
for user in self.UID_DB:
if user.uid == uid:
user.nickname = newnickname
status = True
print(f'Updating record with uid: {uid}')
return status
def delete(self, uid: str) -> bool:
"""Delete a user based on his uid
Args:
uid (str): The UID of the user
Returns:
bool: True if the record has been deleted
"""
status = False
for user in self.UID_DB:
if user.uid == uid:
self.UID_DB.remove(user)
status = True
print(f'Removing record with uid: {uid}')
return status
def isexist(self, uidornickname:str) -> bool:
"""do the UID or Nickname exist ?
Args:
uidornickname (str): The UID or the Nickname
Returns:
bool: True if exist or False if don't exist
"""
result = False
for record in self.UID_DB:
if record.uid == uidornickname:
result = True
if record.nickname == uidornickname:
result = True
return result
def get_User(self, uidornickname) -> Union[UserDataClass, None]:
UserObject = None
for record in self.UID_DB:
if record.uid == uidornickname:
UserObject = record
elif record.nickname == uidornickname:
UserObject = record
return UserObject
def get_uid(self, uidornickname:str) -> Union[str, None]:
uid = None
for record in self.UID_DB:
if record.uid == uidornickname:
uid = record.uid
if record.nickname == uidornickname:
uid = record.uid
return uid
def get_nickname(self, uidornickname:str) -> Union[str, None]:
nickname = None
for record in self.UID_DB:
if record.nickname == uidornickname:
nickname = record.nickname
if record.uid == uidornickname:
nickname = record.nickname
return nickname
class Admin:
@dataclass
class AdminDataClass:
uid: str
nickname: str
username: str
hostname: str
umodes: str
vhost: str
level: int
connexion_datetime: datetime = field(default=datetime.now())
UID_ADMIN_DB:list[AdminDataClass] = []
def __init__(self) -> None:
pass
def insert(self, admin: AdminDataClass) -> bool:
"""Insert new user
Args:
user (UserDataClass): The User dataclass
Returns:
bool: True if the record has been created
"""
exists = False
inserted = False
for record in self.UID_ADMIN_DB:
if record.uid == admin.uid:
exists = True
print(f'{admin.uid} already exist')
if not exists:
self.UID_ADMIN_DB.append(admin)
print(f'New record with uid: {admin.uid}')
inserted = True
return inserted
def update(self, uid: str, newnickname: str) -> bool:
"""Updating a single record with a new nickname
Args:
uid (str): the uid of the user
newnickname (str): the new nickname
Returns:
bool: True if the record has been updated
"""
status = False
for admin in self.UID_ADMIN_DB:
if admin.uid == uid:
admin.nickname = newnickname
status = True
print(f'Updating record with uid: {uid}')
return status
def delete(self, uid: str) -> bool:
"""Delete a user based on his uid
Args:
uid (str): The UID of the user
Returns:
bool: True if the record has been deleted
"""
status = False
for admin in self.UID_ADMIN_DB:
if admin.uid == uid:
self.UID_ADMIN_DB.remove(admin)
status = True
print(f'Removing record with uid: {uid}')
return status
def isexist(self, uidornickname:str) -> bool:
"""do the UID or Nickname exist ?
Args:
uidornickname (str): The UID or the Nickname
Returns:
bool: True if exist or False if don't exist
"""
result = False
for record in self.UID_ADMIN_DB:
if record.uid == uidornickname:
result = True
if record.nickname == uidornickname:
result = True
return result
def get_Admin(self, uidornickname) -> Union[AdminDataClass, None]:
AdminObject = None
for record in self.UID_ADMIN_DB:
if record.uid == uidornickname:
AdminObject = record
elif record.nickname == uidornickname:
AdminObject = record
return AdminObject
def get_uid(self, uidornickname:str) -> Union[str, None]:
uid = None
for record in self.UID_ADMIN_DB:
if record.uid == uidornickname:
uid = record.uid
if record.nickname == uidornickname:
uid = record.uid
return uid
def get_nickname(self, uidornickname:str) -> Union[str, None]:
nickname = None
for record in self.UID_ADMIN_DB:
if record.nickname == uidornickname:
nickname = record.nickname
if record.uid == uidornickname:
nickname = record.nickname
return nickname
def get_level(self, uidornickname:str) -> int:
level = 0
for record in self.UID_ADMIN_DB:
if record.uid == uidornickname:
level = record.level
if record.nickname == uidornickname:
level = record.level
return level

View File

@@ -0,0 +1,48 @@
{
"SERVEUR_IP": "0.0.0.0",
"SERVEUR_HOSTNAME": "your.host.name",
"SERVEUR_LINK": "your.link.to.server",
"SERVEUR_PORT": 7002,
"SERVEUR_PASSWORD": "link_password",
"SERVEUR_ID": "006",
"SERVEUR_SSL": true,
"SERVICE_NAME": "defender",
"SERVICE_NICKNAME": "BotNickname",
"SERVICE_REALNAME": "BotRealname",
"SERVICE_USERNAME": "BotUsername",
"SERVICE_HOST": "your.service.hostname",
"SERVICE_INFO": "Network IRC Service",
"SERVICE_CHANLOG": "#services",
"SERVICE_SMODES": "+ioqBS",
"SERVICE_CMODES": "ntsO",
"SERVICE_UMODES": "o",
"SERVICE_PREFIX": "!",
"OWNER": "admin",
"PASSWORD": "password",
"SALON_JAIL": "#jail",
"SALON_JAIL_MODES": "sS",
"SALON_LIBERER": "#welcome",
"API_TIMEOUT": 2,
"PORTS_TO_SCAN": [3028, 8080, 1080, 1085, 4145, 9050],
"WHITELISTED_IP": ["127.0.0.1"],
"GLINE_DURATION": "30",
"DEBUG_LEVEL": 20,
"CONFIG_COLOR": {
"blanche": "\\u0003\\u0030",
"noire": "\\u0003\\u0031",
"bleue": "\\u0003\\u0020",
"verte": "\\u0003\\u0033",
"rouge": "\\u0003\\u0034",
"jaune": "\\u0003\\u0036",
"gras": "\\u0002",
"nogc": "\\u0002\\u0003"
}
}

View File

@@ -1,53 +0,0 @@
##########################################
# CONFIGURATION FILE : #
# Rename file to : configuration.py #
##########################################
class Config:
SERVEUR_IP = '0.0.0.0' # IP ou host du serveur à rejoindre
SERVEUR_HOSTNAME = 'your hostname' # Le hostname du serveur IRC
SERVEUR_LINK = 'your link' # Host attendu par votre IRCd (ex. dans votre link block pour Unrealircd)
SERVEUR_PORT = 6666 # Port du link
SERVEUR_PASSWORD = 'your link password' # Mot de passe du link (Privilégiez argon2 sur Unrealircd)
SERVEUR_ID = '002' # SID (identification) du bot en tant que Services
SERVEUR_SSL = True # Activer / Desactiver la connexion SSL
SERVICE_NAME = 'defender' # Le nom du service
SERVICE_NICKNAME = 'BotName' # Nick du bot sur IRC
SERVICE_REALNAME = 'BotRealname' # Realname du bot
SERVICE_USERNAME = 'BotIdent' # Ident du bot
SERVICE_HOST = 'your service host' # Host du bot
SERVICE_INFO = 'Network IRC Service' # swhois du bot
SERVICE_CHANLOG = '#services' # Salon des logs et autres messages issus du bot
SERVICE_SMODES = '+ioqBS' # Mode du service
SERVICE_CMODES = 'ntsO' # Mode du salon (#ChanLog) que le bot appliquera à son entrée
SERVICE_UMODES = 'o' # Mode que le bot pourra se donner à sa connexion au salon chanlog
SERVICE_PREFIX = '.' # Prefix pour envoyer les commandes au bot
SERVICE_ID = SERVEUR_ID + 'AAAAAB' # L'identifiant du service
OWNER = 'admin' # Identifiant du compte admin
PASSWORD = 'password' # Mot de passe du compte admin
SALON_JAIL = '#JAIL' # Salon pot de miel
SALON_JAIL_MODES = 'sS' # Mode du salon pot de miel
SALON_LIBERER = '#welcome' # Le salon ou sera envoyé l'utilisateur clean
API_TIMEOUT = 2 # Timeout des api's
PORTS_TO_SCAN = [3028, 8080, 1080, 1085, 4145, 9050] # Liste des ports a scanné pour une detection de proxy
WHITELISTED_IP = ['127.0.0.1'] # IP a ne pas scanner
GLINE_DURATION = '1d' # La durée du gline
DEBUG_LEVEL = 10 # Le niveau des logs DEBUG 10 | INFO 20 | WARNING 30 | ERROR 40 | CRITICAL 50
CONFIG_COLOR = {
'blanche': '\x0300', # Couleur blanche
'noire': '\x0301', # Couleur noire
'bleue': '\x0302', # Couleur Bleue
'verte': '\x0303', # Couleur Verte
'rouge': '\x0304', # Couleur rouge
'jaune': '\x0306', # Couleur jaune
'gras': '\x02', # Gras
'nogc': '\x02\x03' # Retirer gras et couleur
}

View File

@@ -1,12 +1,16 @@
from importlib.util import find_spec
from subprocess import check_call, run
from subprocess import check_call, run, CalledProcessError
from platform import python_version
from sys import exit
import os
class Install:
def __init__(self) -> None:
self.PYTHON_MIN_VERSION = '3.10'
self.venv_folder_name = '.pyenv'
self.cmd_venv_command = ['python3', '-m', 'venv', self.venv_folder_name]
self.module_to_install = ['sqlalchemy','psutil','requests']
if not self.checkPythonVersion():
@@ -38,6 +42,17 @@ class Install:
return True
def run_subprocess(self, command:list) -> None:
print(command)
try:
check_call(command)
print("La commande s'est terminée avec succès.")
except CalledProcessError as e:
print(f"La commande a échoué avec le code de retour :{e.returncode}")
print(f"Try to install dependencies ...")
exit(5)
def checkDependencies(self) -> None:
"""### Verifie les dépendances si elles sont installées
- Test si les modules sont installés
@@ -46,6 +61,11 @@ class Install:
"""
do_install = False
# Check if virtual env exist
if not os.path.exists(f'{self.venv_folder_name}'):
self.run_subprocess(self.cmd_venv_command)
do_install = True
for module in self.module_to_install:
if find_spec(module) is None:
do_install = True
@@ -70,3 +90,10 @@ class Install:
print(f"====> Module {module} installé")
else:
print(f"==> {module} already installed")
print(f"#"*12)
print("Installation complete ...")
print("You must change environment using the command below")
print(f"source {self.venv_folder_name}{os.sep}bin{os.sep}activate")
print(f"#"*12)
exit(1)

View File

@@ -2,7 +2,8 @@ import ssl, re, importlib, sys, time, threading, socket
from ssl import SSLSocket
from datetime import datetime, timedelta
from typing import Union
from core.configuration import Config
from core.loadConf import Config
from core.Model import User, Admin, Channel
from core.base import Base
class Irc:
@@ -10,8 +11,7 @@ class Irc:
def __init__(self) -> 'Irc':
self.defender_connexion_datetime = datetime.now() # Date et heure de la premiere connexion de Defender
self.db_uid = {} # Definir la variable qui contiendra la liste des utilisateurs connectés au réseau
self.db_admin = {} # Definir la variable qui contiendra la liste des administrateurs
self.first_score: int = 100
self.db_chan = [] # Definir la variable qui contiendra la liste des salons
self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés
self.beat = 30 # Lancer toutes les 30 secondes des actions de nettoyages
@@ -24,13 +24,13 @@ class Irc:
self.CHARSET = ['utf-8', 'iso-8859-1'] # Charset utiliser pour décoder/encoder les messages
self.SSL_VERSION = None # Version SSL
self.Config = Config()
self.Config = Config().ConfigObject
# Liste des commandes internes du bot
self.commands_level = {
0: ['help', 'auth', 'copyright'],
1: ['load','reload','unload', 'deauth', 'uptime', 'checkversion'],
2: ['show_modules', 'show_timers', 'show_threads', 'sentinel'],
2: ['show_modules', 'show_timers', 'show_threads', 'show_channels'],
3: ['quit', 'restart','addaccess','editaccess', 'delaccess']
}
@@ -41,12 +41,20 @@ class Irc:
self.commands.append(command)
self.Base = Base(self.Config)
self.User = User(self.Base)
self.Admin = Admin(self.Base)
self.Channel = Channel(self.Base)
self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, ))
##############################################
# CONNEXION IRC #
##############################################
def init_irc(self, ircInstance:'Irc') -> None:
"""Create a socket and connect to irc server
Args:
ircInstance (Irc): Instance of Irc object.
"""
try:
self.__create_socket()
self.__connect_to_irc(ircInstance)
@@ -54,7 +62,8 @@ class Irc:
self.Base.logs.critical(f'Assertion error: {ae}')
def __create_socket(self) -> None:
"""Create a socket to connect SSL or Normal connection
"""
try:
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
connexion_information = (self.Config.SERVEUR_IP, self.Config.SERVEUR_PORT)
@@ -65,9 +74,8 @@ class Irc:
ssl_connexion = ssl_context.wrap_socket(soc, server_hostname=self.Config.SERVEUR_HOSTNAME)
ssl_connexion.connect(connexion_information)
self.IrcSocket:SSLSocket = ssl_connexion
self.Base.logs.info(f"Connexion en mode SSL : Version = {self.IrcSocket.version()}")
self.SSL_VERSION = self.IrcSocket.version()
self.Base.logs.info(f"Connexion en mode SSL : Version = {self.SSL_VERSION}")
else:
soc.connect(connexion_information)
self.IrcSocket:socket.socket = soc
@@ -111,6 +119,11 @@ class Irc:
time.sleep(0.5)
self.Base.logs.warning('--> Waiting for socket to close ...')
# Reload configuration
self.Base.logs.debug('Reloading configuration')
self.Config = Config().ConfigObject
self.Base = Base(self.Config)
self.__create_socket()
self.__link(self.IrcSocket)
self.load_existing_modules()
@@ -157,7 +170,7 @@ class Irc:
# except Exception as e:
# self.debug(f"Exception: {e}")
def __link(self, writer:socket.socket) -> None:
def __link(self, writer:Union[socket.socket, SSLSocket]) -> None:
"""Créer le link et envoyer les informations nécessaires pour la
connexion au serveur.
@@ -181,7 +194,7 @@ class Irc:
sid = self.Config.SERVEUR_ID
service_id = self.Config.SERVICE_ID
version = self.Base.DEFENDER_VERSION
version = self.Config.current_version
unixtime = self.Base.get_unixtime()
# Envoyer un message d'identification
@@ -247,6 +260,10 @@ class Irc:
except AssertionError as ae:
self.Base.logs.error(f"Assertion error : {ae}")
def unload(self) -> None:
# This is only to reference the method
return None
##############################################
# FIN CONNEXION IRC #
##############################################
@@ -257,7 +274,7 @@ class Irc:
Returns:
None: Aucun retour requis, elle charge puis c'est tout
"""
result = self.Base.db_execute_query(f"SELECT module FROM {self.Base.DB_SCHEMA['modules']}")
result = self.Base.db_execute_query(f"SELECT module FROM {self.Config.table_module}")
for r in result.fetchall():
self.load_module('sys', r[0], True)
@@ -378,7 +395,15 @@ class Irc:
my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe
create_instance_of_the_class = my_class(self.ircObject) # Créer une nouvelle instance de la classe
self.loaded_classes[class_name] = create_instance_of_the_class # Charger la nouvelle class dans la variable globale
if not hasattr(create_instance_of_the_class, 'cmd'):
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} ne contient pas de méthode cmd")
self.Base.logs.critical(f"The Module {module_name} has not been loaded because cmd method is not available")
self.Base.db_delete_module(module_name)
return False
# Charger la nouvelle class dans la variable globale
self.loaded_classes[class_name] = create_instance_of_the_class
# Enregistrer le module dans la base de données
if not init:
@@ -394,149 +419,54 @@ class Irc:
except Exception as e:
self.Base.logs.error(f"Something went wrong with a module you want to load : {e}")
def insert_db_uid(self, uid:str, nickname:str, username:str, hostname:str, umodes:str, vhost:str, isWebirc: bool) -> None:
if uid in self.db_uid:
return None
self.db_uid[uid] = {
'nickname': nickname,
'username': username,
'hostname': hostname,
'umodes': umodes,
'vhost': vhost,
'isWebirc': isWebirc,
'datetime': datetime.now()
}
self.db_uid[nickname] = {
'uid': uid,
'username': username,
'hostname': hostname,
'umodes': umodes,
'vhost': vhost,
'isWebirc': isWebirc,
'datetime': datetime.now()
}
return None
def update_db_uid(self, uid:str, newnickname:str) -> None:
# Récupérer l'ancien nickname
oldnickname = self.db_uid[uid]['nickname']
# Enregistrement du nouveau nickname
self.db_uid[newnickname] = {
'uid': uid,
'username': self.db_uid[uid]['username'],
'hostname': self.db_uid[uid]['hostname'],
'umodes': self.db_uid[uid]['umodes'],
'vhost': self.db_uid[uid]['vhost']
}
# Modification du nickname dans la ligne UID
self.db_uid[uid]['nickname'] = newnickname
# Supprimer l'ancien nickname
if oldnickname in self.db_uid:
del self.db_uid[oldnickname]
else:
self.Base.logs.debug(f"L'ancien nickname {oldnickname} n'existe pas dans UID_DB")
response = False
self.Base.logs.debug(f"{oldnickname} changed to {newnickname}")
return None
def delete_db_uid(self, uid:str) -> None:
uid_reel = self.get_uid(uid)
nickname = self.get_nickname(uid_reel)
if uid_reel in self.db_uid:
del self.db_uid[uid]
if nickname in self.db_uid:
del self.db_uid[nickname]
return None
def insert_db_admin(self, uid:str, level:int) -> None:
if not uid in self.db_uid:
if self.User.get_User(uid) is None:
return None
getUser = self.User.get_User(uid)
nickname = self.db_uid[uid]['nickname']
username = self.db_uid[uid]['username']
hostname = self.db_uid[uid]['hostname']
umodes = self.db_uid[uid]['umodes']
vhost = self.db_uid[uid]['vhost']
nickname = getUser.nickname
username = getUser.username
hostname = getUser.hostname
umodes = getUser.umodes
vhost = getUser.vhost
level = int(level)
self.db_admin[uid] = {
'nickname': nickname,
'username': username,
'hostname': hostname,
'umodes': umodes,
'vhost': vhost,
'datetime': self.Base.get_datetime(),
'level': level
}
self.db_admin[nickname] = {
'uid': uid,
'username': username,
'hostname': hostname,
'umodes': umodes,
'vhost': vhost,
'datetime': self.Base.get_datetime(),
'level': level
}
self.Admin.insert(
self.Admin.AdminModel(
uid=uid,
nickname=nickname,
username=username,
hostname=hostname,
umodes=umodes,
vhost=vhost,
level=level,
connexion_datetime=datetime.now()
)
)
return None
def delete_db_admin(self, uid:str) -> None:
if not uid in self.db_admin:
if self.Admin.get_Admin(uid) is None:
return None
nickname_admin = self.db_admin[uid]['nickname']
if uid in self.db_admin:
del self.db_admin[uid]
if nickname_admin in self.db_admin:
del self.db_admin[nickname_admin]
if not self.Admin.delete(uid):
self.Base.logs.critical(f'UID: {uid} was not deleted')
return None
def insert_db_chan(self, channel:str) -> bool:
"""Ajouter l'ensemble des salons dans la variable {CHAN_DB}
Args:
channel (str): le salon à insérer dans {CHAN_DB}
Returns:
bool: True si insertion OK / False si insertion KO
"""
if channel in self.db_chan:
return False
response = True
# Ajouter un nouveau salon
self.db_chan.append(channel)
# Supprimer les doublons de la liste
self.db_chan = list(set(self.db_chan))
self.Base.logs.debug(f"Le salon {channel} a été ajouté à la liste CHAN_DB")
return response
def create_defender_user(self, nickname:str, level: int, password:str) -> str:
nickname = self.get_nickname(nickname)
get_user = self.User.get_User(nickname)
if get_user is None:
response = f'This nickname {nickname} does not exist, it is not possible to create this user'
self.Base.logs.warning(response)
return response
nickname = get_user.nickname
response = ''
if level > 4:
@@ -544,25 +474,19 @@ class Irc:
self.Base.logs.warning(response)
return response
# Verification si le user existe dans notre UID_DB
if not nickname in self.db_uid:
response = f"{nickname} n'est pas connecté, impossible de l'enregistrer pour le moment"
self.Base.logs.warning(response)
return response
hostname = self.db_uid[nickname]['hostname']
vhost = self.db_uid[nickname]['vhost']
hostname = get_user.hostname
vhost = get_user.vhost
spassword = self.Base.crypt_password(password)
mes_donnees = {'admin': nickname}
query_search_user = f"SELECT id FROM {self.Base.DB_SCHEMA['admins']} WHERE user=:admin"
query_search_user = f"SELECT id FROM {self.Config.table_admin} WHERE user=:admin"
r = self.Base.db_execute_query(query_search_user, mes_donnees)
exist_user = r.fetchone()
# On verifie si le user exist dans la base
if not exist_user:
mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level}
self.Base.db_execute_query(f'''INSERT INTO {self.Base.DB_SCHEMA['admins']}
self.Base.db_execute_query(f'''INSERT INTO {self.Config.table_admin}
(createdOn, user, password, hostname, vhost, level) VALUES
(:datetime, :user, :password, :hostname, :vhost, :level)
''', mes_donnees)
@@ -576,41 +500,15 @@ class Irc:
self.Base.logs.info(response)
return response
def get_uid(self, uidornickname:str) -> Union[str, None]:
uid_recherche = uidornickname
response = None
for uid, value in self.db_uid.items():
if uid == uid_recherche:
if 'nickname' in value:
response = uid
if 'uid' in value:
response = value['uid']
return response
def get_nickname(self, uidornickname:str) -> Union[str, None]:
nickname_recherche = uidornickname
response = None
for nickname, value in self.db_uid.items():
if nickname == nickname_recherche:
if 'nickname' in value:
response = value['nickname']
if 'uid' in value:
response = nickname
return response
def is_cmd_allowed(self,nickname:str, cmd:str) -> bool:
def is_cmd_allowed(self, nickname:str, cmd:str) -> bool:
# Vérifier si le user est identifié et si il a les droits
is_command_allowed = False
uid = self.get_uid(nickname)
uid = self.User.get_uid(nickname)
get_admin = self.Admin.get_Admin(uid)
if uid in self.db_admin:
admin_level = self.db_admin[uid]['level']
if not get_admin is None:
admin_level = get_admin.level
for ref_level, ref_commands in self.commands_level.items():
# print(f"LevelNo: {ref_level} - {ref_commands} - {admin_level}")
@@ -646,6 +544,18 @@ class Irc:
return None
def thread_check_for_new_version(self, fromuser: str) -> None:
dnickname = self.Config.SERVICE_NICKNAME
if self.Base.check_for_new_version(True):
self.send2socket(f':{dnickname} NOTICE {fromuser} : New Version available : {self.Config.current_version} >>> {self.Config.latest_version}')
self.send2socket(f':{dnickname} NOTICE {fromuser} : Please run (git pull origin main) in the current folder')
else:
self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender')
return None
def cmd(self, data:list) -> None:
try:
@@ -708,10 +618,15 @@ class Irc:
try:
# if self.Config.ABUSEIPDB == 1:
# self.Base.create_thread(self.abuseipdb_scan, (cmd[2], ))
self.first_connexion_ip = cmd[2]
self.first_score = int(cmd[3])
pass
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Base.logs.error(f'{ie}')
except ValueError as ve:
self.first_score = 0
self.Base.logs.error(f'Impossible to convert first_score: {ve}')
case '320':
#:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users
@@ -730,10 +645,12 @@ class Irc:
hsid = str(cmd[0]).replace(':','')
if hsid == self.HSID:
if self.INIT == 1:
if self.Base.check_for_new_version():
version = f'{self.Base.DEFENDER_VERSION} >>> {self.Base.LATEST_DEFENDER_VERSION}'
current_version = self.Config.current_version
latest_version = self.Config.latest_version
if self.Base.check_for_new_version(False):
version = f'{current_version} >>> {latest_version}'
else:
version = f'{self.Base.DEFENDER_VERSION}'
version = f'{current_version}'
self.send2socket(f"MODE {self.Config.SERVICE_NICKNAME} +B")
self.send2socket(f"JOIN {self.Config.SERVICE_CHANLOG}")
@@ -758,11 +675,12 @@ class Irc:
self.Base.logs.info(f"# CHANNEL : {self.Config.SERVICE_CHANLOG} ")
self.Base.logs.info(f"# VERSION : {version} ")
self.Base.logs.info(f"################################################")
if self.Base.check_for_new_version(False):
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} : New Version available {version}")
# Initialisation terminé aprés le premier PING
self.INIT = 0
# self.send2socket(f':{self.Config.SERVICE_ID} PING :{hsid}')
# print(self.db_uid)
case _:
pass
@@ -776,7 +694,7 @@ class Irc:
# :001N1WD7L QUIT :Quit: free_znc_1
cmd.pop(0)
uid_who_quit = str(cmd[0]).replace(':', '')
self.delete_db_uid(uid_who_quit)
self.User.delete(uid_who_quit)
case 'PONG':
# ['@msgid=aTNJhp17kcPboF5diQqkUL;time=2023-12-28T20:35:58.411Z', ':irc.deb.biz.st', 'PONG', 'irc.deb.biz.st', ':Dev-PyDefender']
@@ -790,17 +708,58 @@ class Irc:
cmd.pop(0)
uid = str(cmd[0]).replace(':','')
newnickname = cmd[2]
self.User.update(uid, newnickname)
self.update_db_uid(uid, newnickname)
case 'MODE':
#['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6+Z4494xWUg;time=2024-08-09T12:45:36.651Z',
# ':001', 'MODE', '#a', '+nt', '1723207536']
pass
case 'SJOIN':
# ['@msgid=ictnEBhHmTUHzkEeVZl6rR;time=2023-12-28T20:03:18.482Z', ':001', 'SJOIN', '1702139101', '#stats', '+nst', ':@001SB890A', '@00BAAAAAI']
# ['@msgid=5sTwGdj349D82L96p749SY;time=2024-08-15T09:50:23.528Z', ':001', 'SJOIN', '1721564574', '#welcome', ':001JD94QH']
# ['@msgid=bvceb6HthbLJapgGLXn1b0;time=2024-08-15T09:50:11.464Z', ':001', 'SJOIN', '1721564574', '#welcome', '+lnrt', '13', ':001CIVLQF', '+11ZAAAAAB', '001QGR10C', '*@0014UE10B', '001NL1O07', '001SWZR05', '001HB8G04', '@00BAAAAAJ', '0019M7101']
cmd.pop(0)
channel = cmd[3]
self.insert_db_chan(channel)
channel = str(cmd[3]).lower()
mode = cmd[4]
len_cmd = len(cmd)
list_users:list = []
start_boucle = 0
# Trouver le premier user
for i in range(len_cmd):
s: list = re.findall(fr':', cmd[i])
if s:
start_boucle = i
# Boucle qui va ajouter l'ensemble des users (UID)
for i in range(start_boucle, len(cmd)):
parsed_UID = str(cmd[i])
pattern = fr'[:|@|%|\+|~|\*]*'
pattern = fr':'
parsed_UID = re.sub(pattern, '', parsed_UID)
list_users.append(parsed_UID)
self.Channel.insert(
self.Channel.ChannelModel(
name=channel,
uids=list_users
)
)
case 'PART':
# ['@unrealircd.org/geoip=FR;unrealircd.org/userhost=50d6492c@80.214.73.44;unrealircd.org/userip=50d6492c@80.214.73.44;msgid=YSIPB9q4PcRu0EVfC9ci7y-/mZT0+Gj5FLiDSZshH5NCw;time=2024-08-15T15:35:53.772Z',
# ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris']
uid = str(cmd[1]).replace(':','')
channel = str(cmd[3]).lower()
self.Channel.delete_user_from_channel(channel, uid)
pass
case 'UID':
# ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601',
# ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net',
# '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...']
if 'webirc' in cmd[0]:
isWebirc = True
else:
@@ -812,8 +771,27 @@ class Irc:
hostname = str(cmd[7])
umodes = str(cmd[10])
vhost = str(cmd[11])
if not 'S' in umodes:
remote_ip = self.Base.decode_ip(str(cmd[13]))
else:
remote_ip = '127.0.0.1'
self.insert_db_uid(uid, nickname, username, hostname, umodes, vhost, isWebirc)
score_connexion = self.first_score
self.User.insert(
self.User.UserModel(
uid=uid,
nickname=nickname,
username=username,
hostname=hostname,
umodes=umodes,
vhost=vhost,
isWebirc=isWebirc,
remote_ip=remote_ip,
score_connexion=score_connexion,
connexion_datetime=datetime.now()
)
)
for classe_name, classe_object in self.loaded_classes.items():
classe_object.cmd(cmd_to_send)
@@ -825,16 +803,17 @@ class Irc:
get_uid_or_nickname = str(cmd[0].replace(':',''))
if len(cmd) == 6:
if cmd[1] == 'PRIVMSG' and cmd[3] == ':auth':
if cmd[1] == 'PRIVMSG' and str(cmd[3]).replace('.','') == ':auth':
cmd_copy = cmd.copy()
cmd_copy[5] = '**********'
self.Base.logs.debug(cmd_copy)
self.Base.logs.info(cmd_copy)
else:
self.Base.logs.info(cmd)
else:
self.Base.logs.info(f'{cmd}')
# user_trigger = get_user.split('!')[0]
user_trigger = self.get_nickname(get_uid_or_nickname)
# user_trigger = self.get_nickname(get_uid_or_nickname)
user_trigger = self.User.get_nickname(get_uid_or_nickname)
dnickname = self.Config.SERVICE_NICKNAME
pattern = fr'(:\{self.Config.SERVICE_PREFIX})(.*)$'
@@ -852,7 +831,7 @@ class Irc:
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(user_trigger, cmd_to_send)
self._hcmds(user_trigger, arg)
self._hcmds(user_trigger, arg, cmd)
if cmd[2] == self.Config.SERVICE_ID:
pattern = fr'^:.*?:(.*)$'
@@ -866,7 +845,7 @@ class Irc:
# Réponse a un CTCP VERSION
if arg[0] == '\x01VERSION\x01':
self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Base.DEFENDER_VERSION}\x01')
self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Config.current_version}\x01')
return False
# Réponse a un TIME
@@ -888,9 +867,9 @@ class Irc:
return False
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(self.get_nickname(user_trigger), cmd_to_send)
self.Base.log_cmd(self.User.get_nickname(user_trigger), cmd_to_send)
self._hcmds(user_trigger, arg)
self._hcmds(user_trigger, arg, cmd)
except IndexError as io:
self.Base.logs.error(f'{io}')
@@ -906,10 +885,10 @@ class Irc:
except IndexError as ie:
self.Base.logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
def _hcmds(self, user: str, cmd:list) -> None:
def _hcmds(self, user: str, cmd:list, fullcmd: list = []) -> None:
fromuser = self.get_nickname(user) # Nickname qui a lancé la commande
uid = self.get_uid(fromuser) # Récuperer le uid de l'utilisateur
fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande
uid = self.User.get_uid(fromuser) # Récuperer le uid de l'utilisateur
# Defender information
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
@@ -927,14 +906,14 @@ class Irc:
# Envoyer la commande aux classes dynamiquement chargées
if command != 'notallowed':
for classe_name, classe_object in self.loaded_classes.items():
classe_object._hcmds(user, cmd)
classe_object._hcmds(user, cmd, fullcmd)
match command:
case 'notallowed':
try:
current_command = cmd[0]
self.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["rouge"]}{current_command}{self.Config.CONFIG_COLOR["noire"]} ] - Accès Refusé à {self.get_nickname(fromuser)}')
self.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["rouge"]}{current_command}{self.Config.CONFIG_COLOR["noire"]} ] - Accès Refusé à {self.User.get_nickname(fromuser)}')
self.send2socket(f':{dnickname} NOTICE {fromuser} : Accès Refusé')
except IndexError as ie:
self.Base.logs.error(f'{ie}')
@@ -942,29 +921,29 @@ class Irc:
case 'deauth':
current_command = cmd[0]
uid_to_deauth = self.get_uid(fromuser)
uid_to_deauth = self.User.get_uid(fromuser)
self.delete_db_admin(uid_to_deauth)
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} est désormais déconnecter de {dnickname}")
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais déconnecter de {dnickname}")
case 'auth':
# ['auth', 'adator', 'password']
current_command = cmd[0]
user_to_log = self.get_nickname(cmd[1])
user_to_log = self.User.get_nickname(cmd[1])
password = cmd[2]
if not user_to_log is None:
mes_donnees = {'user': user_to_log, 'password': self.Base.crypt_password(password)}
query = f"SELECT id, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user AND password = :password"
query = f"SELECT id, level FROM {self.Config.table_admin} WHERE user = :user AND password = :password"
result = self.Base.db_execute_query(query, mes_donnees)
user_from_db = result.fetchone()
if not user_from_db is None:
uid_user = self.get_uid(user_to_log)
uid_user = self.User.get_uid(user_to_log)
self.insert_db_admin(uid_user, user_from_db[1])
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} est désormais connecté a {dnickname}")
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}")
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Connexion a {dnickname} réussie!")
else:
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} a tapé un mauvais mot de pass")
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} a tapé un mauvais mot de pass")
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Mot de passe incorrecte")
else:
@@ -999,9 +978,14 @@ class Irc:
self.send2socket(f':{dnickname} NOTICE {fromuser} : .editaccess [USER] [NEWPASSWORD] [NEWLEVEL]')
return None
current_user = self.get_nickname(fromuser)
current_uid = self.get_uid(fromuser)
current_user_level = self.db_admin[current_uid]['level']
get_admin = self.Admin.get_Admin(fromuser)
if get_admin is None:
self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no Admin access')
return None
current_user = self.User.get_nickname(fromuser)
current_uid = self.User.get_uid(fromuser)
current_user_level = get_admin.level
if user_new_level > 5:
self.send2socket(f':{dnickname} NOTICE {fromuser} : Maximum authorized level is 5')
@@ -1009,7 +993,7 @@ class Irc:
# Rechercher le user dans la base de données.
mes_donnees = {'user': user_to_edit}
query = f"SELECT user, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user"
query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user"
result = self.Base.db_execute_query(query, mes_donnees)
isUserExist = result.fetchone()
@@ -1025,7 +1009,7 @@ class Irc:
# Le user existe dans la base de données
data_to_update = {'user': user_to_edit, 'password': user_password, 'level': user_new_level}
sql_update = f"UPDATE {self.Base.DB_SCHEMA['admins']} SET level = :level, password = :password WHERE user = :user"
sql_update = f"UPDATE {self.Config.table_admin} SET level = :level, password = :password WHERE user = :user"
exec_query = self.Base.db_execute_query(sql_update, data_to_update)
if exec_query.rowcount > 0:
self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_edit} has been modified with level {str(user_new_level)}')
@@ -1051,14 +1035,20 @@ class Irc:
if len(cmd) < 3:
self.send2socket(f':{dnickname} NOTICE {fromuser} : .delaccess [USER] [CONFIRMUSER]')
return None
get_admin = self.Admin.get_Admin(fromuser)
if get_admin is None:
self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no admin access')
return None
current_user = self.get_nickname(fromuser)
current_uid = self.get_uid(fromuser)
current_user_level = self.db_admin[current_uid]['level']
current_user = self.User.get_nickname(fromuser)
current_uid = self.User.get_uid(fromuser)
current_user_level = get_admin.level
# Rechercher le user dans la base de données.
mes_donnees = {'user': user_to_del}
query = f"SELECT user, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user"
query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user"
result = self.Base.db_execute_query(query, mes_donnees)
info_user = result.fetchone()
@@ -1070,7 +1060,7 @@ class Irc:
return None
data_to_delete = {'user': user_to_del}
sql_delete = f"DELETE FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user"
sql_delete = f"DELETE FROM {self.Config.table_admin} WHERE user = :user"
exec_query = self.Base.db_execute_query(sql_delete, data_to_delete)
if exec_query.rowcount > 0:
self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_del} has been deleted !')
@@ -1082,8 +1072,9 @@ class Irc:
help = ''
count_level_definition = 0
if uid in self.db_admin:
user_level = self.db_admin[uid]['level']
get_admin = self.Admin.get_Admin(uid)
if not get_admin is None:
user_level = get_admin.level
else:
user_level = 0
@@ -1144,7 +1135,7 @@ class Irc:
if 'mods.' + module_name in sys.modules:
self.loaded_classes[class_name].unload()
self.Base.logs.info('Module Already Loaded ... reload the module ...')
self.Base.logs.info('Module Already Loaded ... reloading the module ...')
the_module = sys.modules['mods.' + module_name]
importlib.reload(the_module)
@@ -1198,8 +1189,11 @@ class Irc:
reason.append(cmd[i])
final_reason = ' '.join(reason)
self.db_uid.clear() #Vider UID_DB
self.db_chan = [] #Vider les salons
# self.db_uid.clear() #Vider UID_DB
# self.db_chan = [] #Vider les salons
self.User.UID_DB.clear() # Clear User Object
self.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
for class_name in self.loaded_classes:
self.loaded_classes[class_name].unload()
@@ -1214,16 +1208,31 @@ class Irc:
case 'show_modules':
self.Base.logs.debug(self.loaded_classes)
all_modules = self.Base.get_all_modules()
results = self.Base.db_execute_query(f'SELECT module FROM {self.Base.DB_SCHEMA["modules"]}')
results = self.Base.db_execute_query(f'SELECT module FROM {self.Config.table_module}')
results = results.fetchall()
if len(results) == 0:
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :Aucun module chargé")
return False
# if len(results) == 0:
# self.send2socket(f":{dnickname} NOTICE {fromuser} :There is no module loaded")
# return False
for r in results:
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :Le module {r[0]} chargé")
found = False
for module in all_modules:
for loaded_mod in results:
if module == loaded_mod[0]:
found = True
if found:
self.send2socket(f":{dnickname} NOTICE {fromuser} :{module} - {self.Config.CONFIG_COLOR['verte']}Loaded{self.Config.CONFIG_COLOR['nogc']}")
else:
self.send2socket(f":{dnickname} NOTICE {fromuser} :{module} - {self.Config.CONFIG_COLOR['rouge']}Not Loaded{self.Config.CONFIG_COLOR['nogc']}")
found = False
# for r in results:
# self.send2socket(f":{dnickname} NOTICE {fromuser} :{r[0]} - {self.Config.CONFIG_COLOR['verte']}Loaded{self.Config.CONFIG_COLOR['nogc']}")
case 'show_timers':
@@ -1240,36 +1249,30 @@ class Irc:
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :{str(running_thread_name)}")
case 'show_channels':
for chan in self.Channel.UID_CHANNEL_DB:
list_nicknames: list = []
for uid in chan.uids:
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = re.sub(pattern, '', uid)
list_nicknames.append(self.User.get_nickname(parsed_UID))
self.send2socket(f":{dnickname} NOTICE {fromuser} : Channel: {chan.name} - Users: {list_nicknames}")
case 'uptime':
uptime = self.get_defender_uptime()
self.send2socket(f':{dnickname} NOTICE {fromuser} : {uptime}')
case 'copyright':
self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Base.DEFENDER_VERSION} Developped by adator® and dktmb® #')
case 'sentinel':
# .sentinel on
activation = str(cmd[1]).lower()
service_id = self.Config.SERVICE_ID
channel_to_dont_quit = [self.Config.SALON_JAIL, dchanlog]
if activation == 'on':
for chan in self.db_chan:
if not chan in channel_to_dont_quit:
self.send2socket(f":{service_id} JOIN {chan}")
if activation == 'off':
for chan in self.db_chan:
if not chan in channel_to_dont_quit:
self.send2socket(f":{service_id} PART {chan}")
self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Config.current_version} Developped by adator® and dktmb® #')
case 'checkversion':
if self.Base.check_for_new_version():
self.send2socket(f':{dnickname} NOTICE {fromuser} : New Version available : {self.Base.DEFENDER_VERSION} >>> {self.Base.LATEST_DEFENDER_VERSION}')
else:
self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender')
pass
self.Base.create_thread(
self.thread_check_for_new_version,
(fromuser, )
)
case _:
pass

126
core/loadConf.py Normal file
View File

@@ -0,0 +1,126 @@
import json
from os import sep
from typing import Union
from dataclasses import dataclass, field
##########################################
# CONFIGURATION FILE #
##########################################
@dataclass
class ConfigDataModel:
SERVEUR_IP: str
SERVEUR_HOSTNAME: str # Le hostname du serveur IRC
SERVEUR_LINK: str # Host attendu par votre IRCd (ex. dans votre link block pour Unrealircd)
SERVEUR_PORT: int # Port du link
SERVEUR_PASSWORD: str # Mot de passe du link (Privilégiez argon2 sur Unrealircd)
SERVEUR_ID: str # SID (identification) du bot en tant que Services
SERVEUR_SSL: bool # Activer la connexion SSL
SERVICE_NAME: str # Le nom du service
SERVICE_NICKNAME: str # Nick du bot sur IRC
SERVICE_REALNAME: str # Realname du bot
SERVICE_USERNAME: str # Ident du bot
SERVICE_HOST: str # Host du bot
SERVICE_INFO: str # swhois du bot
SERVICE_CHANLOG: str # Salon des logs et autres messages issus du bot
SERVICE_SMODES: str # Mode du service
SERVICE_CMODES: str # Mode du salon (#ChanLog) que le bot appliquera à son entrée
SERVICE_UMODES: str # Mode que le bot pourra se donner à sa connexion au salon chanlog
SERVICE_PREFIX: str # Prefix pour envoyer les commandes au bot
SERVICE_ID: str = field(init=False) # L'identifiant du service
OWNER: str # Identifiant du compte admin
PASSWORD: str # Mot de passe du compte admin
SALON_JAIL: str # Salon pot de miel
SALON_JAIL_MODES: str # Mode du salon pot de miel
SALON_LIBERER: str # Le salon ou sera envoyé l'utilisateur clean
API_TIMEOUT: int # Timeout des api's
PORTS_TO_SCAN: list # Liste des ports a scanné pour une detection de proxy
WHITELISTED_IP: list # IP a ne pas scanner
GLINE_DURATION: str # La durée du gline
DEBUG_LEVEL: int # Le niveau des logs DEBUG 10 | INFO 20 | WARNING 30 | ERROR 40 | CRITICAL 50
CONFIG_COLOR: dict[str, str]
table_admin: str
table_commande: str
table_log: str
table_module: str
current_version: str
latest_version: str
db_name: str
db_path: str
def __post_init__(self):
# Initialiser SERVICE_ID après la création de l'objet
self.SERVICE_ID:str = f"{self.SERVEUR_ID}AAAAAB"
class Config:
def __init__(self):
self.ConfigObject: ConfigDataModel = self.__load_service_configuration()
return None
def __load_json_service_configuration(self):
conf_filename = f'core{sep}configuration.json'
with open(conf_filename, 'r') as configuration_data:
configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data)
for key, value in configuration['CONFIG_COLOR'].items():
configuration['CONFIG_COLOR'][key] = str(value).encode('utf-8').decode('unicode_escape')
return configuration
def __load_service_configuration(self) -> ConfigDataModel:
import_config = self.__load_json_service_configuration()
ConfigObject: ConfigDataModel = ConfigDataModel(
SERVEUR_IP=import_config["SERVEUR_IP"],
SERVEUR_HOSTNAME=import_config["SERVEUR_HOSTNAME"],
SERVEUR_LINK=import_config["SERVEUR_LINK"],
SERVEUR_PORT=import_config["SERVEUR_PORT"],
SERVEUR_PASSWORD=import_config["SERVEUR_PASSWORD"],
SERVEUR_ID=import_config["SERVEUR_ID"],
SERVEUR_SSL=import_config["SERVEUR_SSL"],
SERVICE_NAME=import_config["SERVICE_NAME"],
SERVICE_NICKNAME=import_config["SERVICE_NICKNAME"],
SERVICE_REALNAME=import_config["SERVICE_REALNAME"],
SERVICE_USERNAME=import_config["SERVICE_USERNAME"],
SERVICE_HOST=import_config["SERVICE_HOST"],
SERVICE_INFO=import_config["SERVICE_INFO"],
SERVICE_CHANLOG=import_config["SERVICE_CHANLOG"],
SERVICE_SMODES=import_config["SERVICE_SMODES"],
SERVICE_CMODES=import_config["SERVICE_CMODES"],
SERVICE_UMODES=import_config["SERVICE_UMODES"],
SERVICE_PREFIX=import_config["SERVICE_PREFIX"],
OWNER=import_config["OWNER"],
PASSWORD=import_config["PASSWORD"],
SALON_JAIL=import_config["SALON_JAIL"],
SALON_JAIL_MODES=import_config["SALON_JAIL_MODES"],
SALON_LIBERER=import_config["SALON_LIBERER"],
API_TIMEOUT=import_config["API_TIMEOUT"],
PORTS_TO_SCAN=import_config["PORTS_TO_SCAN"],
WHITELISTED_IP=import_config["WHITELISTED_IP"],
GLINE_DURATION=import_config["GLINE_DURATION"],
DEBUG_LEVEL=import_config["DEBUG_LEVEL"],
CONFIG_COLOR=import_config["CONFIG_COLOR"],
table_admin='sys_admins',
table_commande='sys_commandes',
table_log='sys_logs',
table_module='sys_modules',
current_version='',
latest_version='',
db_name='defender',
db_path=f'db{sep}'
)
return ConfigObject

263
install.py Normal file
View File

@@ -0,0 +1,263 @@
from subprocess import check_call, run, CalledProcessError, PIPE
from platform import python_version
from sys import exit
import os, logging, shutil, pwd
class Install:
def __init__(self) -> None:
# Python required version
self.python_min_version = '3.10'
self.log_file = 'install.log'
self.ServiceName = 'Defender'
self.venv_name = '.pyenv'
self.venv_dependencies: list[str] = ['sqlalchemy','psutil','requests']
self.install_folder = os.getcwd()
self.osname = os.name
self.cmd_linux_requirements: list[str] = ['apt', 'install', '-y', 'python3', 'python3-pip', 'python3-venv']
self.venv_pip_full_path = os.path.join(self.venv_name, f'bin{os.sep}pip')
self.venv_python_full_path = os.path.join(self.venv_name, f'bin{os.sep}python')
self.systemd_folder = '/etc/systemd/system/'
# Init log system
self.init_log_system()
# Exclude Windows OS
if self.osname == 'nt':
print('/!\\ Windows OS is not supported by this automatic installation /!\\')
self.Logs.critical('/!\\ Windows OS is not supported by this automatic install /!\\')
exit(5)
if not self.is_root():
exit(5)
# Get the current user
self.system_username: str = input(f'What is the user ro run defender with ? [{os.getlogin()}] : ')
if str(self.system_username).strip() == '':
self.system_username = os.getlogin()
self.get_user_information(self.system_username)
self.Logs.debug(f'The user selected is: {self.system_username}')
self.Logs.debug(f'Operating system: {self.osname}')
# Install linux dependencies
self.install_linux_dependencies()
# Check python version
self.checkPythonVersion()
# Create systemd service file
self.create_service_file()
# Check if Env Exist | install environment | Install python dependencies
self.check_venv()
# Create and start service
if self.osname != 'nt':
self.run_subprocess(['systemctl','daemon-reload'])
self.run_subprocess(['systemctl','start', self.ServiceName])
self.run_subprocess(['systemctl','status', self.ServiceName])
# Clean the Installation
self.clean_installation()
return None
def is_installed(self) -> bool:
is_installed = False
# Check logs folder
if os.path.exists('logs'):
is_installed = True
# Check db folder
if os.path.exists('db'):
is_installed = True
return is_installed
def is_root(self) -> bool:
if os.geteuid() != 0:
print('/!\\ user must run install.py as root /!\\')
self.Logs.critical('/!\\ user must run install.py as root /!\\')
return False
elif os.geteuid() == 0:
return True
def get_user_information(self, system_user: str) -> None:
try:
username: tuple = pwd.getpwnam(system_user)
self.system_uid = username.pw_uid
self.system_gid = username.pw_gid
return None
except KeyError as ke:
self.Logs.critical(f"This user [{system_user}] doesn't exist: {ke}")
print(f"This user [{system_user}] doesn't exist: {ke}")
exit(5)
def init_log_system(self) -> None:
# Init logs object
self.Logs = logging
self.Logs.basicConfig(level=logging.DEBUG,
filename=self.log_file,
encoding='UTF-8',
format='%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(funcName)s - %(message)s')
self.Logs.debug('#################### STARTING INSTALLATION ####################')
return None
def clean_installation(self) -> None:
# Chown the Python Env to non user privilege
self.run_subprocess(['chown','-R', f'{self.system_username}:{self.system_username}',
f'{os.path.join(self.install_folder, self.venv_name)}'
]
)
# Chown the installation log file
self.run_subprocess(['chown','-R', f'{self.system_username}:{self.system_username}',
f'{os.path.join(self.install_folder, self.log_file)}'
]
)
return None
def run_subprocess(self, command:list) -> None:
try:
run_command = check_call(command)
self.Logs.debug(f'{command} - {run_command}')
print(f'{command} - {run_command}')
except CalledProcessError as e:
print(f"Command failed :{e.returncode}")
self.Logs.critical(f"Command failed :{e.returncode}")
exit(5)
def checkPythonVersion(self) -> bool:
"""Test si la version de python est autorisée ou non
Returns:
bool: True si la version de python est autorisé sinon False
"""
python_required_version = self.python_min_version.split('.')
python_current_version = python_version().split('.')
self.Logs.debug(f'The current python version is: {python_version()}')
if int(python_current_version[0]) < int(python_required_version[0]):
print(f"## Your python version must be greather than or equal to {self.python_min_version} ##")
self.Logs.critical(f'Your python version must be greather than or equal to {self.python_min_version}')
return False
elif int(python_current_version[1]) < int(python_required_version[1]):
print(f"### Your python version must be greather than or equal to {self.python_min_version} ###")
self.Logs.critical(f'Your python version must be greather than or equal to {self.python_min_version}')
return False
print(f"===> Version of python : {python_version()} ==> OK")
self.Logs.debug(f'Version of python : {python_version()} ==> OK')
return True
def check_packages(self, package_name) -> bool:
try:
# Run a command in the virtual environment's Python to check if the package is installed
run([self.venv_python_full_path, '-c', f'import {package_name}'], check=True, stdout=PIPE, stderr=PIPE)
return True
except CalledProcessError:
return False
def check_venv(self) -> bool:
if os.path.exists(self.venv_name):
# Installer les dependances
self.install_dependencies()
return True
else:
self.run_subprocess(['python3', '-m', 'venv', self.venv_name])
self.Logs.debug(f'Python Virtual env installed {self.venv_name}')
print(f'Python Virtual env installed {self.venv_name}')
self.install_dependencies()
return False
def create_service_file(self) -> None:
if self.systemd_folder is None:
# If Windows, do not install systemd
return None
if os.path.exists(f'{self.systemd_folder}{os.sep}{self.ServiceName}.service'):
print(f'/!\\ Service already created in the system /!\\')
self.Logs.warning('/!\\ Service already created in the system /!\\')
print(f'The service file will be regenerated')
self.Logs.warning('The service file will be regenerated')
contain = f'''[Unit]
Description={self.ServiceName} IRC Service
[Service]
User={self.system_username}
ExecStart={os.path.join(self.install_folder, self.venv_python_full_path)} {os.path.join(self.install_folder, 'main.py')}
WorkingDirectory={self.install_folder}
SyslogIdentifier={self.ServiceName}
Restart=on-failure
[Install]
WantedBy=multi-user.target
'''
with open(f'{self.ServiceName}.service.generated', 'w+') as servicefile:
servicefile.write(contain)
servicefile.close()
print('Service file generated with current configuration')
self.Logs.debug('Service file generated with current configuration')
source = f'{self.install_folder}{os.sep}{self.ServiceName}.service.generated'
self.run_subprocess(['chown','-R', f'{self.system_username}:{self.system_username}', source])
destination = f'{self.systemd_folder}'
shutil.copy(source, destination)
os.rename(f'{self.systemd_folder}{os.sep}{self.ServiceName}.service.generated', f'{self.systemd_folder}{os.sep}{self.ServiceName}.service')
print(f'Service file moved to systemd folder {self.systemd_folder}')
self.Logs.debug(f'Service file moved to systemd folder {self.systemd_folder}')
def install_linux_dependencies(self) -> None:
self.run_subprocess(self.cmd_linux_requirements)
return None
def install_dependencies(self) -> None:
try:
self.run_subprocess([self.venv_pip_full_path, 'cache', 'purge'])
self.run_subprocess([self.venv_python_full_path, '-m', 'pip', 'install', '--upgrade', 'pip'])
if self.check_packages('greenlet') is None:
self.run_subprocess(
[self.venv_pip_full_path, 'install', '--only-binary', ':all:', 'greenlet']
)
for module in self.venv_dependencies:
if not self.check_packages(module):
### Trying to install missing python packages ###
self.run_subprocess([self.venv_pip_full_path, 'install', module])
else:
self.Logs.debug(f'{module} already installed')
print(f"==> {module} already installed")
except CalledProcessError as cpe:
self.Logs.critical(f'{cpe}')
Install()

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +1,10 @@
import threading
from core.irc import Irc
# Le module crée devra réspecter quelques conditions
# 1. Importer le module de configuration
# 2. Le nom de class devra toujours s'appeler comme le module exemple => nom de class Dktmb | nom du module mod_dktmb
# 3. la fonction __init__ devra toujours avoir les parametres suivant (self, irc:object)
# 1 . Créer la variable irc dans le module
# 1 . Créer la variable Irc dans le module
# 2 . Récuperer la configuration dans une variable
# 3 . Définir et enregistrer les nouvelles commandes
# 4. une fonction _hcmds(self, user:str, cmd: list) devra toujours etre crée.
@@ -13,37 +12,59 @@ from core.irc import Irc
class Test():
def __init__(self, ircInstance:Irc) -> None:
print(f'Module {self.__class__.__name__} loaded ...')
self.irc = ircInstance # Ajouter l'object mod_irc a la classe
# Add Irc Object to the module
self.Irc = ircInstance
self.config = ircInstance.Config # Ajouter la configuration a la classe
# Add Global Configuration to the module
self.Config = ircInstance.Config
# Add Base object to the module
self.Base = ircInstance.Base
# Add logs object to the module
self.Logs = ircInstance.Base.logs
# Add User object to the module
self.User = ircInstance.User
# Add Channel object to the module
self.Channel = ircInstance.Channel
# Créer les nouvelles commandes du module
self.commands = ['test']
self.commands_level = {
0: ['test'],
1: ['test_level_1']
}
self.__set_commands(self.commands) # Enrigstrer les nouvelles commandes dans le code
# Init the module
self.__init_module()
self.core = ircInstance.Base # Instance du module Base
# Log the module
self.Logs.debug(f'Module {self.__class__.__name__} loaded ...')
self.session = '' # Instancier une session pour la base de données
self.__create_db('mod_test') # Créer la base de données si necessaire
def __init_module(self) -> None:
def __set_commands(self, commands:list) -> None:
"""Rajoute les commandes du module au programme principal
self.__set_commands(self.commands_level)
self.__create_tables()
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
Returns:
None: Aucun retour attendu
"""
for command in commands:
self.irc.commands.append(command)
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return True
return None
def __create_db(self, db_name:str) -> None:
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
@@ -52,36 +73,36 @@ class Test():
Returns:
None: Aucun retour n'es attendu
"""
db_directory = self.core.MODS_DB_PATH
self.session = self.core.db_init(db_directory, db_name)
table_logs = '''CREATE TABLE IF NOT EXISTS logs (
table_logs = '''CREATE TABLE IF NOT EXISTS test_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
self.core.db_execute_query(self.session, table_logs)
self.Base.db_execute_query(table_logs)
return None
def unload(self) -> None:
return None
def _hcmds(self, user:str, cmd: list) -> None:
def cmd(self, data:list) -> None:
return None
command = cmd[0].lower()
def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None:
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
fromuser = user
match command:
case 'test':
try:
user_action = cmd[1]
self.irc.send2socket(f'PRIVMSG #webmail Je vais voicer {user}')
self.irc.send2socket(f'MODE #webmail +v {user_action}')
self.core.create_log(f"MODE +v sur {user_action}")
self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : test command ready ...")
self.Logs.debug(f"Test logs ready")
except KeyError as ke:
self.core.create_log(f"Key Error : {ke}")
self.Logs.error(f"Key Error : {ke}")

441
mods/mod_votekick.py Normal file
View File

@@ -0,0 +1,441 @@
from core.irc import Irc
import re
from dataclasses import dataclass, field
# Activer le systeme sur un salon (activate #salon)
# Le service devra se connecter au salon
# Le service devra se mettre en op
# Soumettre un nom de user (submit nickname)
# voter pour un ban (vote_for)
# voter contre un ban (vote_against)
class Votekick():
@dataclass
class VoteChannelModel:
channel_name: str
target_user: str
voter_users: list
vote_for: int
vote_against: int
VOTE_CHANNEL_DB:list[VoteChannelModel] = []
def __init__(self, ircInstance:Irc) -> None:
# Add Irc Object to the module
self.Irc = ircInstance
# Add Global Configuration to the module
self.Config = ircInstance.Config
# Add Base object to the module
self.Base = ircInstance.Base
# Add logs object to the module
self.Logs = ircInstance.Base.logs
# Add User object to the module
self.User = ircInstance.User
# Add Channel object to the module
self.Channel = ircInstance.Channel
# Créer les nouvelles commandes du module
self.commands_level = {
0: ['vote_for', 'vote_against'],
1: ['activate', 'deactivate', 'submit', 'vote_stat', 'vote_verdict', 'vote_cancel']
}
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.__class__.__name__} loaded ...')
def __init_module(self) -> None:
self.__set_commands(self.commands_level)
self.__create_tables()
self.join_saved_channels()
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
Returns:
None: Aucun retour n'es attendu
"""
table_logs = '''CREATE TABLE IF NOT EXISTS votekick_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
table_vote = '''CREATE TABLE IF NOT EXISTS votekick_channel (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
channel TEXT
)
'''
self.Base.db_execute_query(table_logs)
self.Base.db_execute_query(table_vote)
return None
def unload(self) -> None:
try:
for chan in self.VOTE_CHANNEL_DB:
self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} PART {chan.channel_name}")
self.VOTE_CHANNEL_DB = []
self.Logs.debug(f'Delete memory DB VOTE_CHANNEL_DB: {self.VOTE_CHANNEL_DB}')
return None
except UnboundLocalError as ne:
self.Logs.error(f'{ne}')
except NameError as ue:
self.Logs.error(f'{ue}')
except:
self.Logs.error('Error on the module')
def init_vote_system(self, channel: str) -> bool:
response = False
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
chan.target_user = ''
chan.voter_users = []
chan.vote_against = 0
chan.vote_for = 0
response = True
return response
def insert_vote_channel(self, ChannelObject: VoteChannelModel) -> bool:
result = False
found = False
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == ChannelObject.channel_name:
found = True
if not found:
self.VOTE_CHANNEL_DB.append(ChannelObject)
self.Logs.debug(f"The channel has been added {ChannelObject}")
self.db_add_vote_channel(ChannelObject.channel_name)
return result
def db_add_vote_channel(self, channel:str) -> bool:
"""Cette fonction ajoute les salons ou seront autoriser les votes
Args:
channel (str): le salon à enregistrer.
"""
current_datetime = self.Base.get_datetime()
mes_donnees = {'channel': channel}
response = self.Base.db_execute_query("SELECT id FROM votekick_channel WHERE channel = :channel", mes_donnees)
isChannelExist = response.fetchone()
if isChannelExist is None:
mes_donnees = {'datetime': current_datetime, 'channel': channel}
insert = self.Base.db_execute_query(f"INSERT INTO votekick_channel (datetime, channel) VALUES (:datetime, :channel)", mes_donnees)
if insert.rowcount > 0:
return True
else:
return False
else:
return False
def db_delete_vote_channel(self, channel: str) -> bool:
"""Cette fonction supprime les salons de join de Defender
Args:
channel (str): le salon à enregistrer.
"""
mes_donnes = {'channel': channel}
response = self.Base.db_execute_query("DELETE FROM votekick_channel WHERE channel = :channel", mes_donnes)
affected_row = response.rowcount
if affected_row > 0:
return True
else:
return False
def join_saved_channels(self) -> None:
result = self.Base.db_execute_query("SELECT id, channel FROM votekick_channel")
channels = result.fetchall()
unixtime = self.Base.get_unixtime()
for channel in channels:
id, chan = channel
self.insert_vote_channel(self.VoteChannelModel(channel_name=chan, target_user='', voter_users=[], vote_for=0, vote_against=0))
self.Irc.send2socket(f":{self.Config.SERVEUR_ID} SJOIN {unixtime} {chan} + :{self.Config.SERVICE_ID}")
self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {chan} +o {self.Config.SERVICE_NICKNAME}")
return None
def is_vote_ongoing(self, channel: str) -> bool:
response = False
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
if vote.target_user:
response = True
return response
def timer_vote_verdict(self, channel: str) -> None:
dnickname = self.Config.SERVICE_NICKNAME
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :The user {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} will be kicked from this channel')
self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
self.Channel.delete_user_from_channel(channel, self.User.get_uid(target_user))
elif chan.vote_for <= chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This user will stay on this channel')
# Init the system
if self.init_vote_system(channel):
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :System vote re initiated')
return None
def cmd(self, data:list) -> None:
cmd = list(data).copy()
match cmd[2]:
case 'SJOIN':
pass
case _:
pass
return None
def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None:
# cmd is the command starting from the user command
# full cmd is sending the entire server response
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
fromuser = user
if len(fullcmd) >= 3:
fromchannel = str(fullcmd[2]).lower() if self.Base.Is_Channel(str(fullcmd[2]).lower()) else None
else:
fromchannel = None
if len(cmd) >= 2:
sentchannel = str(cmd[1]).lower() if self.Base.Is_Channel(str(cmd[1]).lower()) else None
else:
sentchannel = None
if not fromchannel is None:
channel = fromchannel
elif not sentchannel is None:
channel = sentchannel
else:
channel = None
match command:
case 'vote_cancel':
try:
if channel is None:
self.Logs.error(f"The channel is not known, defender can't cancel the vote")
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :You need to specify the channel => /msg {dnickname} vote_cancel #channel')
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
self.init_vote_system(channel)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote system re-initiated')
except IndexError as ke:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote_cancel #channel')
self.Logs.error(f'Index Error: {ke}')
case 'vote_for':
try:
# vote_for
channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
else:
chan.vote_for += 1
chan.voter_users.append(fromuser)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
except IndexError as ie:
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote_cancel #channel')
self.Logs.error(f'Index Error: {ie}')
case 'vote_against':
try:
# vote_against
channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
else:
chan.vote_against += 1
chan.voter_users.append(fromuser)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
case 'vote_stat':
try:
# channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Channel: {chan.channel_name} | Target: {self.User.get_nickname(chan.target_user)} | For: {chan.vote_for} | Against: {chan.vote_against} | Number of voters: {str(len(chan.voter_users))}')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
case 'vote_verdict':
try:
# channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :The user {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} will be kicked from this channel')
self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
elif chan.vote_for <= chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This user will stay on this channel')
# Init the system
if self.init_vote_system(channel):
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :System vote re initiated')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
case 'submit':
# submit nickname
try:
nickname_submitted = cmd[1]
# channel = str(fullcmd[2]).lower()
uid_submitted = self.User.get_uid(nickname_submitted)
user_submitted = self.User.get_User(nickname_submitted)
# check if there is an ongoing vote
if self.is_vote_ongoing(channel):
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
ongoing_user = self.User.get_nickname(vote.target_user)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :There is an ongoing vote on {ongoing_user}')
return False
# check if the user exist
if user_submitted is None:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> do not exist')
return False
uid_cleaned = self.Base.clean_uid(uid_submitted)
ChannelInfo = self.Channel.get_Channel(channel)
clean_uids_in_channel: list = []
for uid in ChannelInfo.uids:
clean_uids_in_channel.append(self.Base.clean_uid(uid))
if not uid_cleaned in clean_uids_in_channel:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> is not available in this channel')
return False
# check if Ircop or Service or Bot
pattern = fr'[o|B|S]'
operator_user = re.findall(pattern, user_submitted.umodes)
if operator_user:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You cant vote for this user ! he/she is protected')
return False
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
chan.target_user = self.User.get_uid(nickname_submitted)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :{nickname_submitted} has been targeted for a vote')
self.Base.create_timer(60, self.timer_vote_verdict, (channel, ))
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This vote will end after 60 secondes')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
except TypeError as te:
self.Logs.error(te)
case 'activate':
try:
# activate #channel
# channel = str(cmd[1]).lower()
self.insert_vote_channel(
self.VoteChannelModel(
channel_name=channel,
target_user='',
voter_users=[],
vote_for=0,
vote_against=0
)
)
self.Irc.send2socket(f":{dnickname} JOIN {channel}")
self.Irc.send2socket(f":{dnickname} SAMODE {channel} +o {dnickname}")
self.Irc.send2socket(f":{dnickname} PRIVMSG {channel} :You can now use !submit <nickname> to decide if he will stay or not on this channel ")
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")
case 'deactivate':
try:
# deactivate #channel
# channel = str(cmd[1]).lower()
self.Irc.send2socket(f":{dnickname} SAMODE {channel} -o {dnickname}")
self.Irc.send2socket(f":{dnickname} PART {channel}")
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
self.VOTE_CHANNEL_DB.remove(chan)
self.db_delete_vote_channel(chan.channel_name)
self.Logs.debug(f"Test logs ready")
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")

View File

@@ -1,3 +1,3 @@
{
"version": "4.0.2"
"version": "5.0.6"
}