Version 5 with dataclasses and new vote system

This commit is contained in:
adator85
2024-08-17 13:32:11 +02:00
parent ab593b0ae6
commit deae79db57
8 changed files with 1074 additions and 383 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@ db/
logs/
__pycache__/
configuration.json
test.py

View File

@@ -68,7 +68,7 @@ class User:
if record.uid == uid:
self.UID_DB.remove(record)
result = True
self.log.debug(f'UID ({record.uid}) has been created')
self.log.debug(f'UID ({record.uid}) has been deleted')
if not result:
self.log.critical(f'The UID {uid} was not deleted')
@@ -111,7 +111,6 @@ class User:
self.log.debug(f'The value {uidornickname} -- {nickname}')
return nickname
class Admin:
@dataclass
@@ -216,3 +215,108 @@ class Admin:
nickname = record.nickname
self.log.debug(f'The value {uidornickname} -- {nickname}')
return nickname
class Channel:
@dataclass
class ChannelModel:
name: str
mode: str
uids: list
UID_CHANNEL_DB: list[ChannelModel] = []
def __init__(self, Base: Base) -> None:
self.log = Base.logs
pass
def insert(self, newChan: ChannelModel) -> bool:
result = False
exist = False
for record in self.UID_CHANNEL_DB:
if record.name == newChan.name:
exist = True
self.log.debug(f'{record.name} already exist')
for user in newChan.uids:
record.uids.append(user)
# Supprimer les doublons
del_duplicates = list(set(record.uids))
record.uids = del_duplicates
self.log.debug(f'Updating a new UID to the channel {record}')
if not exist:
self.UID_CHANNEL_DB.append(newChan)
result = True
self.log.debug(f'New Channel Created: ({newChan})')
if not result:
self.log.critical(f'The Channel Object was not inserted {newChan}')
return result
def update(self, name: str, newMode: str) -> bool:
result = False
for record in self.UID_CHANNEL_DB:
if record.name == name:
record.mode = newMode
result = True
self.log.debug(f'Mode ({record.name}) has been updated with new mode {newMode}')
if not result:
self.log.critical(f'The channel mode {newMode} was not updated, name = {name}')
return result
def delete(self, name: str) -> bool:
result = False
for record in self.UID_CHANNEL_DB:
if record.name == name:
self.UID_CHANNEL_DB.remove(record)
result = True
self.log.debug(f'Channel ({record.name}) has been created')
if not result:
self.log.critical(f'The Channel {name} was not deleted')
return result
def delete_user_from_channel(self,chan_name: str, uid:str) -> bool:
result = False
for record in self.UID_CHANNEL_DB:
if record.name == chan_name:
record.uids.remove(uid)
self.log.debug(f'uid {uid} has been removed, here is the new object: {record}')
result = True
return result
def get_Channel(self, name: str) -> Union[ChannelModel, None]:
Channel = None
for record in self.UID_CHANNEL_DB:
if record.name == name:
Channel = record
self.log.debug(f'Search {name} -- result = {Channel}')
return Channel
def get_mode(self, name:str) -> Union[str, None]:
mode = None
for record in self.UID_CHANNEL_DB:
if record.name == name:
mode = record.mode
self.log.debug(f'The mode of the channel {name} has been found: {mode}')
return mode

View File

@@ -1,4 +1,4 @@
import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, sys
import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, re
from typing import Union
from base64 import b64decode
from datetime import datetime
@@ -507,3 +507,10 @@ class Base:
# Vider le dictionnaire de fonction
self.periodic_func.clear()
def clean_uid(self, uid:str) -> str:
pattern = fr'[@|%|\+|~|\*]*'
parsed_UID = re.sub(pattern, '', uid)
return parsed_UID

View File

@@ -3,7 +3,7 @@ from ssl import SSLSocket
from datetime import datetime, timedelta
from typing import Union
from core.loadConf import Config
from core.Model import User, Admin
from core.Model import User, Admin, Channel
from core.base import Base
class Irc:
@@ -12,8 +12,6 @@ class Irc:
self.defender_connexion_datetime = datetime.now() # Date et heure de la premiere connexion de Defender
self.first_score: int = 100
#self.db_uid = {} # Definir la variable qui contiendra la liste des utilisateurs connectés au réseau
#self.db_admin = {} # Definir la variable qui contiendra la liste des administrateurs
self.db_chan = [] # Definir la variable qui contiendra la liste des salons
self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés
self.beat = 30 # Lancer toutes les 30 secondes des actions de nettoyages
@@ -32,7 +30,7 @@ class Irc:
self.commands_level = {
0: ['help', 'auth', 'copyright'],
1: ['load','reload','unload', 'deauth', 'uptime', 'checkversion'],
2: ['show_modules', 'show_timers', 'show_threads', 'sentinel'],
2: ['show_modules', 'show_timers', 'show_threads', 'show_channels'],
3: ['quit', 'restart','addaccess','editaccess', 'delaccess']
}
@@ -45,12 +43,18 @@ class Irc:
self.Base = Base(self.Config)
self.User = User(self.Base)
self.Admin = Admin(self.Base)
self.Channel = Channel(self.Base)
self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, ))
##############################################
# CONNEXION IRC #
##############################################
def init_irc(self, ircInstance:'Irc') -> None:
"""Create a socket and connect to irc server
Args:
ircInstance (Irc): Instance of Irc object.
"""
try:
self.__create_socket()
self.__connect_to_irc(ircInstance)
@@ -58,7 +62,8 @@ class Irc:
self.Base.logs.critical(f'Assertion error: {ae}')
def __create_socket(self) -> None:
"""Create a socket to connect SSL or Normal connection
"""
try:
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
connexion_information = (self.Config.SERVEUR_IP, self.Config.SERVEUR_PORT)
@@ -69,9 +74,8 @@ class Irc:
ssl_connexion = ssl_context.wrap_socket(soc, server_hostname=self.Config.SERVEUR_HOSTNAME)
ssl_connexion.connect(connexion_information)
self.IrcSocket:SSLSocket = ssl_connexion
self.Base.logs.info(f"Connexion en mode SSL : Version = {self.IrcSocket.version()}")
self.SSL_VERSION = self.IrcSocket.version()
self.Base.logs.info(f"Connexion en mode SSL : Version = {self.SSL_VERSION}")
else:
soc.connect(connexion_information)
self.IrcSocket:socket.socket = soc
@@ -166,7 +170,7 @@ class Irc:
# except Exception as e:
# self.debug(f"Exception: {e}")
def __link(self, writer:socket.socket) -> None:
def __link(self, writer:Union[socket.socket, SSLSocket]) -> None:
"""Créer le link et envoyer les informations nécessaires pour la
connexion au serveur.
@@ -256,6 +260,10 @@ class Irc:
except AssertionError as ae:
self.Base.logs.error(f"Assertion error : {ae}")
def unload(self) -> None:
# This is only to reference the method
return None
##############################################
# FIN CONNEXION IRC #
##############################################
@@ -387,7 +395,15 @@ class Irc:
my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe
create_instance_of_the_class = my_class(self.ircObject) # Créer une nouvelle instance de la classe
self.loaded_classes[class_name] = create_instance_of_the_class # Charger la nouvelle class dans la variable globale
if not hasattr(create_instance_of_the_class, 'cmd'):
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} ne contient pas de méthode cmd")
self.Base.logs.critical(f"The Module {module_name} has not been loaded because cmd method is not available")
self.Base.db_delete_module(module_name)
return False
# Charger la nouvelle class dans la variable globale
self.loaded_classes[class_name] = create_instance_of_the_class
# Enregistrer le module dans la base de données
if not init:
@@ -442,32 +458,15 @@ class Irc:
return None
def insert_db_chan(self, channel:str) -> bool:
"""Ajouter l'ensemble des salons dans la variable {CHAN_DB}
Args:
channel (str): le salon à insérer dans {CHAN_DB}
Returns:
bool: True si insertion OK / False si insertion KO
"""
if channel in self.db_chan:
return False
response = True
# Ajouter un nouveau salon
self.db_chan.append(channel)
# Supprimer les doublons de la liste
self.db_chan = list(set(self.db_chan))
self.Base.logs.debug(f"Le salon {channel} a été ajouté à la liste CHAN_DB")
return response
def create_defender_user(self, nickname:str, level: int, password:str) -> str:
nickname = self.User.get_nickname(nickname)
get_user = self.User.get_User(nickname)
if get_user is None:
response = f'This nickname {nickname} does not exist, it is not possible to create this user'
self.Base.logs.warning(response)
return response
nickname = get_user.nickname
response = ''
if level > 4:
@@ -475,14 +474,8 @@ class Irc:
self.Base.logs.warning(response)
return response
# Verification si le user existe dans notre UID_DB
if not nickname in self.db_uid:
response = f"{nickname} n'est pas connecté, impossible de l'enregistrer pour le moment"
self.Base.logs.warning(response)
return response
hostname = self.db_uid[nickname]['hostname']
vhost = self.db_uid[nickname]['vhost']
hostname = get_user.hostname
vhost = get_user.vhost
spassword = self.Base.crypt_password(password)
mes_donnees = {'admin': nickname}
@@ -626,11 +619,14 @@ class Irc:
# if self.Config.ABUSEIPDB == 1:
# self.Base.create_thread(self.abuseipdb_scan, (cmd[2], ))
self.first_connexion_ip = cmd[2]
self.first_score = cmd[3]
self.first_score = int(cmd[3])
pass
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Base.logs.error(f'{ie}')
except ValueError as ve:
self.first_score = 0
self.Base.logs.error(f'Impossible to convert first_score: {ve}')
case '320':
#:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users
@@ -651,8 +647,7 @@ class Irc:
if self.INIT == 1:
current_version = self.Config.current_version
latest_version = self.Config.latest_version
if current_version != latest_version:
if self.Base.check_for_new_version(False):
version = f'{current_version} >>> {latest_version}'
else:
version = f'{current_version}'
@@ -715,11 +710,56 @@ class Irc:
newnickname = cmd[2]
self.User.update(uid, newnickname)
case 'MODE':
#['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6+Z4494xWUg;time=2024-08-09T12:45:36.651Z',
# ':001', 'MODE', '#a', '+nt', '1723207536']
cmd.pop(0)
if '#' in cmd[2]:
channel = cmd[2]
mode = cmd[3]
self.Channel.update(channel, mode)
case 'SJOIN':
# ['@msgid=ictnEBhHmTUHzkEeVZl6rR;time=2023-12-28T20:03:18.482Z', ':001', 'SJOIN', '1702139101', '#stats', '+nst', ':@001SB890A', '@00BAAAAAI']
# ['@msgid=5sTwGdj349D82L96p749SY;time=2024-08-15T09:50:23.528Z', ':001', 'SJOIN', '1721564574', '#welcome', ':001JD94QH']
# ['@msgid=bvceb6HthbLJapgGLXn1b0;time=2024-08-15T09:50:11.464Z', ':001', 'SJOIN', '1721564574', '#welcome', '+lnrt', '13', ':001CIVLQF', '+11ZAAAAAB', '001QGR10C', '*@0014UE10B', '001NL1O07', '001SWZR05', '001HB8G04', '@00BAAAAAJ', '0019M7101']
cmd.pop(0)
channel = cmd[3]
self.insert_db_chan(channel)
mode = cmd[4]
len_cmd = len(cmd)
list_users:list = []
start_boucle = 0
# Trouver le premier user
for i in range(len_cmd):
s: list = re.findall(fr':', cmd[i])
if s:
start_boucle = i
# Boucle qui va ajouter l'ensemble des users (UID)
for i in range(start_boucle, len(cmd)):
parsed_UID = str(cmd[i])
pattern = fr'[:|@|%|\+|~|\*]*'
pattern = fr':'
parsed_UID = re.sub(pattern, '', parsed_UID)
list_users.append(parsed_UID)
self.Channel.insert(
self.Channel.ChannelModel(
name=channel,
mode=mode,
uids=list_users
)
)
case 'PART':
# ['@unrealircd.org/geoip=FR;unrealircd.org/userhost=50d6492c@80.214.73.44;unrealircd.org/userip=50d6492c@80.214.73.44;msgid=YSIPB9q4PcRu0EVfC9ci7y-/mZT0+Gj5FLiDSZshH5NCw;time=2024-08-15T15:35:53.772Z',
# ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris']
uid = str(cmd[1]).replace(':','')
channel = str(cmd[3])
self.Channel.delete_user_from_channel(channel, uid)
pass
case 'UID':
# ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601',
@@ -741,7 +781,7 @@ class Irc:
else:
remote_ip = '127.0.0.1'
score_connexion = str(self.first_score)
score_connexion = self.first_score
self.User.insert(
self.User.UserModel(
@@ -796,7 +836,7 @@ class Irc:
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(user_trigger, cmd_to_send)
self._hcmds(user_trigger, arg)
self._hcmds(user_trigger, arg, cmd)
if cmd[2] == self.Config.SERVICE_ID:
pattern = fr'^:.*?:(.*)$'
@@ -834,7 +874,7 @@ class Irc:
cmd_to_send = convert_to_string.replace(':','')
self.Base.log_cmd(self.User.get_nickname(user_trigger), cmd_to_send)
self._hcmds(user_trigger, arg)
self._hcmds(user_trigger, arg, cmd)
except IndexError as io:
self.Base.logs.error(f'{io}')
@@ -850,7 +890,7 @@ class Irc:
except IndexError as ie:
self.Base.logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
def _hcmds(self, user: str, cmd:list) -> None:
def _hcmds(self, user: str, cmd:list, fullcmd: list = []) -> None:
fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande
uid = self.User.get_uid(fromuser) # Récuperer le uid de l'utilisateur
@@ -871,7 +911,7 @@ class Irc:
# Envoyer la commande aux classes dynamiquement chargées
if command != 'notallowed':
for classe_name, classe_object in self.loaded_classes.items():
classe_object._hcmds(user, cmd)
classe_object._hcmds(user, cmd, fullcmd)
match command:
@@ -1100,7 +1140,7 @@ class Irc:
if 'mods.' + module_name in sys.modules:
self.loaded_classes[class_name].unload()
self.Base.logs.info('Module Already Loaded ... reload the module ...')
self.Base.logs.info('Module Already Loaded ... reloading the module ...')
the_module = sys.modules['mods.' + module_name]
importlib.reload(the_module)
@@ -1154,8 +1194,11 @@ class Irc:
reason.append(cmd[i])
final_reason = ' '.join(reason)
self.db_uid.clear() #Vider UID_DB
self.db_chan = [] #Vider les salons
# self.db_uid.clear() #Vider UID_DB
# self.db_chan = [] #Vider les salons
self.User.UID_DB.clear() # Clear User Object
self.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
for class_name in self.loaded_classes:
self.loaded_classes[class_name].unload()
@@ -1196,6 +1239,17 @@ class Irc:
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :{str(running_thread_name)}")
case 'show_channels':
for chan in self.Channel.UID_CHANNEL_DB:
list_nicknames: list = []
for uid in chan.uids:
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = re.sub(pattern, '', uid)
list_nicknames.append(self.User.get_nickname(parsed_UID))
self.send2socket(f":{dnickname} NOTICE {fromuser} : Channel: {chan.name} - Users: {list_nicknames}")
case 'uptime':
uptime = self.get_defender_uptime()
self.send2socket(f':{dnickname} NOTICE {fromuser} : {uptime}')
@@ -1203,22 +1257,6 @@ class Irc:
case 'copyright':
self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Config.current_version} Developped by adator® and dktmb® #')
case 'sentinel':
# .sentinel on
activation = str(cmd[1]).lower()
service_id = self.Config.SERVICE_ID
channel_to_dont_quit = [self.Config.SALON_JAIL, dchanlog]
if activation == 'on':
for chan in self.db_chan:
if not chan in channel_to_dont_quit:
self.send2socket(f":{service_id} JOIN {chan}")
if activation == 'off':
for chan in self.db_chan:
if not chan in channel_to_dont_quit:
self.send2socket(f":{service_id} PART {chan}")
case 'checkversion':
self.Base.create_thread(

View File

@@ -76,7 +76,7 @@ class Config:
configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data)
for key, value in configuration['CONFIG_COLOR'].items():
configuration['CONFIG_COLOR'][key] = value.encode('utf-8').decode('unicode_escape')
configuration['CONFIG_COLOR'][key] = str(value).encode('utf-8').decode('unicode_escape')
return configuration

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +1,10 @@
import threading
from core.irc import Irc
# Le module crée devra réspecter quelques conditions
# 1. Importer le module de configuration
# 2. Le nom de class devra toujours s'appeler comme le module exemple => nom de class Dktmb | nom du module mod_dktmb
# 3. la fonction __init__ devra toujours avoir les parametres suivant (self, irc:object)
# 1 . Créer la variable irc dans le module
# 1 . Créer la variable Irc dans le module
# 2 . Récuperer la configuration dans une variable
# 3 . Définir et enregistrer les nouvelles commandes
# 4. une fonction _hcmds(self, user:str, cmd: list) devra toujours etre crée.
@@ -13,37 +12,59 @@ from core.irc import Irc
class Test():
def __init__(self, ircInstance:Irc) -> None:
print(f'Module {self.__class__.__name__} loaded ...')
self.irc = ircInstance # Ajouter l'object mod_irc a la classe
# Add Irc Object to the module
self.Irc = ircInstance
self.config = ircInstance.Config # Ajouter la configuration a la classe
# Add Global Configuration to the module
self.Config = ircInstance.Config
# Add Base object to the module
self.Base = ircInstance.Base
# Add logs object to the module
self.Logs = ircInstance.Base.logs
# Add User object to the module
self.User = ircInstance.User
# Add Channel object to the module
self.Channel = ircInstance.Channel
# Créer les nouvelles commandes du module
self.commands = ['test']
self.commands_level = {
0: ['test'],
1: ['test_level_1']
}
self.__set_commands(self.commands) # Enrigstrer les nouvelles commandes dans le code
# Init the module
self.__init_module()
self.core = ircInstance.Base # Instance du module Base
# Log the module
self.Logs.debug(f'Module {self.__class__.__name__} loaded ...')
self.session = '' # Instancier une session pour la base de données
self.__create_db('mod_test') # Créer la base de données si necessaire
def __init_module(self) -> None:
def __set_commands(self, commands:list) -> None:
"""Rajoute les commandes du module au programme principal
self.__set_commands(self.commands_level)
self.__create_tables()
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
Returns:
None: Aucun retour attendu
"""
for command in commands:
self.irc.commands.append(command)
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return True
return None
def __create_db(self, db_name:str) -> None:
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
@@ -52,36 +73,36 @@ class Test():
Returns:
None: Aucun retour n'es attendu
"""
db_directory = self.core.MODS_DB_PATH
self.session = self.core.db_init(db_directory, db_name)
table_logs = '''CREATE TABLE IF NOT EXISTS logs (
table_logs = '''CREATE TABLE IF NOT EXISTS test_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
self.core.db_execute_query(self.session, table_logs)
self.Base.db_execute_query(table_logs)
return None
def unload(self) -> None:
return None
def _hcmds(self, user:str, cmd: list) -> None:
def cmd(self, data:list) -> None:
return None
command = cmd[0].lower()
def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None:
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
fromuser = user
match command:
case 'test':
try:
user_action = cmd[1]
self.irc.send2socket(f'PRIVMSG #webmail Je vais voicer {user}')
self.irc.send2socket(f'MODE #webmail +v {user_action}')
self.core.create_log(f"MODE +v sur {user_action}")
except KeyError as ke:
self.core.create_log(f"Key Error : {ke}")
self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : test command ready ...")
self.Logs.debug(f"Test logs ready")
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")

383
mods/mod_votekick.py Normal file
View File

@@ -0,0 +1,383 @@
from core.irc import Irc
import re
from dataclasses import dataclass, field
# Activer le systeme sur un salon (activate #salon)
# Le service devra se connecter au salon
# Le service devra se mettre en op
# Soumettre un nom de user (submit nickname)
# voter pour un ban (vote_for)
# voter contre un ban (vote_against)
class Votekick():
@dataclass
class VoteChannelModel:
channel_name: str
target_user: str
voter_users: list
vote_for: int
vote_against: int
VOTE_CHANNEL_DB:list[VoteChannelModel] = []
def __init__(self, ircInstance:Irc) -> None:
# Add Irc Object to the module
self.Irc = ircInstance
# Add Global Configuration to the module
self.Config = ircInstance.Config
# Add Base object to the module
self.Base = ircInstance.Base
# Add logs object to the module
self.Logs = ircInstance.Base.logs
# Add User object to the module
self.User = ircInstance.User
# Add Channel object to the module
self.Channel = ircInstance.Channel
# Créer les nouvelles commandes du module
self.commands_level = {
0: ['vote_for', 'vote_against'],
1: ['activate', 'deactivate', 'submit', 'vote_stat', 'vote_verdict']
}
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.__class__.__name__} loaded ...')
def __init_module(self) -> None:
self.__set_commands(self.commands_level)
self.__create_tables()
self.join_saved_channels()
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
Returns:
None: Aucun retour n'es attendu
"""
table_logs = '''CREATE TABLE IF NOT EXISTS votekick_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
table_vote = '''CREATE TABLE IF NOT EXISTS votekick_channel (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
channel TEXT
)
'''
self.Base.db_execute_query(table_logs)
self.Base.db_execute_query(table_vote)
return None
def unload(self) -> None:
try:
for chan in self.VOTE_CHANNEL_DB:
self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} PART {chan.channel_name}")
self.VOTE_CHANNEL_DB = []
self.Logs.debug(f'Delete memory DB VOTE_CHANNEL_DB: {self.VOTE_CHANNEL_DB}')
return None
except UnboundLocalError as ne:
self.Logs.error(f'{ne}')
except NameError as ue:
self.Logs.error(f'{ue}')
except:
self.Logs.error('Error on the module')
def init_vote_system(self, channel: str) -> bool:
response = False
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
chan.target_user = ''
chan.voter_users = []
chan.vote_against = 0
chan.vote_for = 0
response = True
return response
def insert_vote_channel(self, ChannelObject: VoteChannelModel) -> bool:
result = False
found = False
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == ChannelObject.channel_name:
found = True
if not found:
self.VOTE_CHANNEL_DB.append(ChannelObject)
self.Logs.debug(f"The channel has been added {ChannelObject}")
self.db_add_vote_channel(ChannelObject.channel_name)
return result
def db_add_vote_channel(self, channel:str) -> bool:
"""Cette fonction ajoute les salons ou seront autoriser les votes
Args:
channel (str): le salon à enregistrer.
"""
current_datetime = self.Base.get_datetime()
mes_donnees = {'channel': channel}
response = self.Base.db_execute_query("SELECT id FROM votekick_channel WHERE channel = :channel", mes_donnees)
isChannelExist = response.fetchone()
if isChannelExist is None:
mes_donnees = {'datetime': current_datetime, 'channel': channel}
insert = self.Base.db_execute_query(f"INSERT INTO votekick_channel (datetime, channel) VALUES (:datetime, :channel)", mes_donnees)
if insert.rowcount > 0:
return True
else:
return False
else:
return False
def db_delete_vote_channel(self, channel: str) -> bool:
"""Cette fonction supprime les salons de join de Defender
Args:
channel (str): le salon à enregistrer.
"""
mes_donnes = {'channel': channel}
response = self.Base.db_execute_query("DELETE FROM votekick_channel WHERE channel = :channel", mes_donnes)
affected_row = response.rowcount
if affected_row > 0:
return True
else:
return False
def join_saved_channels(self) -> None:
result = self.Base.db_execute_query("SELECT id, channel FROM votekick_channel")
channels = result.fetchall()
unixtime = self.Base.get_unixtime()
for channel in channels:
id, chan = channel
self.insert_vote_channel(self.VoteChannelModel(channel_name=chan, target_user='', voter_users=[], vote_for=0, vote_against=0))
self.Irc.send2socket(f":{self.Config.SERVEUR_ID} SJOIN {unixtime} {chan} + :{self.Config.SERVICE_ID}")
self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {chan} +o {self.Config.SERVICE_NICKNAME}")
return None
def is_vote_ongoing(self, channel: str) -> bool:
response = False
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
if vote.target_user:
response = True
return response
def cmd(self, data:list) -> None:
cmd = list(data).copy()
match cmd[2]:
case 'SJOIN':
pass
case _:
pass
return None
def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None:
# cmd is the command starting from the user command
# full cmd is sending the entire server response
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
fromuser = user
match command:
case 'vote_for':
try:
# vote_for
channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
else:
chan.vote_for += 1
chan.voter_users.append(fromuser)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
case 'vote_against':
try:
# vote_against
channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
if fromuser in chan.voter_users:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
else:
chan.vote_against += 1
chan.voter_users.append(fromuser)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
case 'vote_stat':
try:
channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Channel: {chan.channel_name} | Target: {self.User.get_nickname(chan.target_user)} | For: {chan.vote_for} | Against: {chan.vote_against} | Number of voters: {str(len(chan.voter_users))}')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
case 'vote_verdict':
try:
channel = str(fullcmd[2]).lower()
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
target_user = self.User.get_nickname(chan.target_user)
if chan.vote_for > chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :The user {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} will be kicked from this channel')
self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
elif chan.vote_for <= chan.vote_against:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This user will stay on this channel')
# Init the system
if self.init_vote_system(channel):
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :System vote re initiated')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
case 'submit':
# submit nickname
try:
nickname_submitted = cmd[1]
channel = str(fullcmd[2]).lower()
uid_submitted = self.User.get_uid(nickname_submitted)
user_submitted = self.User.get_User(nickname_submitted)
# check if there is an ongoing vote
if self.is_vote_ongoing(channel):
for vote in self.VOTE_CHANNEL_DB:
if vote.channel_name == channel:
ongoing_user = self.User.get_nickname(vote.target_user)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :There is an ongoing vote on {ongoing_user}')
return False
# check if the user exist
if user_submitted is None:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> do not exist')
return False
uid_cleaned = self.Base.clean_uid(uid_submitted)
ChannelInfo = self.Channel.get_Channel(channel)
clean_uids_in_channel: list = []
for uid in ChannelInfo.uids:
clean_uids_in_channel.append(self.Base.clean_uid(uid))
if not uid_cleaned in clean_uids_in_channel:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> is not available in this channel')
return False
# check if Ircop or Service or Bot
pattern = fr'[o|B|S]'
operator_user = re.findall(pattern, user_submitted.umodes)
if operator_user:
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You cant vote for this user ! he/she is protected')
return False
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
chan.target_user = self.User.get_uid(nickname_submitted)
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :{nickname_submitted} has been targeted for a vote')
except KeyError as ke:
self.Logs.error(f'Key Error: {ke}')
except TypeError as te:
self.Logs.error(te)
case 'activate':
try:
# activate #channel
channel = str(cmd[1]).lower()
self.insert_vote_channel(
self.VoteChannelModel(
channel_name=channel,
target_user='',
voter_users=[],
vote_for=0,
vote_against=0
)
)
self.Irc.send2socket(f":{dnickname} JOIN {channel}")
self.Irc.send2socket(f":{dnickname} SAMODE {channel} +o {dnickname}")
self.Irc.send2socket(f":{dnickname} PRIVMSG {channel} :You can now use !submit <nickname> to decide if he will stay or not on this channel ")
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")
case 'deactivate':
try:
# deactivate #channel
channel = str(cmd[1]).lower()
self.Irc.send2socket(f":{dnickname} SAMODE {channel} -o {dnickname}")
self.Irc.send2socket(f":{dnickname} PART {channel}")
for chan in self.VOTE_CHANNEL_DB:
if chan.channel_name == channel:
self.VOTE_CHANNEL_DB.remove(chan)
self.db_delete_vote_channel(chan.channel_name)
self.Logs.debug(f"Test logs ready")
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")