mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 19:24:23 +00:00
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
.pyenv/
|
||||||
|
db/
|
||||||
|
logs/
|
||||||
|
__pycache__/
|
||||||
|
configuration.json
|
||||||
|
test.py
|
||||||
375
core/Model.py
Normal file
375
core/Model.py
Normal file
@@ -0,0 +1,375 @@
|
|||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Union
|
||||||
|
from core.base import Base
|
||||||
|
|
||||||
|
class User:
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class UserModel:
|
||||||
|
uid: str
|
||||||
|
nickname: str
|
||||||
|
username: str
|
||||||
|
hostname: str
|
||||||
|
umodes: str
|
||||||
|
vhost: str
|
||||||
|
isWebirc: bool
|
||||||
|
remote_ip: str
|
||||||
|
score_connexion: int
|
||||||
|
connexion_datetime: datetime = field(default=datetime.now())
|
||||||
|
|
||||||
|
UID_DB: list[UserModel] = []
|
||||||
|
|
||||||
|
def __init__(self, Base: Base) -> None:
|
||||||
|
self.log = Base.logs
|
||||||
|
pass
|
||||||
|
|
||||||
|
def insert(self, newUser: UserModel) -> bool:
|
||||||
|
"""Insert a new User object
|
||||||
|
|
||||||
|
Args:
|
||||||
|
newUser (UserModel): New userModel object
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if inserted
|
||||||
|
"""
|
||||||
|
result = False
|
||||||
|
exist = False
|
||||||
|
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.uid == newUser.uid:
|
||||||
|
exist = True
|
||||||
|
self.log.debug(f'{record.uid} already exist')
|
||||||
|
|
||||||
|
if not exist:
|
||||||
|
self.UID_DB.append(newUser)
|
||||||
|
result = True
|
||||||
|
self.log.debug(f'New User Created: ({newUser})')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.critical(f'The User Object was not inserted {newUser}')
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def update(self, uid: str, newNickname: str) -> bool:
|
||||||
|
"""Update the nickname starting from the UID
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (str): UID of the user
|
||||||
|
newNickname (str): New nickname
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if updated
|
||||||
|
"""
|
||||||
|
result = False
|
||||||
|
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.uid == uid:
|
||||||
|
record.nickname = newNickname
|
||||||
|
result = True
|
||||||
|
self.log.debug(f'UID ({record.uid}) has been updated with new nickname {newNickname}')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.critical(f'The new nickname {newNickname} was not updated, uid = {uid}')
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def delete(self, uid: str) -> bool:
|
||||||
|
"""Delete the User starting from the UID
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (str): UID of the user
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if deleted
|
||||||
|
"""
|
||||||
|
result = False
|
||||||
|
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.uid == uid:
|
||||||
|
self.UID_DB.remove(record)
|
||||||
|
result = True
|
||||||
|
self.log.debug(f'UID ({record.uid}) has been deleted')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.critical(f'The UID {uid} was not deleted')
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def get_User(self, uidornickname: str) -> Union[UserModel, None]:
|
||||||
|
"""Get The User Object model
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uidornickname (str): UID or Nickname
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
UserModel|None: The UserModel Object | None
|
||||||
|
"""
|
||||||
|
User = None
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
User = record
|
||||||
|
elif record.nickname == uidornickname:
|
||||||
|
User = record
|
||||||
|
|
||||||
|
self.log.debug(f'Search {uidornickname} -- result = {User}')
|
||||||
|
|
||||||
|
return User
|
||||||
|
|
||||||
|
def get_uid(self, uidornickname:str) -> Union[str, None]:
|
||||||
|
"""Get the UID of the user starting from the UID or the Nickname
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uidornickname (str): UID or Nickname
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str|None: Return the UID
|
||||||
|
"""
|
||||||
|
uid = None
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
uid = record.uid
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
uid = record.uid
|
||||||
|
|
||||||
|
self.log.debug(f'The UID that you are looking for {uidornickname} has been found {uid}')
|
||||||
|
return uid
|
||||||
|
|
||||||
|
def get_nickname(self, uidornickname:str) -> Union[str, None]:
|
||||||
|
"""Get the Nickname starting from UID or the nickname
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uidornickname (str): UID or Nickname of the user
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str|None: the nickname
|
||||||
|
"""
|
||||||
|
nickname = None
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
nickname = record.nickname
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
nickname = record.nickname
|
||||||
|
self.log.debug(f'The value to check {uidornickname} -> {nickname}')
|
||||||
|
return nickname
|
||||||
|
|
||||||
|
class Admin:
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AdminModel:
|
||||||
|
uid: str
|
||||||
|
nickname: str
|
||||||
|
username: str
|
||||||
|
hostname: str
|
||||||
|
umodes: str
|
||||||
|
vhost: str
|
||||||
|
level: int
|
||||||
|
connexion_datetime: datetime = field(default=datetime.now())
|
||||||
|
|
||||||
|
UID_ADMIN_DB: list[AdminModel] = []
|
||||||
|
|
||||||
|
def __init__(self, Base: Base) -> None:
|
||||||
|
self.log = Base.logs
|
||||||
|
pass
|
||||||
|
|
||||||
|
def insert(self, newAdmin: AdminModel) -> bool:
|
||||||
|
|
||||||
|
result = False
|
||||||
|
exist = False
|
||||||
|
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == newAdmin.uid:
|
||||||
|
exist = True
|
||||||
|
self.log.debug(f'{record.uid} already exist')
|
||||||
|
|
||||||
|
if not exist:
|
||||||
|
self.UID_ADMIN_DB.append(newAdmin)
|
||||||
|
result = True
|
||||||
|
self.log.debug(f'UID ({newAdmin.uid}) has been created')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.critical(f'The User Object was not inserted {newAdmin}')
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def update(self, uid: str, newNickname: str) -> bool:
|
||||||
|
|
||||||
|
result = False
|
||||||
|
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == uid:
|
||||||
|
record.nickname = newNickname
|
||||||
|
result = True
|
||||||
|
self.log.debug(f'UID ({record.uid}) has been updated with new nickname {newNickname}')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.critical(f'The new nickname {newNickname} was not updated, uid = {uid}')
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def delete(self, uid: str) -> bool:
|
||||||
|
|
||||||
|
result = False
|
||||||
|
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == uid:
|
||||||
|
self.UID_ADMIN_DB.remove(record)
|
||||||
|
result = True
|
||||||
|
self.log.debug(f'UID ({record.uid}) has been created')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.critical(f'The UID {uid} was not deleted')
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def get_Admin(self, uidornickname: str) -> Union[AdminModel, None]:
|
||||||
|
|
||||||
|
Admin = None
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
Admin = record
|
||||||
|
elif record.nickname == uidornickname:
|
||||||
|
Admin = record
|
||||||
|
|
||||||
|
self.log.debug(f'Search {uidornickname} -- result = {Admin}')
|
||||||
|
|
||||||
|
return Admin
|
||||||
|
|
||||||
|
def get_uid(self, uidornickname:str) -> Union[str, None]:
|
||||||
|
|
||||||
|
uid = None
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
uid = record.uid
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
uid = record.uid
|
||||||
|
|
||||||
|
self.log.debug(f'The UID that you are looking for {uidornickname} has been found {uid}')
|
||||||
|
return uid
|
||||||
|
|
||||||
|
def get_nickname(self, uidornickname:str) -> Union[str, None]:
|
||||||
|
|
||||||
|
nickname = None
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
nickname = record.nickname
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
nickname = record.nickname
|
||||||
|
self.log.debug(f'The value {uidornickname} -- {nickname}')
|
||||||
|
return nickname
|
||||||
|
|
||||||
|
class Channel:
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ChannelModel:
|
||||||
|
name: str
|
||||||
|
mode: str
|
||||||
|
uids: list
|
||||||
|
|
||||||
|
UID_CHANNEL_DB: list[ChannelModel] = []
|
||||||
|
|
||||||
|
def __init__(self, Base: Base) -> None:
|
||||||
|
self.log = Base.logs
|
||||||
|
self.Base = Base
|
||||||
|
pass
|
||||||
|
|
||||||
|
def insert(self, newChan: ChannelModel) -> bool:
|
||||||
|
|
||||||
|
result = False
|
||||||
|
exist = False
|
||||||
|
|
||||||
|
for record in self.UID_CHANNEL_DB:
|
||||||
|
if record.name == newChan.name:
|
||||||
|
exist = True
|
||||||
|
self.log.debug(f'{record.name} already exist')
|
||||||
|
|
||||||
|
for user in newChan.uids:
|
||||||
|
record.uids.append(user)
|
||||||
|
|
||||||
|
# Supprimer les doublons
|
||||||
|
del_duplicates = list(set(record.uids))
|
||||||
|
record.uids = del_duplicates
|
||||||
|
self.log.debug(f'Updating a new UID to the channel {record}')
|
||||||
|
|
||||||
|
|
||||||
|
if not exist:
|
||||||
|
self.UID_CHANNEL_DB.append(newChan)
|
||||||
|
result = True
|
||||||
|
self.log.debug(f'New Channel Created: ({newChan})')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.critical(f'The Channel Object was not inserted {newChan}')
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def update(self, name: str, newMode: str) -> bool:
|
||||||
|
|
||||||
|
result = False
|
||||||
|
|
||||||
|
for record in self.UID_CHANNEL_DB:
|
||||||
|
if record.name == name:
|
||||||
|
record.mode = newMode
|
||||||
|
result = True
|
||||||
|
self.log.debug(f'Mode ({record.name}) has been updated with new mode {newMode}')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.critical(f'The channel mode {newMode} was not updated, name = {name}')
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def delete(self, name: str) -> bool:
|
||||||
|
|
||||||
|
result = False
|
||||||
|
|
||||||
|
for record in self.UID_CHANNEL_DB:
|
||||||
|
if record.name == name:
|
||||||
|
self.UID_CHANNEL_DB.remove(record)
|
||||||
|
result = True
|
||||||
|
self.log.debug(f'Channel ({record.name}) has been created')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.critical(f'The Channel {name} was not deleted')
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def delete_user_from_channel(self, chan_name: str, uid:str) -> bool:
|
||||||
|
try:
|
||||||
|
result = False
|
||||||
|
|
||||||
|
for record in self.UID_CHANNEL_DB:
|
||||||
|
if record.name == chan_name:
|
||||||
|
for user_id in record.uids:
|
||||||
|
if self.Base.clean_uid(user_id) == uid:
|
||||||
|
record.uids.remove(user_id)
|
||||||
|
self.log.debug(f'uid {uid} has been removed, here is the new object: {record}')
|
||||||
|
result = True
|
||||||
|
|
||||||
|
for record in self.UID_CHANNEL_DB:
|
||||||
|
if not record.uids:
|
||||||
|
self.UID_CHANNEL_DB.remove(record)
|
||||||
|
self.log.debug(f'Channel {record.name} has been removed, here is the new object: {record}')
|
||||||
|
|
||||||
|
return result
|
||||||
|
except ValueError as ve:
|
||||||
|
self.log.error(f'{ve}')
|
||||||
|
|
||||||
|
def get_Channel(self, name: str) -> Union[ChannelModel, None]:
|
||||||
|
|
||||||
|
Channel = None
|
||||||
|
for record in self.UID_CHANNEL_DB:
|
||||||
|
if record.name == name:
|
||||||
|
Channel = record
|
||||||
|
|
||||||
|
self.log.debug(f'Search {name} -- result = {Channel}')
|
||||||
|
|
||||||
|
return Channel
|
||||||
|
|
||||||
|
def get_mode(self, name:str) -> Union[str, None]:
|
||||||
|
|
||||||
|
mode = None
|
||||||
|
for record in self.UID_CHANNEL_DB:
|
||||||
|
if record.name == name:
|
||||||
|
mode = record.mode
|
||||||
|
|
||||||
|
self.log.debug(f'The mode of the channel {name} has been found: {mode}')
|
||||||
|
return mode
|
||||||
121
core/base.py
121
core/base.py
@@ -1,31 +1,22 @@
|
|||||||
import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, sys
|
import time, threading, os, random, socket, hashlib, ipaddress, logging, requests, json, re
|
||||||
|
from typing import Union
|
||||||
|
from base64 import b64decode
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from sqlalchemy import create_engine, Engine, Connection, CursorResult
|
from sqlalchemy import create_engine, Engine, Connection, CursorResult
|
||||||
from sqlalchemy.sql import text
|
from sqlalchemy.sql import text
|
||||||
from core.loadConf import Config
|
from core.loadConf import ConfigDataModel
|
||||||
|
|
||||||
class Base:
|
class Base:
|
||||||
|
|
||||||
CORE_DB_PATH = 'core' + os.sep + 'db' + os.sep # Le dossier bases de données core
|
CORE_DB_PATH = 'core' + os.sep + 'db' + os.sep # Le dossier bases de données core
|
||||||
MODS_DB_PATH = 'mods' + os.sep + 'db' + os.sep # Le dossier bases de données des modules
|
MODS_DB_PATH = 'mods' + os.sep + 'db' + os.sep # Le dossier bases de données des modules
|
||||||
PYTHON_MIN_VERSION = '3.10' # Version min de python
|
PYTHON_MIN_VERSION = '3.10' # Version min de python
|
||||||
DB_SCHEMA:list[str] = {
|
|
||||||
'admins': 'sys_admins',
|
|
||||||
'commandes': 'sys_commandes',
|
|
||||||
'logs': 'sys_logs',
|
|
||||||
'modules': 'sys_modules'
|
|
||||||
}
|
|
||||||
|
|
||||||
DEFENDER_VERSION = '' # MAJOR.MINOR.BATCH
|
def __init__(self, Config: ConfigDataModel) -> None:
|
||||||
LATEST_DEFENDER_VERSION = '' # Latest Version of Defender in git
|
|
||||||
DEFENDER_DB_PATH = 'db' + os.sep # Séparateur en fonction de l'OS
|
|
||||||
DEFENDER_DB_NAME = 'defender' # Le nom de la base de données principale
|
|
||||||
|
|
||||||
def __init__(self, Config: Config) -> None:
|
|
||||||
|
|
||||||
self.Config = Config # Assigner l'objet de configuration
|
self.Config = Config # Assigner l'objet de configuration
|
||||||
self.init_log_system() # Demarrer le systeme de log
|
self.init_log_system() # Demarrer le systeme de log
|
||||||
self.check_for_new_version() # Verifier si une nouvelle version est disponible
|
self.check_for_new_version(True) # Verifier si une nouvelle version est disponible
|
||||||
|
|
||||||
self.running_timers:list[threading.Timer] = [] # Liste des timers en cours
|
self.running_timers:list[threading.Timer] = [] # Liste des timers en cours
|
||||||
self.running_threads:list[threading.Thread] = [] # Liste des threads en cours
|
self.running_threads:list[threading.Thread] = [] # Liste des threads en cours
|
||||||
@@ -48,12 +39,15 @@ class Base:
|
|||||||
with open(version_filename, 'r') as version_data:
|
with open(version_filename, 'r') as version_data:
|
||||||
current_version:dict[str, str] = json.load(version_data)
|
current_version:dict[str, str] = json.load(version_data)
|
||||||
|
|
||||||
self.DEFENDER_VERSION = current_version["version"]
|
# self.DEFENDER_VERSION = current_version["version"]
|
||||||
|
self.Config.current_version = current_version['version']
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def __get_latest_defender_version(self) -> None:
|
def __get_latest_defender_version(self) -> None:
|
||||||
try:
|
try:
|
||||||
|
self.logs.debug(f'Looking for a new version available on Github')
|
||||||
|
print(f'===> Looking for a new version available on Github')
|
||||||
token = ''
|
token = ''
|
||||||
json_url = f'https://raw.githubusercontent.com/adator85/IRC_DEFENDER_MODULES/main/version.json'
|
json_url = f'https://raw.githubusercontent.com/adator85/IRC_DEFENDER_MODULES/main/version.json'
|
||||||
headers = {
|
headers = {
|
||||||
@@ -68,7 +62,8 @@ class Base:
|
|||||||
|
|
||||||
response.raise_for_status() # Vérifie si la requête a réussi
|
response.raise_for_status() # Vérifie si la requête a réussi
|
||||||
json_response:dict = response.json()
|
json_response:dict = response.json()
|
||||||
self.LATEST_DEFENDER_VERSION = json_response["version"]
|
# self.LATEST_DEFENDER_VERSION = json_response["version"]
|
||||||
|
self.Config.latest_version = json_response['version']
|
||||||
|
|
||||||
return None
|
return None
|
||||||
except requests.HTTPError as err:
|
except requests.HTTPError as err:
|
||||||
@@ -76,16 +71,20 @@ class Base:
|
|||||||
except:
|
except:
|
||||||
self.logs.warning(f'Github not available to fetch latest version')
|
self.logs.warning(f'Github not available to fetch latest version')
|
||||||
|
|
||||||
def check_for_new_version(self) -> bool:
|
def check_for_new_version(self, online:bool) -> bool:
|
||||||
try:
|
try:
|
||||||
|
self.logs.debug(f'Checking for a new service version')
|
||||||
|
|
||||||
# Assigner la version actuelle de Defender
|
# Assigner la version actuelle de Defender
|
||||||
self.__set_current_defender_version()
|
self.__set_current_defender_version()
|
||||||
# Récuperer la dernier version disponible dans github
|
# Récuperer la dernier version disponible dans github
|
||||||
self.__get_latest_defender_version()
|
if online:
|
||||||
|
self.logs.debug(f'Retrieve the latest version from Github')
|
||||||
|
self.__get_latest_defender_version()
|
||||||
|
|
||||||
isNewVersion = False
|
isNewVersion = False
|
||||||
latest_version = self.LATEST_DEFENDER_VERSION
|
latest_version = self.Config.latest_version
|
||||||
current_version = self.DEFENDER_VERSION
|
current_version = self.Config.current_version
|
||||||
|
|
||||||
curr_major , curr_minor, curr_patch = current_version.split('.')
|
curr_major , curr_minor, curr_patch = current_version.split('.')
|
||||||
last_major, last_minor, last_patch = latest_version.split('.')
|
last_major, last_minor, last_patch = latest_version.split('.')
|
||||||
@@ -130,7 +129,7 @@ class Base:
|
|||||||
Returns:
|
Returns:
|
||||||
None: Aucun retour
|
None: Aucun retour
|
||||||
"""
|
"""
|
||||||
sql_insert = f"INSERT INTO {self.DB_SCHEMA['logs']} (datetime, server_msg) VALUES (:datetime, :server_msg)"
|
sql_insert = f"INSERT INTO {self.Config.table_log} (datetime, server_msg) VALUES (:datetime, :server_msg)"
|
||||||
mes_donnees = {'datetime': str(self.get_datetime()),'server_msg': f'{log_message}'}
|
mes_donnees = {'datetime': str(self.get_datetime()),'server_msg': f'{log_message}'}
|
||||||
self.db_execute_query(sql_insert, mes_donnees)
|
self.db_execute_query(sql_insert, mes_donnees)
|
||||||
|
|
||||||
@@ -166,7 +165,7 @@ class Base:
|
|||||||
cmd_list[2] = '*******'
|
cmd_list[2] = '*******'
|
||||||
cmd = ' '.join(cmd_list)
|
cmd = ' '.join(cmd_list)
|
||||||
|
|
||||||
insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['commandes']} (datetime, user, commande) VALUES (:datetime, :user, :commande)"
|
insert_cmd_query = f"INSERT INTO {self.Config.table_commande} (datetime, user, commande) VALUES (:datetime, :user, :commande)"
|
||||||
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'commande': cmd}
|
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'commande': cmd}
|
||||||
self.db_execute_query(insert_cmd_query, mes_donnees)
|
self.db_execute_query(insert_cmd_query, mes_donnees)
|
||||||
|
|
||||||
@@ -181,7 +180,7 @@ class Base:
|
|||||||
Returns:
|
Returns:
|
||||||
bool: True si le module existe déja dans la base de données sinon False
|
bool: True si le module existe déja dans la base de données sinon False
|
||||||
"""
|
"""
|
||||||
query = f"SELECT id FROM {self.DB_SCHEMA['modules']} WHERE module = :module"
|
query = f"SELECT id FROM {self.Config.table_module} WHERE module = :module"
|
||||||
mes_donnes = {'module': module_name}
|
mes_donnes = {'module': module_name}
|
||||||
results = self.db_execute_query(query, mes_donnes)
|
results = self.db_execute_query(query, mes_donnes)
|
||||||
|
|
||||||
@@ -199,7 +198,7 @@ class Base:
|
|||||||
|
|
||||||
if not self.db_isModuleExist(module_name):
|
if not self.db_isModuleExist(module_name):
|
||||||
self.logs.debug(f"Le module {module_name} n'existe pas alors ont le créer")
|
self.logs.debug(f"Le module {module_name} n'existe pas alors ont le créer")
|
||||||
insert_cmd_query = f"INSERT INTO {self.DB_SCHEMA['modules']} (datetime, user, module) VALUES (:datetime, :user, :module)"
|
insert_cmd_query = f"INSERT INTO {self.Config.table_module} (datetime, user, module) VALUES (:datetime, :user, :module)"
|
||||||
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'module': module_name}
|
mes_donnees = {'datetime': self.get_datetime(), 'user': user_cmd, 'module': module_name}
|
||||||
self.db_execute_query(insert_cmd_query, mes_donnees)
|
self.db_execute_query(insert_cmd_query, mes_donnees)
|
||||||
# self.db_close_session(self.session)
|
# self.db_close_session(self.session)
|
||||||
@@ -214,7 +213,7 @@ class Base:
|
|||||||
Args:
|
Args:
|
||||||
cmd (str): le module a enregistrer
|
cmd (str): le module a enregistrer
|
||||||
"""
|
"""
|
||||||
insert_cmd_query = f"DELETE FROM {self.DB_SCHEMA['modules']} WHERE module = :module"
|
insert_cmd_query = f"DELETE FROM {self.Config.table_module} WHERE module = :module"
|
||||||
mes_donnees = {'module': module_name}
|
mes_donnees = {'module': module_name}
|
||||||
self.db_execute_query(insert_cmd_query, mes_donnees)
|
self.db_execute_query(insert_cmd_query, mes_donnees)
|
||||||
|
|
||||||
@@ -222,14 +221,20 @@ class Base:
|
|||||||
|
|
||||||
def db_create_first_admin(self) -> None:
|
def db_create_first_admin(self) -> None:
|
||||||
|
|
||||||
user = self.db_execute_query(f"SELECT id FROM {self.DB_SCHEMA['admins']}")
|
user = self.db_execute_query(f"SELECT id FROM {self.Config.table_admin}")
|
||||||
if not user.fetchall():
|
if not user.fetchall():
|
||||||
admin = self.Config.OWNER
|
admin = self.Config.OWNER
|
||||||
password = self.crypt_password(self.Config.PASSWORD)
|
password = self.crypt_password(self.Config.PASSWORD)
|
||||||
|
|
||||||
mes_donnees = {'createdOn': self.get_datetime(), 'user': admin, 'password': password, 'hostname': '*', 'vhost': '*', 'level': 5}
|
mes_donnees = {'createdOn': self.get_datetime(),
|
||||||
|
'user': admin,
|
||||||
|
'password': password,
|
||||||
|
'hostname': '*',
|
||||||
|
'vhost': '*',
|
||||||
|
'level': 5
|
||||||
|
}
|
||||||
self.db_execute_query(f"""
|
self.db_execute_query(f"""
|
||||||
INSERT INTO {self.DB_SCHEMA['admins']}
|
INSERT INTO {self.Config.table_admin}
|
||||||
(createdOn, user, password, hostname, vhost, level)
|
(createdOn, user, password, hostname, vhost, level)
|
||||||
VALUES
|
VALUES
|
||||||
(:createdOn, :user, :password, :hostname, :vhost, :level)"""
|
(:createdOn, :user, :password, :hostname, :vhost, :level)"""
|
||||||
@@ -348,8 +353,8 @@ class Base:
|
|||||||
|
|
||||||
def db_init(self) -> tuple[Engine, Connection]:
|
def db_init(self) -> tuple[Engine, Connection]:
|
||||||
|
|
||||||
db_directory = self.DEFENDER_DB_PATH
|
db_directory = self.Config.db_path
|
||||||
full_path_db = self.DEFENDER_DB_PATH + self.DEFENDER_DB_NAME
|
full_path_db = self.Config.db_path + self.Config.db_name
|
||||||
|
|
||||||
if not os.path.exists(db_directory):
|
if not os.path.exists(db_directory):
|
||||||
os.makedirs(db_directory)
|
os.makedirs(db_directory)
|
||||||
@@ -361,14 +366,14 @@ class Base:
|
|||||||
|
|
||||||
def __create_db(self) -> None:
|
def __create_db(self) -> None:
|
||||||
|
|
||||||
table_logs = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['logs']} (
|
table_logs = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_log} (
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
datetime TEXT,
|
datetime TEXT,
|
||||||
server_msg TEXT
|
server_msg TEXT
|
||||||
)
|
)
|
||||||
'''
|
'''
|
||||||
|
|
||||||
table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['commandes']} (
|
table_cmds = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_commande} (
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
datetime TEXT,
|
datetime TEXT,
|
||||||
user TEXT,
|
user TEXT,
|
||||||
@@ -376,7 +381,7 @@ class Base:
|
|||||||
)
|
)
|
||||||
'''
|
'''
|
||||||
|
|
||||||
table_modules = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['modules']} (
|
table_modules = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_module} (
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
datetime TEXT,
|
datetime TEXT,
|
||||||
user TEXT,
|
user TEXT,
|
||||||
@@ -384,7 +389,7 @@ class Base:
|
|||||||
)
|
)
|
||||||
'''
|
'''
|
||||||
|
|
||||||
table_admins = f'''CREATE TABLE IF NOT EXISTS {self.DB_SCHEMA['admins']} (
|
table_admins = f'''CREATE TABLE IF NOT EXISTS {self.Config.table_admin} (
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
createdOn TEXT,
|
createdOn TEXT,
|
||||||
user TEXT,
|
user TEXT,
|
||||||
@@ -464,6 +469,17 @@ class Base:
|
|||||||
except ValueError:
|
except ValueError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def decode_ip(self, ip_b64encoded: str) -> Union[str, None]:
|
||||||
|
|
||||||
|
binary_ip = b64decode(ip_b64encoded)
|
||||||
|
try:
|
||||||
|
decoded_ip = ipaddress.ip_address(binary_ip)
|
||||||
|
|
||||||
|
return decoded_ip.exploded
|
||||||
|
except ValueError as ve:
|
||||||
|
self.logs.critical(f'This remote ip is not valid : {ve}')
|
||||||
|
return None
|
||||||
|
|
||||||
def get_random(self, lenght:int) -> str:
|
def get_random(self, lenght:int) -> str:
|
||||||
"""
|
"""
|
||||||
Retourn une chaîne aléatoire en fonction de la longueur spécifiée.
|
Retourn une chaîne aléatoire en fonction de la longueur spécifiée.
|
||||||
@@ -491,3 +507,36 @@ class Base:
|
|||||||
|
|
||||||
# Vider le dictionnaire de fonction
|
# Vider le dictionnaire de fonction
|
||||||
self.periodic_func.clear()
|
self.periodic_func.clear()
|
||||||
|
|
||||||
|
def clean_uid(self, uid:str) -> str:
|
||||||
|
"""Clean UID by removing @ / % / + / Owner / and *
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (str): The UID to clean
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: Clean UID without any sign
|
||||||
|
"""
|
||||||
|
|
||||||
|
pattern = fr'[@|%|\+|~|\*]*'
|
||||||
|
parsed_UID = re.sub(pattern, '', uid)
|
||||||
|
|
||||||
|
return parsed_UID
|
||||||
|
|
||||||
|
def Is_Channel(self, channelToCheck: str) -> bool:
|
||||||
|
"""Check if the string has the # caractere and return True if this is a channel
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channelToCheck (str): The string to test if it is a channel or not
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the string is a channel / False if this is not a channel
|
||||||
|
"""
|
||||||
|
|
||||||
|
pattern = fr'^#'
|
||||||
|
isChannel = re.findall(pattern, channelToCheck)
|
||||||
|
|
||||||
|
if not isChannel:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
274
core/dataClass.py
Normal file
274
core/dataClass.py
Normal file
@@ -0,0 +1,274 @@
|
|||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
|
class User:
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class UserDataClass:
|
||||||
|
uid: str
|
||||||
|
nickname: str
|
||||||
|
username: str
|
||||||
|
hostname: str
|
||||||
|
umodes: str
|
||||||
|
vhost: str
|
||||||
|
isWebirc: bool
|
||||||
|
connexion_datetime: datetime = field(default=datetime.now())
|
||||||
|
|
||||||
|
UID_DB:list[UserDataClass] = []
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def insert(self, user: UserDataClass) -> bool:
|
||||||
|
"""Insert new user
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user (UserDataClass): The User dataclass
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the record has been created
|
||||||
|
"""
|
||||||
|
exists = False
|
||||||
|
inserted = False
|
||||||
|
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.uid == user.uid:
|
||||||
|
exists = True
|
||||||
|
print(f'{user.uid} already exist')
|
||||||
|
|
||||||
|
if not exists:
|
||||||
|
self.UID_DB.append(user)
|
||||||
|
print(f'New record with uid: {user.uid}')
|
||||||
|
inserted = True
|
||||||
|
|
||||||
|
return inserted
|
||||||
|
|
||||||
|
def update(self, uid: str, newnickname: str) -> bool:
|
||||||
|
"""Updating a single record with a new nickname
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (str): the uid of the user
|
||||||
|
newnickname (str): the new nickname
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the record has been updated
|
||||||
|
"""
|
||||||
|
status = False
|
||||||
|
for user in self.UID_DB:
|
||||||
|
if user.uid == uid:
|
||||||
|
user.nickname = newnickname
|
||||||
|
status = True
|
||||||
|
print(f'Updating record with uid: {uid}')
|
||||||
|
|
||||||
|
return status
|
||||||
|
|
||||||
|
def delete(self, uid: str) -> bool:
|
||||||
|
"""Delete a user based on his uid
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (str): The UID of the user
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the record has been deleted
|
||||||
|
"""
|
||||||
|
status = False
|
||||||
|
for user in self.UID_DB:
|
||||||
|
if user.uid == uid:
|
||||||
|
self.UID_DB.remove(user)
|
||||||
|
status = True
|
||||||
|
print(f'Removing record with uid: {uid}')
|
||||||
|
|
||||||
|
return status
|
||||||
|
|
||||||
|
def isexist(self, uidornickname:str) -> bool:
|
||||||
|
"""do the UID or Nickname exist ?
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uidornickname (str): The UID or the Nickname
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if exist or False if don't exist
|
||||||
|
"""
|
||||||
|
result = False
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
result = True
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
result = True
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def get_User(self, uidornickname) -> Union[UserDataClass, None]:
|
||||||
|
|
||||||
|
UserObject = None
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
UserObject = record
|
||||||
|
elif record.nickname == uidornickname:
|
||||||
|
UserObject = record
|
||||||
|
|
||||||
|
return UserObject
|
||||||
|
|
||||||
|
def get_uid(self, uidornickname:str) -> Union[str, None]:
|
||||||
|
|
||||||
|
uid = None
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
uid = record.uid
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
uid = record.uid
|
||||||
|
|
||||||
|
return uid
|
||||||
|
|
||||||
|
def get_nickname(self, uidornickname:str) -> Union[str, None]:
|
||||||
|
|
||||||
|
nickname = None
|
||||||
|
for record in self.UID_DB:
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
nickname = record.nickname
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
nickname = record.nickname
|
||||||
|
|
||||||
|
return nickname
|
||||||
|
|
||||||
|
class Admin:
|
||||||
|
@dataclass
|
||||||
|
class AdminDataClass:
|
||||||
|
uid: str
|
||||||
|
nickname: str
|
||||||
|
username: str
|
||||||
|
hostname: str
|
||||||
|
umodes: str
|
||||||
|
vhost: str
|
||||||
|
level: int
|
||||||
|
connexion_datetime: datetime = field(default=datetime.now())
|
||||||
|
|
||||||
|
UID_ADMIN_DB:list[AdminDataClass] = []
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def insert(self, admin: AdminDataClass) -> bool:
|
||||||
|
"""Insert new user
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user (UserDataClass): The User dataclass
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the record has been created
|
||||||
|
"""
|
||||||
|
exists = False
|
||||||
|
inserted = False
|
||||||
|
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == admin.uid:
|
||||||
|
exists = True
|
||||||
|
print(f'{admin.uid} already exist')
|
||||||
|
|
||||||
|
if not exists:
|
||||||
|
self.UID_ADMIN_DB.append(admin)
|
||||||
|
print(f'New record with uid: {admin.uid}')
|
||||||
|
inserted = True
|
||||||
|
|
||||||
|
return inserted
|
||||||
|
|
||||||
|
def update(self, uid: str, newnickname: str) -> bool:
|
||||||
|
"""Updating a single record with a new nickname
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (str): the uid of the user
|
||||||
|
newnickname (str): the new nickname
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the record has been updated
|
||||||
|
"""
|
||||||
|
status = False
|
||||||
|
for admin in self.UID_ADMIN_DB:
|
||||||
|
if admin.uid == uid:
|
||||||
|
admin.nickname = newnickname
|
||||||
|
status = True
|
||||||
|
print(f'Updating record with uid: {uid}')
|
||||||
|
|
||||||
|
return status
|
||||||
|
|
||||||
|
def delete(self, uid: str) -> bool:
|
||||||
|
"""Delete a user based on his uid
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (str): The UID of the user
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the record has been deleted
|
||||||
|
"""
|
||||||
|
status = False
|
||||||
|
for admin in self.UID_ADMIN_DB:
|
||||||
|
if admin.uid == uid:
|
||||||
|
self.UID_ADMIN_DB.remove(admin)
|
||||||
|
status = True
|
||||||
|
print(f'Removing record with uid: {uid}')
|
||||||
|
|
||||||
|
return status
|
||||||
|
|
||||||
|
def isexist(self, uidornickname:str) -> bool:
|
||||||
|
"""do the UID or Nickname exist ?
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uidornickname (str): The UID or the Nickname
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if exist or False if don't exist
|
||||||
|
"""
|
||||||
|
result = False
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
result = True
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
result = True
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def get_Admin(self, uidornickname) -> Union[AdminDataClass, None]:
|
||||||
|
|
||||||
|
AdminObject = None
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
AdminObject = record
|
||||||
|
elif record.nickname == uidornickname:
|
||||||
|
AdminObject = record
|
||||||
|
|
||||||
|
return AdminObject
|
||||||
|
|
||||||
|
def get_uid(self, uidornickname:str) -> Union[str, None]:
|
||||||
|
|
||||||
|
uid = None
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
uid = record.uid
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
uid = record.uid
|
||||||
|
|
||||||
|
return uid
|
||||||
|
|
||||||
|
def get_nickname(self, uidornickname:str) -> Union[str, None]:
|
||||||
|
|
||||||
|
nickname = None
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
nickname = record.nickname
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
nickname = record.nickname
|
||||||
|
|
||||||
|
return nickname
|
||||||
|
|
||||||
|
def get_level(self, uidornickname:str) -> int:
|
||||||
|
|
||||||
|
level = 0
|
||||||
|
for record in self.UID_ADMIN_DB:
|
||||||
|
if record.uid == uidornickname:
|
||||||
|
level = record.level
|
||||||
|
if record.nickname == uidornickname:
|
||||||
|
level = record.level
|
||||||
|
|
||||||
|
return level
|
||||||
|
|
||||||
478
core/irc.py
478
core/irc.py
@@ -3,6 +3,7 @@ from ssl import SSLSocket
|
|||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Union
|
from typing import Union
|
||||||
from core.loadConf import Config
|
from core.loadConf import Config
|
||||||
|
from core.Model import User, Admin, Channel
|
||||||
from core.base import Base
|
from core.base import Base
|
||||||
|
|
||||||
class Irc:
|
class Irc:
|
||||||
@@ -10,8 +11,7 @@ class Irc:
|
|||||||
def __init__(self) -> 'Irc':
|
def __init__(self) -> 'Irc':
|
||||||
|
|
||||||
self.defender_connexion_datetime = datetime.now() # Date et heure de la premiere connexion de Defender
|
self.defender_connexion_datetime = datetime.now() # Date et heure de la premiere connexion de Defender
|
||||||
self.db_uid = {} # Definir la variable qui contiendra la liste des utilisateurs connectés au réseau
|
self.first_score: int = 100
|
||||||
self.db_admin = {} # Definir la variable qui contiendra la liste des administrateurs
|
|
||||||
self.db_chan = [] # Definir la variable qui contiendra la liste des salons
|
self.db_chan = [] # Definir la variable qui contiendra la liste des salons
|
||||||
self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés
|
self.loaded_classes:dict[str, 'Irc'] = {} # Definir la variable qui contiendra la liste modules chargés
|
||||||
self.beat = 30 # Lancer toutes les 30 secondes des actions de nettoyages
|
self.beat = 30 # Lancer toutes les 30 secondes des actions de nettoyages
|
||||||
@@ -24,13 +24,13 @@ class Irc:
|
|||||||
self.CHARSET = ['utf-8', 'iso-8859-1'] # Charset utiliser pour décoder/encoder les messages
|
self.CHARSET = ['utf-8', 'iso-8859-1'] # Charset utiliser pour décoder/encoder les messages
|
||||||
self.SSL_VERSION = None # Version SSL
|
self.SSL_VERSION = None # Version SSL
|
||||||
|
|
||||||
self.Config = Config().ConfigModel
|
self.Config = Config().ConfigObject
|
||||||
|
|
||||||
# Liste des commandes internes du bot
|
# Liste des commandes internes du bot
|
||||||
self.commands_level = {
|
self.commands_level = {
|
||||||
0: ['help', 'auth', 'copyright'],
|
0: ['help', 'auth', 'copyright'],
|
||||||
1: ['load','reload','unload', 'deauth', 'uptime', 'checkversion'],
|
1: ['load','reload','unload', 'deauth', 'uptime', 'checkversion'],
|
||||||
2: ['show_modules', 'show_timers', 'show_threads', 'sentinel'],
|
2: ['show_modules', 'show_timers', 'show_threads', 'show_channels'],
|
||||||
3: ['quit', 'restart','addaccess','editaccess', 'delaccess']
|
3: ['quit', 'restart','addaccess','editaccess', 'delaccess']
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -41,12 +41,20 @@ class Irc:
|
|||||||
self.commands.append(command)
|
self.commands.append(command)
|
||||||
|
|
||||||
self.Base = Base(self.Config)
|
self.Base = Base(self.Config)
|
||||||
|
self.User = User(self.Base)
|
||||||
|
self.Admin = Admin(self.Base)
|
||||||
|
self.Channel = Channel(self.Base)
|
||||||
self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, ))
|
self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, ))
|
||||||
|
|
||||||
##############################################
|
##############################################
|
||||||
# CONNEXION IRC #
|
# CONNEXION IRC #
|
||||||
##############################################
|
##############################################
|
||||||
def init_irc(self, ircInstance:'Irc') -> None:
|
def init_irc(self, ircInstance:'Irc') -> None:
|
||||||
|
"""Create a socket and connect to irc server
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ircInstance (Irc): Instance of Irc object.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self.__create_socket()
|
self.__create_socket()
|
||||||
self.__connect_to_irc(ircInstance)
|
self.__connect_to_irc(ircInstance)
|
||||||
@@ -54,7 +62,8 @@ class Irc:
|
|||||||
self.Base.logs.critical(f'Assertion error: {ae}')
|
self.Base.logs.critical(f'Assertion error: {ae}')
|
||||||
|
|
||||||
def __create_socket(self) -> None:
|
def __create_socket(self) -> None:
|
||||||
|
"""Create a socket to connect SSL or Normal connection
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
|
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
|
||||||
connexion_information = (self.Config.SERVEUR_IP, self.Config.SERVEUR_PORT)
|
connexion_information = (self.Config.SERVEUR_IP, self.Config.SERVEUR_PORT)
|
||||||
@@ -65,9 +74,8 @@ class Irc:
|
|||||||
ssl_connexion = ssl_context.wrap_socket(soc, server_hostname=self.Config.SERVEUR_HOSTNAME)
|
ssl_connexion = ssl_context.wrap_socket(soc, server_hostname=self.Config.SERVEUR_HOSTNAME)
|
||||||
ssl_connexion.connect(connexion_information)
|
ssl_connexion.connect(connexion_information)
|
||||||
self.IrcSocket:SSLSocket = ssl_connexion
|
self.IrcSocket:SSLSocket = ssl_connexion
|
||||||
|
|
||||||
self.Base.logs.info(f"Connexion en mode SSL : Version = {self.IrcSocket.version()}")
|
|
||||||
self.SSL_VERSION = self.IrcSocket.version()
|
self.SSL_VERSION = self.IrcSocket.version()
|
||||||
|
self.Base.logs.info(f"Connexion en mode SSL : Version = {self.SSL_VERSION}")
|
||||||
else:
|
else:
|
||||||
soc.connect(connexion_information)
|
soc.connect(connexion_information)
|
||||||
self.IrcSocket:socket.socket = soc
|
self.IrcSocket:socket.socket = soc
|
||||||
@@ -113,7 +121,7 @@ class Irc:
|
|||||||
|
|
||||||
# Reload configuration
|
# Reload configuration
|
||||||
self.Base.logs.debug('Reloading configuration')
|
self.Base.logs.debug('Reloading configuration')
|
||||||
self.Config = Config().ConfigModel
|
self.Config = Config().ConfigObject
|
||||||
self.Base = Base(self.Config)
|
self.Base = Base(self.Config)
|
||||||
|
|
||||||
self.__create_socket()
|
self.__create_socket()
|
||||||
@@ -162,7 +170,7 @@ class Irc:
|
|||||||
# except Exception as e:
|
# except Exception as e:
|
||||||
# self.debug(f"Exception: {e}")
|
# self.debug(f"Exception: {e}")
|
||||||
|
|
||||||
def __link(self, writer:socket.socket) -> None:
|
def __link(self, writer:Union[socket.socket, SSLSocket]) -> None:
|
||||||
"""Créer le link et envoyer les informations nécessaires pour la
|
"""Créer le link et envoyer les informations nécessaires pour la
|
||||||
connexion au serveur.
|
connexion au serveur.
|
||||||
|
|
||||||
@@ -186,7 +194,7 @@ class Irc:
|
|||||||
sid = self.Config.SERVEUR_ID
|
sid = self.Config.SERVEUR_ID
|
||||||
service_id = self.Config.SERVICE_ID
|
service_id = self.Config.SERVICE_ID
|
||||||
|
|
||||||
version = self.Base.DEFENDER_VERSION
|
version = self.Config.current_version
|
||||||
unixtime = self.Base.get_unixtime()
|
unixtime = self.Base.get_unixtime()
|
||||||
|
|
||||||
# Envoyer un message d'identification
|
# Envoyer un message d'identification
|
||||||
@@ -252,6 +260,10 @@ class Irc:
|
|||||||
except AssertionError as ae:
|
except AssertionError as ae:
|
||||||
self.Base.logs.error(f"Assertion error : {ae}")
|
self.Base.logs.error(f"Assertion error : {ae}")
|
||||||
|
|
||||||
|
def unload(self) -> None:
|
||||||
|
# This is only to reference the method
|
||||||
|
return None
|
||||||
|
|
||||||
##############################################
|
##############################################
|
||||||
# FIN CONNEXION IRC #
|
# FIN CONNEXION IRC #
|
||||||
##############################################
|
##############################################
|
||||||
@@ -262,7 +274,7 @@ class Irc:
|
|||||||
Returns:
|
Returns:
|
||||||
None: Aucun retour requis, elle charge puis c'est tout
|
None: Aucun retour requis, elle charge puis c'est tout
|
||||||
"""
|
"""
|
||||||
result = self.Base.db_execute_query(f"SELECT module FROM {self.Base.DB_SCHEMA['modules']}")
|
result = self.Base.db_execute_query(f"SELECT module FROM {self.Config.table_module}")
|
||||||
for r in result.fetchall():
|
for r in result.fetchall():
|
||||||
self.load_module('sys', r[0], True)
|
self.load_module('sys', r[0], True)
|
||||||
|
|
||||||
@@ -383,7 +395,15 @@ class Irc:
|
|||||||
|
|
||||||
my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe
|
my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe
|
||||||
create_instance_of_the_class = my_class(self.ircObject) # Créer une nouvelle instance de la classe
|
create_instance_of_the_class = my_class(self.ircObject) # Créer une nouvelle instance de la classe
|
||||||
self.loaded_classes[class_name] = create_instance_of_the_class # Charger la nouvelle class dans la variable globale
|
|
||||||
|
if not hasattr(create_instance_of_the_class, 'cmd'):
|
||||||
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :Module {module_name} ne contient pas de méthode cmd")
|
||||||
|
self.Base.logs.critical(f"The Module {module_name} has not been loaded because cmd method is not available")
|
||||||
|
self.Base.db_delete_module(module_name)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Charger la nouvelle class dans la variable globale
|
||||||
|
self.loaded_classes[class_name] = create_instance_of_the_class
|
||||||
|
|
||||||
# Enregistrer le module dans la base de données
|
# Enregistrer le module dans la base de données
|
||||||
if not init:
|
if not init:
|
||||||
@@ -399,149 +419,54 @@ class Irc:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.Base.logs.error(f"Something went wrong with a module you want to load : {e}")
|
self.Base.logs.error(f"Something went wrong with a module you want to load : {e}")
|
||||||
|
|
||||||
def insert_db_uid(self, uid:str, nickname:str, username:str, hostname:str, umodes:str, vhost:str, isWebirc: bool) -> None:
|
|
||||||
|
|
||||||
if uid in self.db_uid:
|
|
||||||
return None
|
|
||||||
|
|
||||||
self.db_uid[uid] = {
|
|
||||||
'nickname': nickname,
|
|
||||||
'username': username,
|
|
||||||
'hostname': hostname,
|
|
||||||
'umodes': umodes,
|
|
||||||
'vhost': vhost,
|
|
||||||
'isWebirc': isWebirc,
|
|
||||||
'datetime': datetime.now()
|
|
||||||
}
|
|
||||||
|
|
||||||
self.db_uid[nickname] = {
|
|
||||||
'uid': uid,
|
|
||||||
'username': username,
|
|
||||||
'hostname': hostname,
|
|
||||||
'umodes': umodes,
|
|
||||||
'vhost': vhost,
|
|
||||||
'isWebirc': isWebirc,
|
|
||||||
'datetime': datetime.now()
|
|
||||||
}
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def update_db_uid(self, uid:str, newnickname:str) -> None:
|
|
||||||
|
|
||||||
# Récupérer l'ancien nickname
|
|
||||||
oldnickname = self.db_uid[uid]['nickname']
|
|
||||||
|
|
||||||
# Enregistrement du nouveau nickname
|
|
||||||
self.db_uid[newnickname] = {
|
|
||||||
'uid': uid,
|
|
||||||
'username': self.db_uid[uid]['username'],
|
|
||||||
'hostname': self.db_uid[uid]['hostname'],
|
|
||||||
'umodes': self.db_uid[uid]['umodes'],
|
|
||||||
'vhost': self.db_uid[uid]['vhost']
|
|
||||||
}
|
|
||||||
|
|
||||||
# Modification du nickname dans la ligne UID
|
|
||||||
self.db_uid[uid]['nickname'] = newnickname
|
|
||||||
|
|
||||||
# Supprimer l'ancien nickname
|
|
||||||
if oldnickname in self.db_uid:
|
|
||||||
del self.db_uid[oldnickname]
|
|
||||||
else:
|
|
||||||
self.Base.logs.debug(f"L'ancien nickname {oldnickname} n'existe pas dans UID_DB")
|
|
||||||
response = False
|
|
||||||
|
|
||||||
self.Base.logs.debug(f"{oldnickname} changed to {newnickname}")
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def delete_db_uid(self, uid:str) -> None:
|
|
||||||
|
|
||||||
uid_reel = self.get_uid(uid)
|
|
||||||
nickname = self.get_nickname(uid_reel)
|
|
||||||
|
|
||||||
if uid_reel in self.db_uid:
|
|
||||||
del self.db_uid[uid]
|
|
||||||
|
|
||||||
if nickname in self.db_uid:
|
|
||||||
del self.db_uid[nickname]
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def insert_db_admin(self, uid:str, level:int) -> None:
|
def insert_db_admin(self, uid:str, level:int) -> None:
|
||||||
|
|
||||||
if not uid in self.db_uid:
|
if self.User.get_User(uid) is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
getUser = self.User.get_User(uid)
|
||||||
|
|
||||||
nickname = self.db_uid[uid]['nickname']
|
nickname = getUser.nickname
|
||||||
username = self.db_uid[uid]['username']
|
username = getUser.username
|
||||||
hostname = self.db_uid[uid]['hostname']
|
hostname = getUser.hostname
|
||||||
umodes = self.db_uid[uid]['umodes']
|
umodes = getUser.umodes
|
||||||
vhost = self.db_uid[uid]['vhost']
|
vhost = getUser.vhost
|
||||||
level = int(level)
|
level = int(level)
|
||||||
|
|
||||||
self.db_admin[uid] = {
|
self.Admin.insert(
|
||||||
'nickname': nickname,
|
self.Admin.AdminModel(
|
||||||
'username': username,
|
uid=uid,
|
||||||
'hostname': hostname,
|
nickname=nickname,
|
||||||
'umodes': umodes,
|
username=username,
|
||||||
'vhost': vhost,
|
hostname=hostname,
|
||||||
'datetime': self.Base.get_datetime(),
|
umodes=umodes,
|
||||||
'level': level
|
vhost=vhost,
|
||||||
}
|
level=level,
|
||||||
|
connexion_datetime=datetime.now()
|
||||||
self.db_admin[nickname] = {
|
)
|
||||||
'uid': uid,
|
)
|
||||||
'username': username,
|
|
||||||
'hostname': hostname,
|
|
||||||
'umodes': umodes,
|
|
||||||
'vhost': vhost,
|
|
||||||
'datetime': self.Base.get_datetime(),
|
|
||||||
'level': level
|
|
||||||
}
|
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def delete_db_admin(self, uid:str) -> None:
|
def delete_db_admin(self, uid:str) -> None:
|
||||||
|
|
||||||
if not uid in self.db_admin:
|
if self.Admin.get_Admin(uid) is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
nickname_admin = self.db_admin[uid]['nickname']
|
if not self.Admin.delete(uid):
|
||||||
|
self.Base.logs.critical(f'UID: {uid} was not deleted')
|
||||||
if uid in self.db_admin:
|
|
||||||
del self.db_admin[uid]
|
|
||||||
|
|
||||||
if nickname_admin in self.db_admin:
|
|
||||||
del self.db_admin[nickname_admin]
|
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def insert_db_chan(self, channel:str) -> bool:
|
|
||||||
"""Ajouter l'ensemble des salons dans la variable {CHAN_DB}
|
|
||||||
|
|
||||||
Args:
|
|
||||||
channel (str): le salon à insérer dans {CHAN_DB}
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: True si insertion OK / False si insertion KO
|
|
||||||
"""
|
|
||||||
if channel in self.db_chan:
|
|
||||||
return False
|
|
||||||
|
|
||||||
response = True
|
|
||||||
# Ajouter un nouveau salon
|
|
||||||
self.db_chan.append(channel)
|
|
||||||
|
|
||||||
# Supprimer les doublons de la liste
|
|
||||||
self.db_chan = list(set(self.db_chan))
|
|
||||||
|
|
||||||
self.Base.logs.debug(f"Le salon {channel} a été ajouté à la liste CHAN_DB")
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
def create_defender_user(self, nickname:str, level: int, password:str) -> str:
|
def create_defender_user(self, nickname:str, level: int, password:str) -> str:
|
||||||
|
|
||||||
nickname = self.get_nickname(nickname)
|
get_user = self.User.get_User(nickname)
|
||||||
|
if get_user is None:
|
||||||
|
response = f'This nickname {nickname} does not exist, it is not possible to create this user'
|
||||||
|
self.Base.logs.warning(response)
|
||||||
|
return response
|
||||||
|
|
||||||
|
nickname = get_user.nickname
|
||||||
response = ''
|
response = ''
|
||||||
|
|
||||||
if level > 4:
|
if level > 4:
|
||||||
@@ -549,25 +474,19 @@ class Irc:
|
|||||||
self.Base.logs.warning(response)
|
self.Base.logs.warning(response)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
# Verification si le user existe dans notre UID_DB
|
hostname = get_user.hostname
|
||||||
if not nickname in self.db_uid:
|
vhost = get_user.vhost
|
||||||
response = f"{nickname} n'est pas connecté, impossible de l'enregistrer pour le moment"
|
|
||||||
self.Base.logs.warning(response)
|
|
||||||
return response
|
|
||||||
|
|
||||||
hostname = self.db_uid[nickname]['hostname']
|
|
||||||
vhost = self.db_uid[nickname]['vhost']
|
|
||||||
spassword = self.Base.crypt_password(password)
|
spassword = self.Base.crypt_password(password)
|
||||||
|
|
||||||
mes_donnees = {'admin': nickname}
|
mes_donnees = {'admin': nickname}
|
||||||
query_search_user = f"SELECT id FROM {self.Base.DB_SCHEMA['admins']} WHERE user=:admin"
|
query_search_user = f"SELECT id FROM {self.Config.table_admin} WHERE user=:admin"
|
||||||
r = self.Base.db_execute_query(query_search_user, mes_donnees)
|
r = self.Base.db_execute_query(query_search_user, mes_donnees)
|
||||||
exist_user = r.fetchone()
|
exist_user = r.fetchone()
|
||||||
|
|
||||||
# On verifie si le user exist dans la base
|
# On verifie si le user exist dans la base
|
||||||
if not exist_user:
|
if not exist_user:
|
||||||
mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level}
|
mes_donnees = {'datetime': self.Base.get_datetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level}
|
||||||
self.Base.db_execute_query(f'''INSERT INTO {self.Base.DB_SCHEMA['admins']}
|
self.Base.db_execute_query(f'''INSERT INTO {self.Config.table_admin}
|
||||||
(createdOn, user, password, hostname, vhost, level) VALUES
|
(createdOn, user, password, hostname, vhost, level) VALUES
|
||||||
(:datetime, :user, :password, :hostname, :vhost, :level)
|
(:datetime, :user, :password, :hostname, :vhost, :level)
|
||||||
''', mes_donnees)
|
''', mes_donnees)
|
||||||
@@ -581,41 +500,15 @@ class Irc:
|
|||||||
self.Base.logs.info(response)
|
self.Base.logs.info(response)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def get_uid(self, uidornickname:str) -> Union[str, None]:
|
def is_cmd_allowed(self, nickname:str, cmd:str) -> bool:
|
||||||
|
|
||||||
uid_recherche = uidornickname
|
|
||||||
response = None
|
|
||||||
for uid, value in self.db_uid.items():
|
|
||||||
if uid == uid_recherche:
|
|
||||||
if 'nickname' in value:
|
|
||||||
response = uid
|
|
||||||
if 'uid' in value:
|
|
||||||
response = value['uid']
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
def get_nickname(self, uidornickname:str) -> Union[str, None]:
|
|
||||||
|
|
||||||
nickname_recherche = uidornickname
|
|
||||||
|
|
||||||
response = None
|
|
||||||
for nickname, value in self.db_uid.items():
|
|
||||||
if nickname == nickname_recherche:
|
|
||||||
if 'nickname' in value:
|
|
||||||
response = value['nickname']
|
|
||||||
if 'uid' in value:
|
|
||||||
response = nickname
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
def is_cmd_allowed(self,nickname:str, cmd:str) -> bool:
|
|
||||||
|
|
||||||
# Vérifier si le user est identifié et si il a les droits
|
# Vérifier si le user est identifié et si il a les droits
|
||||||
is_command_allowed = False
|
is_command_allowed = False
|
||||||
uid = self.get_uid(nickname)
|
uid = self.User.get_uid(nickname)
|
||||||
|
get_admin = self.Admin.get_Admin(uid)
|
||||||
|
|
||||||
if uid in self.db_admin:
|
if not get_admin is None:
|
||||||
admin_level = self.db_admin[uid]['level']
|
admin_level = get_admin.level
|
||||||
|
|
||||||
for ref_level, ref_commands in self.commands_level.items():
|
for ref_level, ref_commands in self.commands_level.items():
|
||||||
# print(f"LevelNo: {ref_level} - {ref_commands} - {admin_level}")
|
# print(f"LevelNo: {ref_level} - {ref_commands} - {admin_level}")
|
||||||
@@ -651,6 +544,18 @@ class Irc:
|
|||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def thread_check_for_new_version(self, fromuser: str) -> None:
|
||||||
|
|
||||||
|
dnickname = self.Config.SERVICE_NICKNAME
|
||||||
|
|
||||||
|
if self.Base.check_for_new_version(True):
|
||||||
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : New Version available : {self.Config.current_version} >>> {self.Config.latest_version}')
|
||||||
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Please run (git pull origin main) in the current folder')
|
||||||
|
else:
|
||||||
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender')
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
def cmd(self, data:list) -> None:
|
def cmd(self, data:list) -> None:
|
||||||
try:
|
try:
|
||||||
|
|
||||||
@@ -713,10 +618,15 @@ class Irc:
|
|||||||
try:
|
try:
|
||||||
# if self.Config.ABUSEIPDB == 1:
|
# if self.Config.ABUSEIPDB == 1:
|
||||||
# self.Base.create_thread(self.abuseipdb_scan, (cmd[2], ))
|
# self.Base.create_thread(self.abuseipdb_scan, (cmd[2], ))
|
||||||
|
self.first_connexion_ip = cmd[2]
|
||||||
|
self.first_score = int(cmd[3])
|
||||||
pass
|
pass
|
||||||
# Possibilité de déclancher les bans a ce niveau.
|
# Possibilité de déclancher les bans a ce niveau.
|
||||||
except IndexError as ie:
|
except IndexError as ie:
|
||||||
self.Base.logs.error(f'{ie}')
|
self.Base.logs.error(f'{ie}')
|
||||||
|
except ValueError as ve:
|
||||||
|
self.first_score = 0
|
||||||
|
self.Base.logs.error(f'Impossible to convert first_score: {ve}')
|
||||||
|
|
||||||
case '320':
|
case '320':
|
||||||
#:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users
|
#:irc.deb.biz.st 320 PyDefender IRCParis07 :is in security-groups: known-users,webirc-users,tls-and-known-users,tls-users
|
||||||
@@ -735,10 +645,12 @@ class Irc:
|
|||||||
hsid = str(cmd[0]).replace(':','')
|
hsid = str(cmd[0]).replace(':','')
|
||||||
if hsid == self.HSID:
|
if hsid == self.HSID:
|
||||||
if self.INIT == 1:
|
if self.INIT == 1:
|
||||||
if self.Base.check_for_new_version():
|
current_version = self.Config.current_version
|
||||||
version = f'{self.Base.DEFENDER_VERSION} >>> {self.Base.LATEST_DEFENDER_VERSION}'
|
latest_version = self.Config.latest_version
|
||||||
|
if self.Base.check_for_new_version(False):
|
||||||
|
version = f'{current_version} >>> {latest_version}'
|
||||||
else:
|
else:
|
||||||
version = f'{self.Base.DEFENDER_VERSION}'
|
version = f'{current_version}'
|
||||||
|
|
||||||
self.send2socket(f"MODE {self.Config.SERVICE_NICKNAME} +B")
|
self.send2socket(f"MODE {self.Config.SERVICE_NICKNAME} +B")
|
||||||
self.send2socket(f"JOIN {self.Config.SERVICE_CHANLOG}")
|
self.send2socket(f"JOIN {self.Config.SERVICE_CHANLOG}")
|
||||||
@@ -764,13 +676,11 @@ class Irc:
|
|||||||
self.Base.logs.info(f"# VERSION : {version} ")
|
self.Base.logs.info(f"# VERSION : {version} ")
|
||||||
self.Base.logs.info(f"################################################")
|
self.Base.logs.info(f"################################################")
|
||||||
|
|
||||||
if self.Base.check_for_new_version():
|
if self.Base.check_for_new_version(False):
|
||||||
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} : New Version available {version}")
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} : New Version available {version}")
|
||||||
|
|
||||||
# Initialisation terminé aprés le premier PING
|
# Initialisation terminé aprés le premier PING
|
||||||
self.INIT = 0
|
self.INIT = 0
|
||||||
# self.send2socket(f':{self.Config.SERVICE_ID} PING :{hsid}')
|
|
||||||
# print(self.db_uid)
|
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
pass
|
pass
|
||||||
@@ -784,7 +694,7 @@ class Irc:
|
|||||||
# :001N1WD7L QUIT :Quit: free_znc_1
|
# :001N1WD7L QUIT :Quit: free_znc_1
|
||||||
cmd.pop(0)
|
cmd.pop(0)
|
||||||
uid_who_quit = str(cmd[0]).replace(':', '')
|
uid_who_quit = str(cmd[0]).replace(':', '')
|
||||||
self.delete_db_uid(uid_who_quit)
|
self.User.delete(uid_who_quit)
|
||||||
|
|
||||||
case 'PONG':
|
case 'PONG':
|
||||||
# ['@msgid=aTNJhp17kcPboF5diQqkUL;time=2023-12-28T20:35:58.411Z', ':irc.deb.biz.st', 'PONG', 'irc.deb.biz.st', ':Dev-PyDefender']
|
# ['@msgid=aTNJhp17kcPboF5diQqkUL;time=2023-12-28T20:35:58.411Z', ':irc.deb.biz.st', 'PONG', 'irc.deb.biz.st', ':Dev-PyDefender']
|
||||||
@@ -798,17 +708,63 @@ class Irc:
|
|||||||
cmd.pop(0)
|
cmd.pop(0)
|
||||||
uid = str(cmd[0]).replace(':','')
|
uid = str(cmd[0]).replace(':','')
|
||||||
newnickname = cmd[2]
|
newnickname = cmd[2]
|
||||||
|
self.User.update(uid, newnickname)
|
||||||
|
|
||||||
self.update_db_uid(uid, newnickname)
|
case 'MODE':
|
||||||
|
#['@msgid=d0ySx56Yd0nc35oHts2SkC-/J9mVUA1hfM6+Z4494xWUg;time=2024-08-09T12:45:36.651Z',
|
||||||
|
# ':001', 'MODE', '#a', '+nt', '1723207536']
|
||||||
|
cmd.pop(0)
|
||||||
|
if '#' in cmd[2]:
|
||||||
|
channel = cmd[2]
|
||||||
|
mode = cmd[3]
|
||||||
|
self.Channel.update(channel, mode)
|
||||||
|
|
||||||
case 'SJOIN':
|
case 'SJOIN':
|
||||||
# ['@msgid=ictnEBhHmTUHzkEeVZl6rR;time=2023-12-28T20:03:18.482Z', ':001', 'SJOIN', '1702139101', '#stats', '+nst', ':@001SB890A', '@00BAAAAAI']
|
# ['@msgid=5sTwGdj349D82L96p749SY;time=2024-08-15T09:50:23.528Z', ':001', 'SJOIN', '1721564574', '#welcome', ':001JD94QH']
|
||||||
|
# ['@msgid=bvceb6HthbLJapgGLXn1b0;time=2024-08-15T09:50:11.464Z', ':001', 'SJOIN', '1721564574', '#welcome', '+lnrt', '13', ':001CIVLQF', '+11ZAAAAAB', '001QGR10C', '*@0014UE10B', '001NL1O07', '001SWZR05', '001HB8G04', '@00BAAAAAJ', '0019M7101']
|
||||||
cmd.pop(0)
|
cmd.pop(0)
|
||||||
channel = cmd[3]
|
channel = str(cmd[3]).lower()
|
||||||
self.insert_db_chan(channel)
|
mode = cmd[4]
|
||||||
|
len_cmd = len(cmd)
|
||||||
|
list_users:list = []
|
||||||
|
|
||||||
|
start_boucle = 0
|
||||||
|
|
||||||
|
# Trouver le premier user
|
||||||
|
for i in range(len_cmd):
|
||||||
|
s: list = re.findall(fr':', cmd[i])
|
||||||
|
if s:
|
||||||
|
start_boucle = i
|
||||||
|
|
||||||
|
# Boucle qui va ajouter l'ensemble des users (UID)
|
||||||
|
for i in range(start_boucle, len(cmd)):
|
||||||
|
parsed_UID = str(cmd[i])
|
||||||
|
pattern = fr'[:|@|%|\+|~|\*]*'
|
||||||
|
pattern = fr':'
|
||||||
|
parsed_UID = re.sub(pattern, '', parsed_UID)
|
||||||
|
list_users.append(parsed_UID)
|
||||||
|
|
||||||
|
self.Channel.insert(
|
||||||
|
self.Channel.ChannelModel(
|
||||||
|
name=channel,
|
||||||
|
mode=mode,
|
||||||
|
uids=list_users
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
case 'PART':
|
||||||
|
# ['@unrealircd.org/geoip=FR;unrealircd.org/userhost=50d6492c@80.214.73.44;unrealircd.org/userip=50d6492c@80.214.73.44;msgid=YSIPB9q4PcRu0EVfC9ci7y-/mZT0+Gj5FLiDSZshH5NCw;time=2024-08-15T15:35:53.772Z',
|
||||||
|
# ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris']
|
||||||
|
uid = str(cmd[1]).replace(':','')
|
||||||
|
channel = str(cmd[3])
|
||||||
|
self.Channel.delete_user_from_channel(channel, uid)
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
case 'UID':
|
case 'UID':
|
||||||
|
# ['@s2s-md/geoip=cc=GB|cd=United\\sKingdom|asn=16276|asname=OVH\\sSAS;s2s-md/tls_cipher=TLSv1.3-TLS_CHACHA20_POLY1305_SHA256;s2s-md/creationtime=1721564601',
|
||||||
|
# ':001', 'UID', 'albatros', '0', '1721564597', 'albatros', 'vps-91b2f28b.vps.ovh.net',
|
||||||
|
# '001HB8G04', '0', '+iwxz', 'Clk-A62F1D18.vps.ovh.net', 'Clk-A62F1D18.vps.ovh.net', 'MyZBwg==', ':...']
|
||||||
if 'webirc' in cmd[0]:
|
if 'webirc' in cmd[0]:
|
||||||
isWebirc = True
|
isWebirc = True
|
||||||
else:
|
else:
|
||||||
@@ -820,8 +776,27 @@ class Irc:
|
|||||||
hostname = str(cmd[7])
|
hostname = str(cmd[7])
|
||||||
umodes = str(cmd[10])
|
umodes = str(cmd[10])
|
||||||
vhost = str(cmd[11])
|
vhost = str(cmd[11])
|
||||||
|
if not 'S' in umodes:
|
||||||
|
remote_ip = self.Base.decode_ip(str(cmd[13]))
|
||||||
|
else:
|
||||||
|
remote_ip = '127.0.0.1'
|
||||||
|
|
||||||
self.insert_db_uid(uid, nickname, username, hostname, umodes, vhost, isWebirc)
|
score_connexion = self.first_score
|
||||||
|
|
||||||
|
self.User.insert(
|
||||||
|
self.User.UserModel(
|
||||||
|
uid=uid,
|
||||||
|
nickname=nickname,
|
||||||
|
username=username,
|
||||||
|
hostname=hostname,
|
||||||
|
umodes=umodes,
|
||||||
|
vhost=vhost,
|
||||||
|
isWebirc=isWebirc,
|
||||||
|
remote_ip=remote_ip,
|
||||||
|
score_connexion=score_connexion,
|
||||||
|
connexion_datetime=datetime.now()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
for classe_name, classe_object in self.loaded_classes.items():
|
for classe_name, classe_object in self.loaded_classes.items():
|
||||||
classe_object.cmd(cmd_to_send)
|
classe_object.cmd(cmd_to_send)
|
||||||
@@ -842,7 +817,8 @@ class Irc:
|
|||||||
else:
|
else:
|
||||||
self.Base.logs.info(f'{cmd}')
|
self.Base.logs.info(f'{cmd}')
|
||||||
# user_trigger = get_user.split('!')[0]
|
# user_trigger = get_user.split('!')[0]
|
||||||
user_trigger = self.get_nickname(get_uid_or_nickname)
|
# user_trigger = self.get_nickname(get_uid_or_nickname)
|
||||||
|
user_trigger = self.User.get_nickname(get_uid_or_nickname)
|
||||||
dnickname = self.Config.SERVICE_NICKNAME
|
dnickname = self.Config.SERVICE_NICKNAME
|
||||||
|
|
||||||
pattern = fr'(:\{self.Config.SERVICE_PREFIX})(.*)$'
|
pattern = fr'(:\{self.Config.SERVICE_PREFIX})(.*)$'
|
||||||
@@ -860,7 +836,7 @@ class Irc:
|
|||||||
cmd_to_send = convert_to_string.replace(':','')
|
cmd_to_send = convert_to_string.replace(':','')
|
||||||
self.Base.log_cmd(user_trigger, cmd_to_send)
|
self.Base.log_cmd(user_trigger, cmd_to_send)
|
||||||
|
|
||||||
self._hcmds(user_trigger, arg)
|
self._hcmds(user_trigger, arg, cmd)
|
||||||
|
|
||||||
if cmd[2] == self.Config.SERVICE_ID:
|
if cmd[2] == self.Config.SERVICE_ID:
|
||||||
pattern = fr'^:.*?:(.*)$'
|
pattern = fr'^:.*?:(.*)$'
|
||||||
@@ -874,7 +850,7 @@ class Irc:
|
|||||||
|
|
||||||
# Réponse a un CTCP VERSION
|
# Réponse a un CTCP VERSION
|
||||||
if arg[0] == '\x01VERSION\x01':
|
if arg[0] == '\x01VERSION\x01':
|
||||||
self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Base.DEFENDER_VERSION}\x01')
|
self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Config.current_version}\x01')
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Réponse a un TIME
|
# Réponse a un TIME
|
||||||
@@ -896,9 +872,9 @@ class Irc:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
cmd_to_send = convert_to_string.replace(':','')
|
cmd_to_send = convert_to_string.replace(':','')
|
||||||
self.Base.log_cmd(self.get_nickname(user_trigger), cmd_to_send)
|
self.Base.log_cmd(self.User.get_nickname(user_trigger), cmd_to_send)
|
||||||
|
|
||||||
self._hcmds(user_trigger, arg)
|
self._hcmds(user_trigger, arg, cmd)
|
||||||
|
|
||||||
except IndexError as io:
|
except IndexError as io:
|
||||||
self.Base.logs.error(f'{io}')
|
self.Base.logs.error(f'{io}')
|
||||||
@@ -914,10 +890,10 @@ class Irc:
|
|||||||
except IndexError as ie:
|
except IndexError as ie:
|
||||||
self.Base.logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
|
self.Base.logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
|
||||||
|
|
||||||
def _hcmds(self, user: str, cmd:list) -> None:
|
def _hcmds(self, user: str, cmd:list, fullcmd: list = []) -> None:
|
||||||
|
|
||||||
fromuser = self.get_nickname(user) # Nickname qui a lancé la commande
|
fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande
|
||||||
uid = self.get_uid(fromuser) # Récuperer le uid de l'utilisateur
|
uid = self.User.get_uid(fromuser) # Récuperer le uid de l'utilisateur
|
||||||
|
|
||||||
# Defender information
|
# Defender information
|
||||||
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
|
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
|
||||||
@@ -935,14 +911,14 @@ class Irc:
|
|||||||
# Envoyer la commande aux classes dynamiquement chargées
|
# Envoyer la commande aux classes dynamiquement chargées
|
||||||
if command != 'notallowed':
|
if command != 'notallowed':
|
||||||
for classe_name, classe_object in self.loaded_classes.items():
|
for classe_name, classe_object in self.loaded_classes.items():
|
||||||
classe_object._hcmds(user, cmd)
|
classe_object._hcmds(user, cmd, fullcmd)
|
||||||
|
|
||||||
match command:
|
match command:
|
||||||
|
|
||||||
case 'notallowed':
|
case 'notallowed':
|
||||||
try:
|
try:
|
||||||
current_command = cmd[0]
|
current_command = cmd[0]
|
||||||
self.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["rouge"]}{current_command}{self.Config.CONFIG_COLOR["noire"]} ] - Accès Refusé à {self.get_nickname(fromuser)}')
|
self.send2socket(f':{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR["rouge"]}{current_command}{self.Config.CONFIG_COLOR["noire"]} ] - Accès Refusé à {self.User.get_nickname(fromuser)}')
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : Accès Refusé')
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Accès Refusé')
|
||||||
except IndexError as ie:
|
except IndexError as ie:
|
||||||
self.Base.logs.error(f'{ie}')
|
self.Base.logs.error(f'{ie}')
|
||||||
@@ -950,29 +926,29 @@ class Irc:
|
|||||||
case 'deauth':
|
case 'deauth':
|
||||||
|
|
||||||
current_command = cmd[0]
|
current_command = cmd[0]
|
||||||
uid_to_deauth = self.get_uid(fromuser)
|
uid_to_deauth = self.User.get_uid(fromuser)
|
||||||
self.delete_db_admin(uid_to_deauth)
|
self.delete_db_admin(uid_to_deauth)
|
||||||
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} est désormais déconnecter de {dnickname}")
|
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais déconnecter de {dnickname}")
|
||||||
|
|
||||||
case 'auth':
|
case 'auth':
|
||||||
# ['auth', 'adator', 'password']
|
# ['auth', 'adator', 'password']
|
||||||
current_command = cmd[0]
|
current_command = cmd[0]
|
||||||
user_to_log = self.get_nickname(cmd[1])
|
user_to_log = self.User.get_nickname(cmd[1])
|
||||||
password = cmd[2]
|
password = cmd[2]
|
||||||
|
|
||||||
if not user_to_log is None:
|
if not user_to_log is None:
|
||||||
mes_donnees = {'user': user_to_log, 'password': self.Base.crypt_password(password)}
|
mes_donnees = {'user': user_to_log, 'password': self.Base.crypt_password(password)}
|
||||||
query = f"SELECT id, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user AND password = :password"
|
query = f"SELECT id, level FROM {self.Config.table_admin} WHERE user = :user AND password = :password"
|
||||||
result = self.Base.db_execute_query(query, mes_donnees)
|
result = self.Base.db_execute_query(query, mes_donnees)
|
||||||
user_from_db = result.fetchone()
|
user_from_db = result.fetchone()
|
||||||
|
|
||||||
if not user_from_db is None:
|
if not user_from_db is None:
|
||||||
uid_user = self.get_uid(user_to_log)
|
uid_user = self.User.get_uid(user_to_log)
|
||||||
self.insert_db_admin(uid_user, user_from_db[1])
|
self.insert_db_admin(uid_user, user_from_db[1])
|
||||||
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} est désormais connecté a {dnickname}")
|
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['verte']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}")
|
||||||
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Connexion a {dnickname} réussie!")
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Connexion a {dnickname} réussie!")
|
||||||
else:
|
else:
|
||||||
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.get_nickname(fromuser)} a tapé un mauvais mot de pass")
|
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :[ {self.Config.CONFIG_COLOR['rouge']}{current_command}{self.Config.CONFIG_COLOR['noire']} ] - {self.User.get_nickname(fromuser)} a tapé un mauvais mot de pass")
|
||||||
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Mot de passe incorrecte")
|
self.send2socket(f":{self.Config.SERVICE_NICKNAME} NOTICE {fromuser} :Mot de passe incorrecte")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@@ -1007,9 +983,14 @@ class Irc:
|
|||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : .editaccess [USER] [NEWPASSWORD] [NEWLEVEL]')
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : .editaccess [USER] [NEWPASSWORD] [NEWLEVEL]')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
current_user = self.get_nickname(fromuser)
|
get_admin = self.Admin.get_Admin(fromuser)
|
||||||
current_uid = self.get_uid(fromuser)
|
if get_admin is None:
|
||||||
current_user_level = self.db_admin[current_uid]['level']
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no Admin access')
|
||||||
|
return None
|
||||||
|
|
||||||
|
current_user = self.User.get_nickname(fromuser)
|
||||||
|
current_uid = self.User.get_uid(fromuser)
|
||||||
|
current_user_level = get_admin.level
|
||||||
|
|
||||||
if user_new_level > 5:
|
if user_new_level > 5:
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : Maximum authorized level is 5')
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : Maximum authorized level is 5')
|
||||||
@@ -1017,7 +998,7 @@ class Irc:
|
|||||||
|
|
||||||
# Rechercher le user dans la base de données.
|
# Rechercher le user dans la base de données.
|
||||||
mes_donnees = {'user': user_to_edit}
|
mes_donnees = {'user': user_to_edit}
|
||||||
query = f"SELECT user, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user"
|
query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user"
|
||||||
result = self.Base.db_execute_query(query, mes_donnees)
|
result = self.Base.db_execute_query(query, mes_donnees)
|
||||||
|
|
||||||
isUserExist = result.fetchone()
|
isUserExist = result.fetchone()
|
||||||
@@ -1033,7 +1014,7 @@ class Irc:
|
|||||||
|
|
||||||
# Le user existe dans la base de données
|
# Le user existe dans la base de données
|
||||||
data_to_update = {'user': user_to_edit, 'password': user_password, 'level': user_new_level}
|
data_to_update = {'user': user_to_edit, 'password': user_password, 'level': user_new_level}
|
||||||
sql_update = f"UPDATE {self.Base.DB_SCHEMA['admins']} SET level = :level, password = :password WHERE user = :user"
|
sql_update = f"UPDATE {self.Config.table_admin} SET level = :level, password = :password WHERE user = :user"
|
||||||
exec_query = self.Base.db_execute_query(sql_update, data_to_update)
|
exec_query = self.Base.db_execute_query(sql_update, data_to_update)
|
||||||
if exec_query.rowcount > 0:
|
if exec_query.rowcount > 0:
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_edit} has been modified with level {str(user_new_level)}')
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_edit} has been modified with level {str(user_new_level)}')
|
||||||
@@ -1059,14 +1040,20 @@ class Irc:
|
|||||||
if len(cmd) < 3:
|
if len(cmd) < 3:
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : .delaccess [USER] [CONFIRMUSER]')
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : .delaccess [USER] [CONFIRMUSER]')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
get_admin = self.Admin.get_Admin(fromuser)
|
||||||
|
|
||||||
|
if get_admin is None:
|
||||||
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : This user {fromuser} has no admin access')
|
||||||
|
return None
|
||||||
|
|
||||||
current_user = self.get_nickname(fromuser)
|
current_user = self.User.get_nickname(fromuser)
|
||||||
current_uid = self.get_uid(fromuser)
|
current_uid = self.User.get_uid(fromuser)
|
||||||
current_user_level = self.db_admin[current_uid]['level']
|
current_user_level = get_admin.level
|
||||||
|
|
||||||
# Rechercher le user dans la base de données.
|
# Rechercher le user dans la base de données.
|
||||||
mes_donnees = {'user': user_to_del}
|
mes_donnees = {'user': user_to_del}
|
||||||
query = f"SELECT user, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user"
|
query = f"SELECT user, level FROM {self.Config.table_admin} WHERE user = :user"
|
||||||
result = self.Base.db_execute_query(query, mes_donnees)
|
result = self.Base.db_execute_query(query, mes_donnees)
|
||||||
info_user = result.fetchone()
|
info_user = result.fetchone()
|
||||||
|
|
||||||
@@ -1078,7 +1065,7 @@ class Irc:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
data_to_delete = {'user': user_to_del}
|
data_to_delete = {'user': user_to_del}
|
||||||
sql_delete = f"DELETE FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user"
|
sql_delete = f"DELETE FROM {self.Config.table_admin} WHERE user = :user"
|
||||||
exec_query = self.Base.db_execute_query(sql_delete, data_to_delete)
|
exec_query = self.Base.db_execute_query(sql_delete, data_to_delete)
|
||||||
if exec_query.rowcount > 0:
|
if exec_query.rowcount > 0:
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_del} has been deleted !')
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : User {user_to_del} has been deleted !')
|
||||||
@@ -1090,8 +1077,9 @@ class Irc:
|
|||||||
|
|
||||||
help = ''
|
help = ''
|
||||||
count_level_definition = 0
|
count_level_definition = 0
|
||||||
if uid in self.db_admin:
|
get_admin = self.Admin.get_Admin(uid)
|
||||||
user_level = self.db_admin[uid]['level']
|
if not get_admin is None:
|
||||||
|
user_level = get_admin.level
|
||||||
else:
|
else:
|
||||||
user_level = 0
|
user_level = 0
|
||||||
|
|
||||||
@@ -1152,7 +1140,7 @@ class Irc:
|
|||||||
|
|
||||||
if 'mods.' + module_name in sys.modules:
|
if 'mods.' + module_name in sys.modules:
|
||||||
self.loaded_classes[class_name].unload()
|
self.loaded_classes[class_name].unload()
|
||||||
self.Base.logs.info('Module Already Loaded ... reload the module ...')
|
self.Base.logs.info('Module Already Loaded ... reloading the module ...')
|
||||||
the_module = sys.modules['mods.' + module_name]
|
the_module = sys.modules['mods.' + module_name]
|
||||||
importlib.reload(the_module)
|
importlib.reload(the_module)
|
||||||
|
|
||||||
@@ -1206,8 +1194,11 @@ class Irc:
|
|||||||
reason.append(cmd[i])
|
reason.append(cmd[i])
|
||||||
final_reason = ' '.join(reason)
|
final_reason = ' '.join(reason)
|
||||||
|
|
||||||
self.db_uid.clear() #Vider UID_DB
|
# self.db_uid.clear() #Vider UID_DB
|
||||||
self.db_chan = [] #Vider les salons
|
# self.db_chan = [] #Vider les salons
|
||||||
|
|
||||||
|
self.User.UID_DB.clear() # Clear User Object
|
||||||
|
self.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
|
||||||
|
|
||||||
for class_name in self.loaded_classes:
|
for class_name in self.loaded_classes:
|
||||||
self.loaded_classes[class_name].unload()
|
self.loaded_classes[class_name].unload()
|
||||||
@@ -1223,7 +1214,7 @@ class Irc:
|
|||||||
|
|
||||||
self.Base.logs.debug(self.loaded_classes)
|
self.Base.logs.debug(self.loaded_classes)
|
||||||
|
|
||||||
results = self.Base.db_execute_query(f'SELECT module FROM {self.Base.DB_SCHEMA["modules"]}')
|
results = self.Base.db_execute_query(f'SELECT module FROM {self.Config.table_module}')
|
||||||
results = results.fetchall()
|
results = results.fetchall()
|
||||||
|
|
||||||
if len(results) == 0:
|
if len(results) == 0:
|
||||||
@@ -1248,37 +1239,30 @@ class Irc:
|
|||||||
|
|
||||||
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :{str(running_thread_name)}")
|
self.send2socket(f":{dnickname} PRIVMSG {dchanlog} :{str(running_thread_name)}")
|
||||||
|
|
||||||
|
case 'show_channels':
|
||||||
|
|
||||||
|
for chan in self.Channel.UID_CHANNEL_DB:
|
||||||
|
list_nicknames: list = []
|
||||||
|
for uid in chan.uids:
|
||||||
|
pattern = fr'[:|@|%|\+|~|\*]*'
|
||||||
|
parsed_UID = re.sub(pattern, '', uid)
|
||||||
|
list_nicknames.append(self.User.get_nickname(parsed_UID))
|
||||||
|
|
||||||
|
self.send2socket(f":{dnickname} NOTICE {fromuser} : Channel: {chan.name} - Users: {list_nicknames}")
|
||||||
|
|
||||||
case 'uptime':
|
case 'uptime':
|
||||||
uptime = self.get_defender_uptime()
|
uptime = self.get_defender_uptime()
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : {uptime}')
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : {uptime}')
|
||||||
|
|
||||||
case 'copyright':
|
case 'copyright':
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Base.DEFENDER_VERSION} Developped by adator® and dktmb® #')
|
self.send2socket(f':{dnickname} NOTICE {fromuser} : # Defender V.{self.Config.current_version} Developped by adator® and dktmb® #')
|
||||||
|
|
||||||
case 'sentinel':
|
|
||||||
# .sentinel on
|
|
||||||
activation = str(cmd[1]).lower()
|
|
||||||
service_id = self.Config.SERVICE_ID
|
|
||||||
|
|
||||||
channel_to_dont_quit = [self.Config.SALON_JAIL, dchanlog]
|
|
||||||
|
|
||||||
if activation == 'on':
|
|
||||||
for chan in self.db_chan:
|
|
||||||
if not chan in channel_to_dont_quit:
|
|
||||||
self.send2socket(f":{service_id} JOIN {chan}")
|
|
||||||
if activation == 'off':
|
|
||||||
for chan in self.db_chan:
|
|
||||||
if not chan in channel_to_dont_quit:
|
|
||||||
self.send2socket(f":{service_id} PART {chan}")
|
|
||||||
|
|
||||||
case 'checkversion':
|
case 'checkversion':
|
||||||
|
|
||||||
if self.Base.check_for_new_version():
|
self.Base.create_thread(
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : New Version available : {self.Base.DEFENDER_VERSION} >>> {self.Base.LATEST_DEFENDER_VERSION}')
|
self.thread_check_for_new_version,
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : Please run (git pull origin main) in the current folder')
|
(fromuser, )
|
||||||
else:
|
)
|
||||||
self.send2socket(f':{dnickname} NOTICE {fromuser} : You have the latest version of defender')
|
|
||||||
pass
|
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import json, os
|
import json
|
||||||
|
from os import sep
|
||||||
from typing import Union
|
from typing import Union
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
@@ -45,20 +46,44 @@ class ConfigDataModel:
|
|||||||
|
|
||||||
DEBUG_LEVEL: int # Le niveau des logs DEBUG 10 | INFO 20 | WARNING 30 | ERROR 40 | CRITICAL 50
|
DEBUG_LEVEL: int # Le niveau des logs DEBUG 10 | INFO 20 | WARNING 30 | ERROR 40 | CRITICAL 50
|
||||||
|
|
||||||
CONFIG_COLOR: dict
|
CONFIG_COLOR: dict[str, str]
|
||||||
|
|
||||||
|
table_admin: str
|
||||||
|
table_commande: str
|
||||||
|
table_log: str
|
||||||
|
table_module: str
|
||||||
|
|
||||||
|
current_version: str
|
||||||
|
latest_version: str
|
||||||
|
db_name: str
|
||||||
|
db_path: str
|
||||||
|
|
||||||
def __post_init__(self):
|
def __post_init__(self):
|
||||||
# Initialiser SERVICE_ID après la création de l'objet
|
# Initialiser SERVICE_ID après la création de l'objet
|
||||||
self.SERVICE_ID:str = f"{self.SERVEUR_ID}AAAAAB"
|
self.SERVICE_ID:str = f"{self.SERVEUR_ID}AAAAAB"
|
||||||
|
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
||||||
import_config = self.__load_json_configuration()
|
self.ConfigObject: ConfigDataModel = self.__load_service_configuration()
|
||||||
|
return None
|
||||||
|
|
||||||
ConfigModel = ConfigDataModel(
|
def __load_json_service_configuration(self):
|
||||||
|
|
||||||
|
conf_filename = f'core{sep}configuration.json'
|
||||||
|
with open(conf_filename, 'r') as configuration_data:
|
||||||
|
configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data)
|
||||||
|
|
||||||
|
for key, value in configuration['CONFIG_COLOR'].items():
|
||||||
|
configuration['CONFIG_COLOR'][key] = str(value).encode('utf-8').decode('unicode_escape')
|
||||||
|
|
||||||
|
return configuration
|
||||||
|
|
||||||
|
def __load_service_configuration(self) -> ConfigDataModel:
|
||||||
|
import_config = self.__load_json_service_configuration()
|
||||||
|
|
||||||
|
ConfigObject: ConfigDataModel = ConfigDataModel(
|
||||||
SERVEUR_IP=import_config["SERVEUR_IP"],
|
SERVEUR_IP=import_config["SERVEUR_IP"],
|
||||||
SERVEUR_HOSTNAME=import_config["SERVEUR_HOSTNAME"],
|
SERVEUR_HOSTNAME=import_config["SERVEUR_HOSTNAME"],
|
||||||
SERVEUR_LINK=import_config["SERVEUR_LINK"],
|
SERVEUR_LINK=import_config["SERVEUR_LINK"],
|
||||||
@@ -87,19 +112,15 @@ class Config:
|
|||||||
WHITELISTED_IP=import_config["WHITELISTED_IP"],
|
WHITELISTED_IP=import_config["WHITELISTED_IP"],
|
||||||
GLINE_DURATION=import_config["GLINE_DURATION"],
|
GLINE_DURATION=import_config["GLINE_DURATION"],
|
||||||
DEBUG_LEVEL=import_config["DEBUG_LEVEL"],
|
DEBUG_LEVEL=import_config["DEBUG_LEVEL"],
|
||||||
CONFIG_COLOR=import_config["CONFIG_COLOR"]
|
CONFIG_COLOR=import_config["CONFIG_COLOR"],
|
||||||
|
table_admin='sys_admins',
|
||||||
|
table_commande='sys_commandes',
|
||||||
|
table_log='sys_logs',
|
||||||
|
table_module='sys_modules',
|
||||||
|
current_version='',
|
||||||
|
latest_version='',
|
||||||
|
db_name='defender',
|
||||||
|
db_path=f'db{sep}'
|
||||||
)
|
)
|
||||||
|
|
||||||
self.ConfigModel = ConfigModel
|
return ConfigObject
|
||||||
return None
|
|
||||||
|
|
||||||
def __load_json_configuration(self):
|
|
||||||
|
|
||||||
conf_filename = f'core{os.sep}configuration.json'
|
|
||||||
with open(conf_filename, 'r') as configuration_data:
|
|
||||||
configuration:dict[str, Union[str, int, list, dict]] = json.load(configuration_data)
|
|
||||||
|
|
||||||
for key, value in configuration['CONFIG_COLOR'].items():
|
|
||||||
configuration['CONFIG_COLOR'][key] = value.encode('utf-8').decode('unicode_escape')
|
|
||||||
|
|
||||||
return configuration
|
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -1,11 +1,10 @@
|
|||||||
import threading
|
|
||||||
from core.irc import Irc
|
from core.irc import Irc
|
||||||
|
|
||||||
# Le module crée devra réspecter quelques conditions
|
# Le module crée devra réspecter quelques conditions
|
||||||
# 1. Importer le module de configuration
|
# 1. Importer le module de configuration
|
||||||
# 2. Le nom de class devra toujours s'appeler comme le module exemple => nom de class Dktmb | nom du module mod_dktmb
|
# 2. Le nom de class devra toujours s'appeler comme le module exemple => nom de class Dktmb | nom du module mod_dktmb
|
||||||
# 3. la fonction __init__ devra toujours avoir les parametres suivant (self, irc:object)
|
# 3. la fonction __init__ devra toujours avoir les parametres suivant (self, irc:object)
|
||||||
# 1 . Créer la variable irc dans le module
|
# 1 . Créer la variable Irc dans le module
|
||||||
# 2 . Récuperer la configuration dans une variable
|
# 2 . Récuperer la configuration dans une variable
|
||||||
# 3 . Définir et enregistrer les nouvelles commandes
|
# 3 . Définir et enregistrer les nouvelles commandes
|
||||||
# 4. une fonction _hcmds(self, user:str, cmd: list) devra toujours etre crée.
|
# 4. une fonction _hcmds(self, user:str, cmd: list) devra toujours etre crée.
|
||||||
@@ -13,37 +12,59 @@ from core.irc import Irc
|
|||||||
class Test():
|
class Test():
|
||||||
|
|
||||||
def __init__(self, ircInstance:Irc) -> None:
|
def __init__(self, ircInstance:Irc) -> None:
|
||||||
print(f'Module {self.__class__.__name__} loaded ...')
|
|
||||||
|
|
||||||
self.irc = ircInstance # Ajouter l'object mod_irc a la classe
|
# Add Irc Object to the module
|
||||||
|
self.Irc = ircInstance
|
||||||
|
|
||||||
self.config = ircInstance.Config # Ajouter la configuration a la classe
|
# Add Global Configuration to the module
|
||||||
|
self.Config = ircInstance.Config
|
||||||
|
|
||||||
|
# Add Base object to the module
|
||||||
|
self.Base = ircInstance.Base
|
||||||
|
|
||||||
|
# Add logs object to the module
|
||||||
|
self.Logs = ircInstance.Base.logs
|
||||||
|
|
||||||
|
# Add User object to the module
|
||||||
|
self.User = ircInstance.User
|
||||||
|
|
||||||
|
# Add Channel object to the module
|
||||||
|
self.Channel = ircInstance.Channel
|
||||||
|
|
||||||
# Créer les nouvelles commandes du module
|
# Créer les nouvelles commandes du module
|
||||||
self.commands = ['test']
|
self.commands_level = {
|
||||||
|
0: ['test'],
|
||||||
|
1: ['test_level_1']
|
||||||
|
}
|
||||||
|
|
||||||
self.__set_commands(self.commands) # Enrigstrer les nouvelles commandes dans le code
|
# Init the module
|
||||||
|
self.__init_module()
|
||||||
|
|
||||||
self.core = ircInstance.Base # Instance du module Base
|
# Log the module
|
||||||
|
self.Logs.debug(f'Module {self.__class__.__name__} loaded ...')
|
||||||
|
|
||||||
self.session = '' # Instancier une session pour la base de données
|
def __init_module(self) -> None:
|
||||||
self.__create_db('mod_test') # Créer la base de données si necessaire
|
|
||||||
|
|
||||||
def __set_commands(self, commands:list) -> None:
|
self.__set_commands(self.commands_level)
|
||||||
"""Rajoute les commandes du module au programme principal
|
self.__create_tables()
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __set_commands(self, commands:dict[int, list[str]]) -> None:
|
||||||
|
"""### Rajoute les commandes du module au programme principal
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
commands (list): Liste des commandes du module
|
commands (list): Liste des commandes du module
|
||||||
|
|
||||||
Returns:
|
|
||||||
None: Aucun retour attendu
|
|
||||||
"""
|
"""
|
||||||
for command in commands:
|
for level, com in commands.items():
|
||||||
self.irc.commands.append(command)
|
for c in commands[level]:
|
||||||
|
if not c in self.Irc.commands:
|
||||||
|
self.Irc.commands_level[level].append(c)
|
||||||
|
self.Irc.commands.append(c)
|
||||||
|
|
||||||
return True
|
return None
|
||||||
|
|
||||||
def __create_db(self, db_name:str) -> None:
|
def __create_tables(self) -> None:
|
||||||
"""Methode qui va créer la base de donnée si elle n'existe pas.
|
"""Methode qui va créer la base de donnée si elle n'existe pas.
|
||||||
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
|
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
|
||||||
Args:
|
Args:
|
||||||
@@ -52,36 +73,36 @@ class Test():
|
|||||||
Returns:
|
Returns:
|
||||||
None: Aucun retour n'es attendu
|
None: Aucun retour n'es attendu
|
||||||
"""
|
"""
|
||||||
db_directory = self.core.MODS_DB_PATH
|
|
||||||
|
|
||||||
self.session = self.core.db_init(db_directory, db_name)
|
table_logs = '''CREATE TABLE IF NOT EXISTS test_logs (
|
||||||
|
|
||||||
table_logs = '''CREATE TABLE IF NOT EXISTS logs (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
datetime TEXT,
|
datetime TEXT,
|
||||||
server_msg TEXT
|
server_msg TEXT
|
||||||
)
|
)
|
||||||
'''
|
'''
|
||||||
|
|
||||||
self.core.db_execute_query(self.session, table_logs)
|
self.Base.db_execute_query(table_logs)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def unload(self) -> None:
|
def unload(self) -> None:
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _hcmds(self, user:str, cmd: list) -> None:
|
def cmd(self, data:list) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
command = cmd[0].lower()
|
def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None:
|
||||||
|
|
||||||
|
command = str(cmd[0]).lower()
|
||||||
|
dnickname = self.Config.SERVICE_NICKNAME
|
||||||
|
fromuser = user
|
||||||
|
|
||||||
match command:
|
match command:
|
||||||
|
|
||||||
case 'test':
|
case 'test':
|
||||||
try:
|
try:
|
||||||
user_action = cmd[1]
|
|
||||||
self.irc.send2socket(f'PRIVMSG #webmail Je vais voicer {user}')
|
self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : test command ready ...")
|
||||||
self.irc.send2socket(f'MODE #webmail +v {user_action}')
|
self.Logs.debug(f"Test logs ready")
|
||||||
self.core.create_log(f"MODE +v sur {user_action}")
|
|
||||||
except KeyError as ke:
|
except KeyError as ke:
|
||||||
self.core.create_log(f"Key Error : {ke}")
|
self.Logs.error(f"Key Error : {ke}")
|
||||||
|
|
||||||
440
mods/mod_votekick.py
Normal file
440
mods/mod_votekick.py
Normal file
@@ -0,0 +1,440 @@
|
|||||||
|
from core.irc import Irc
|
||||||
|
import re
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
# Activer le systeme sur un salon (activate #salon)
|
||||||
|
# Le service devra se connecter au salon
|
||||||
|
# Le service devra se mettre en op
|
||||||
|
# Soumettre un nom de user (submit nickname)
|
||||||
|
# voter pour un ban (vote_for)
|
||||||
|
# voter contre un ban (vote_against)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Votekick():
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class VoteChannelModel:
|
||||||
|
channel_name: str
|
||||||
|
target_user: str
|
||||||
|
voter_users: list
|
||||||
|
vote_for: int
|
||||||
|
vote_against: int
|
||||||
|
|
||||||
|
VOTE_CHANNEL_DB:list[VoteChannelModel] = []
|
||||||
|
|
||||||
|
def __init__(self, ircInstance:Irc) -> None:
|
||||||
|
# Add Irc Object to the module
|
||||||
|
self.Irc = ircInstance
|
||||||
|
|
||||||
|
# Add Global Configuration to the module
|
||||||
|
self.Config = ircInstance.Config
|
||||||
|
|
||||||
|
# Add Base object to the module
|
||||||
|
self.Base = ircInstance.Base
|
||||||
|
|
||||||
|
# Add logs object to the module
|
||||||
|
self.Logs = ircInstance.Base.logs
|
||||||
|
|
||||||
|
# Add User object to the module
|
||||||
|
self.User = ircInstance.User
|
||||||
|
|
||||||
|
# Add Channel object to the module
|
||||||
|
self.Channel = ircInstance.Channel
|
||||||
|
|
||||||
|
# Créer les nouvelles commandes du module
|
||||||
|
self.commands_level = {
|
||||||
|
0: ['vote_for', 'vote_against'],
|
||||||
|
1: ['activate', 'deactivate', 'submit', 'vote_stat', 'vote_verdict', 'vote_cancel']
|
||||||
|
}
|
||||||
|
|
||||||
|
# Init the module
|
||||||
|
self.__init_module()
|
||||||
|
|
||||||
|
# Log the module
|
||||||
|
self.Logs.debug(f'Module {self.__class__.__name__} loaded ...')
|
||||||
|
|
||||||
|
def __init_module(self) -> None:
|
||||||
|
|
||||||
|
self.__set_commands(self.commands_level)
|
||||||
|
self.__create_tables()
|
||||||
|
self.join_saved_channels()
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __set_commands(self, commands:dict[int, list[str]]) -> None:
|
||||||
|
"""### Rajoute les commandes du module au programme principal
|
||||||
|
|
||||||
|
Args:
|
||||||
|
commands (list): Liste des commandes du module
|
||||||
|
"""
|
||||||
|
for level, com in commands.items():
|
||||||
|
for c in commands[level]:
|
||||||
|
if not c in self.Irc.commands:
|
||||||
|
self.Irc.commands_level[level].append(c)
|
||||||
|
self.Irc.commands.append(c)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __create_tables(self) -> None:
|
||||||
|
"""Methode qui va créer la base de donnée si elle n'existe pas.
|
||||||
|
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
|
||||||
|
Args:
|
||||||
|
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None: Aucun retour n'es attendu
|
||||||
|
"""
|
||||||
|
|
||||||
|
table_logs = '''CREATE TABLE IF NOT EXISTS votekick_logs (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
datetime TEXT,
|
||||||
|
server_msg TEXT
|
||||||
|
)
|
||||||
|
'''
|
||||||
|
|
||||||
|
table_vote = '''CREATE TABLE IF NOT EXISTS votekick_channel (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
datetime TEXT,
|
||||||
|
channel TEXT
|
||||||
|
)
|
||||||
|
'''
|
||||||
|
|
||||||
|
self.Base.db_execute_query(table_logs)
|
||||||
|
self.Base.db_execute_query(table_vote)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def unload(self) -> None:
|
||||||
|
try:
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} PART {chan.channel_name}")
|
||||||
|
|
||||||
|
self.VOTE_CHANNEL_DB = []
|
||||||
|
self.Logs.debug(f'Delete memory DB VOTE_CHANNEL_DB: {self.VOTE_CHANNEL_DB}')
|
||||||
|
|
||||||
|
return None
|
||||||
|
except UnboundLocalError as ne:
|
||||||
|
self.Logs.error(f'{ne}')
|
||||||
|
except NameError as ue:
|
||||||
|
self.Logs.error(f'{ue}')
|
||||||
|
except:
|
||||||
|
self.Logs.error('Error on the module')
|
||||||
|
|
||||||
|
def init_vote_system(self, channel: str) -> bool:
|
||||||
|
|
||||||
|
response = False
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
if chan.channel_name == channel:
|
||||||
|
chan.target_user = ''
|
||||||
|
chan.voter_users = []
|
||||||
|
chan.vote_against = 0
|
||||||
|
chan.vote_for = 0
|
||||||
|
response = True
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def insert_vote_channel(self, ChannelObject: VoteChannelModel) -> bool:
|
||||||
|
result = False
|
||||||
|
found = False
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
if chan.channel_name == ChannelObject.channel_name:
|
||||||
|
found = True
|
||||||
|
|
||||||
|
if not found:
|
||||||
|
self.VOTE_CHANNEL_DB.append(ChannelObject)
|
||||||
|
self.Logs.debug(f"The channel has been added {ChannelObject}")
|
||||||
|
self.db_add_vote_channel(ChannelObject.channel_name)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def db_add_vote_channel(self, channel:str) -> bool:
|
||||||
|
"""Cette fonction ajoute les salons ou seront autoriser les votes
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel (str): le salon à enregistrer.
|
||||||
|
"""
|
||||||
|
current_datetime = self.Base.get_datetime()
|
||||||
|
mes_donnees = {'channel': channel}
|
||||||
|
|
||||||
|
response = self.Base.db_execute_query("SELECT id FROM votekick_channel WHERE channel = :channel", mes_donnees)
|
||||||
|
|
||||||
|
isChannelExist = response.fetchone()
|
||||||
|
|
||||||
|
if isChannelExist is None:
|
||||||
|
mes_donnees = {'datetime': current_datetime, 'channel': channel}
|
||||||
|
insert = self.Base.db_execute_query(f"INSERT INTO votekick_channel (datetime, channel) VALUES (:datetime, :channel)", mes_donnees)
|
||||||
|
if insert.rowcount > 0:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def db_delete_vote_channel(self, channel: str) -> bool:
|
||||||
|
"""Cette fonction supprime les salons de join de Defender
|
||||||
|
|
||||||
|
Args:
|
||||||
|
channel (str): le salon à enregistrer.
|
||||||
|
"""
|
||||||
|
mes_donnes = {'channel': channel}
|
||||||
|
response = self.Base.db_execute_query("DELETE FROM votekick_channel WHERE channel = :channel", mes_donnes)
|
||||||
|
|
||||||
|
affected_row = response.rowcount
|
||||||
|
|
||||||
|
if affected_row > 0:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def join_saved_channels(self) -> None:
|
||||||
|
|
||||||
|
result = self.Base.db_execute_query("SELECT id, channel FROM votekick_channel")
|
||||||
|
channels = result.fetchall()
|
||||||
|
unixtime = self.Base.get_unixtime()
|
||||||
|
|
||||||
|
for channel in channels:
|
||||||
|
id, chan = channel
|
||||||
|
self.insert_vote_channel(self.VoteChannelModel(channel_name=chan, target_user='', voter_users=[], vote_for=0, vote_against=0))
|
||||||
|
self.Irc.send2socket(f":{self.Config.SERVEUR_ID} SJOIN {unixtime} {chan} + :{self.Config.SERVICE_ID}")
|
||||||
|
self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {chan} +o {self.Config.SERVICE_NICKNAME}")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def is_vote_ongoing(self, channel: str) -> bool:
|
||||||
|
|
||||||
|
response = False
|
||||||
|
for vote in self.VOTE_CHANNEL_DB:
|
||||||
|
if vote.channel_name == channel:
|
||||||
|
if vote.target_user:
|
||||||
|
response = True
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def timer_vote_verdict(self, channel: str) -> None:
|
||||||
|
|
||||||
|
dnickname = self.Config.SERVICE_NICKNAME
|
||||||
|
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
if chan.channel_name == channel:
|
||||||
|
target_user = self.User.get_nickname(chan.target_user)
|
||||||
|
if chan.vote_for > chan.vote_against:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :The user {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} will be kicked from this channel')
|
||||||
|
self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
|
||||||
|
elif chan.vote_for <= chan.vote_against:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This user will stay on this channel')
|
||||||
|
|
||||||
|
# Init the system
|
||||||
|
if self.init_vote_system(channel):
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :System vote re initiated')
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def cmd(self, data:list) -> None:
|
||||||
|
cmd = list(data).copy()
|
||||||
|
|
||||||
|
match cmd[2]:
|
||||||
|
case 'SJOIN':
|
||||||
|
pass
|
||||||
|
case _:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _hcmds(self, user:str, cmd: list, fullcmd: list = []) -> None:
|
||||||
|
# cmd is the command starting from the user command
|
||||||
|
# full cmd is sending the entire server response
|
||||||
|
|
||||||
|
command = str(cmd[0]).lower()
|
||||||
|
dnickname = self.Config.SERVICE_NICKNAME
|
||||||
|
fromuser = user
|
||||||
|
|
||||||
|
if len(fullcmd) >= 3:
|
||||||
|
fromchannel = str(fullcmd[2]).lower() if self.Base.Is_Channel(str(fullcmd[2]).lower()) else None
|
||||||
|
else:
|
||||||
|
fromchannel = None
|
||||||
|
|
||||||
|
if len(cmd) >= 2:
|
||||||
|
sentchannel = str(cmd[1]).lower() if self.Base.Is_Channel(str(cmd[1]).lower()) else None
|
||||||
|
else:
|
||||||
|
sentchannel = None
|
||||||
|
|
||||||
|
if not fromchannel is None:
|
||||||
|
channel = fromchannel
|
||||||
|
elif not sentchannel is None:
|
||||||
|
channel = sentchannel
|
||||||
|
else:
|
||||||
|
channel = None
|
||||||
|
|
||||||
|
match command:
|
||||||
|
|
||||||
|
case 'vote_cancel':
|
||||||
|
try:
|
||||||
|
if channel is None:
|
||||||
|
self.Logs.error(f"The channel is not known, defender can't cancel the vote")
|
||||||
|
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :You need to specify the channel => /msg {dnickname} vote_cancel #channel')
|
||||||
|
|
||||||
|
for vote in self.VOTE_CHANNEL_DB:
|
||||||
|
if vote.channel_name == channel:
|
||||||
|
self.init_vote_system(channel)
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote system re-initiated')
|
||||||
|
|
||||||
|
except IndexError as ke:
|
||||||
|
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote_cancel #channel')
|
||||||
|
self.Logs.error(f'Index Error: {ke}')
|
||||||
|
|
||||||
|
case 'vote_for':
|
||||||
|
try:
|
||||||
|
# vote_for
|
||||||
|
channel = str(fullcmd[2]).lower()
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
if chan.channel_name == channel:
|
||||||
|
if fromuser in chan.voter_users:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
|
||||||
|
else:
|
||||||
|
chan.vote_for += 1
|
||||||
|
chan.voter_users.append(fromuser)
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
|
||||||
|
|
||||||
|
except KeyError as ke:
|
||||||
|
self.Logs.error(f'Key Error: {ke}')
|
||||||
|
except IndexError as ie:
|
||||||
|
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} :/msg {dnickname} vote_cancel #channel')
|
||||||
|
self.Logs.error(f'Index Error: {ie}')
|
||||||
|
|
||||||
|
case 'vote_against':
|
||||||
|
try:
|
||||||
|
# vote_against
|
||||||
|
channel = str(fullcmd[2]).lower()
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
if chan.channel_name == channel:
|
||||||
|
if fromuser in chan.voter_users:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You already submitted a vote')
|
||||||
|
else:
|
||||||
|
chan.vote_against += 1
|
||||||
|
chan.voter_users.append(fromuser)
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Vote recorded, thank you')
|
||||||
|
|
||||||
|
except KeyError as ke:
|
||||||
|
self.Logs.error(f'Key Error: {ke}')
|
||||||
|
|
||||||
|
case 'vote_stat':
|
||||||
|
try:
|
||||||
|
# channel = str(fullcmd[2]).lower()
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
if chan.channel_name == channel:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :Channel: {chan.channel_name} | Target: {self.User.get_nickname(chan.target_user)} | For: {chan.vote_for} | Against: {chan.vote_against} | Number of voters: {str(len(chan.voter_users))}')
|
||||||
|
|
||||||
|
except KeyError as ke:
|
||||||
|
self.Logs.error(f'Key Error: {ke}')
|
||||||
|
|
||||||
|
case 'vote_verdict':
|
||||||
|
try:
|
||||||
|
# channel = str(fullcmd[2]).lower()
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
if chan.channel_name == channel:
|
||||||
|
target_user = self.User.get_nickname(chan.target_user)
|
||||||
|
if chan.vote_for > chan.vote_against:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :The user {self.Config.CONFIG_COLOR["gras"]}{target_user}{self.Config.CONFIG_COLOR["nogc"]} will be kicked from this channel')
|
||||||
|
self.Irc.send2socket(f":{dnickname} KICK {channel} {target_user} Following the vote, you are not welcome in {channel}")
|
||||||
|
elif chan.vote_for <= chan.vote_against:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This user will stay on this channel')
|
||||||
|
|
||||||
|
# Init the system
|
||||||
|
if self.init_vote_system(channel):
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :System vote re initiated')
|
||||||
|
|
||||||
|
except KeyError as ke:
|
||||||
|
self.Logs.error(f'Key Error: {ke}')
|
||||||
|
|
||||||
|
case 'submit':
|
||||||
|
# submit nickname
|
||||||
|
try:
|
||||||
|
nickname_submitted = cmd[1]
|
||||||
|
# channel = str(fullcmd[2]).lower()
|
||||||
|
uid_submitted = self.User.get_uid(nickname_submitted)
|
||||||
|
user_submitted = self.User.get_User(nickname_submitted)
|
||||||
|
|
||||||
|
# check if there is an ongoing vote
|
||||||
|
if self.is_vote_ongoing(channel):
|
||||||
|
for vote in self.VOTE_CHANNEL_DB:
|
||||||
|
if vote.channel_name == channel:
|
||||||
|
ongoing_user = self.User.get_nickname(vote.target_user)
|
||||||
|
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :There is an ongoing vote on {ongoing_user}')
|
||||||
|
return False
|
||||||
|
|
||||||
|
# check if the user exist
|
||||||
|
if user_submitted is None:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> do not exist')
|
||||||
|
return False
|
||||||
|
|
||||||
|
uid_cleaned = self.Base.clean_uid(uid_submitted)
|
||||||
|
ChannelInfo = self.Channel.get_Channel(channel)
|
||||||
|
|
||||||
|
clean_uids_in_channel: list = []
|
||||||
|
for uid in ChannelInfo.uids:
|
||||||
|
clean_uids_in_channel.append(self.Base.clean_uid(uid))
|
||||||
|
|
||||||
|
if not uid_cleaned in clean_uids_in_channel:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This nickname <{nickname_submitted}> is not available in this channel')
|
||||||
|
return False
|
||||||
|
|
||||||
|
# check if Ircop or Service or Bot
|
||||||
|
pattern = fr'[o|B|S]'
|
||||||
|
operator_user = re.findall(pattern, user_submitted.umodes)
|
||||||
|
if operator_user:
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :You cant vote for this user ! he/she is protected')
|
||||||
|
return False
|
||||||
|
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
if chan.channel_name == channel:
|
||||||
|
chan.target_user = self.User.get_uid(nickname_submitted)
|
||||||
|
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :{nickname_submitted} has been targeted for a vote')
|
||||||
|
|
||||||
|
self.Base.create_timer(60, self.timer_vote_verdict, (channel, ))
|
||||||
|
self.Irc.send2socket(f':{dnickname} PRIVMSG {channel} :This vote will end after 60 secondes')
|
||||||
|
|
||||||
|
except KeyError as ke:
|
||||||
|
self.Logs.error(f'Key Error: {ke}')
|
||||||
|
except TypeError as te:
|
||||||
|
self.Logs.error(te)
|
||||||
|
|
||||||
|
case 'activate':
|
||||||
|
try:
|
||||||
|
# activate #channel
|
||||||
|
# channel = str(cmd[1]).lower()
|
||||||
|
|
||||||
|
self.insert_vote_channel(
|
||||||
|
self.VoteChannelModel(
|
||||||
|
channel_name=channel,
|
||||||
|
target_user='',
|
||||||
|
voter_users=[],
|
||||||
|
vote_for=0,
|
||||||
|
vote_against=0
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.Irc.send2socket(f":{dnickname} JOIN {channel}")
|
||||||
|
self.Irc.send2socket(f":{dnickname} SAMODE {channel} +o {dnickname}")
|
||||||
|
self.Irc.send2socket(f":{dnickname} PRIVMSG {channel} :You can now use !submit <nickname> to decide if he will stay or not on this channel ")
|
||||||
|
|
||||||
|
except KeyError as ke:
|
||||||
|
self.Logs.error(f"Key Error : {ke}")
|
||||||
|
|
||||||
|
case 'deactivate':
|
||||||
|
try:
|
||||||
|
# deactivate #channel
|
||||||
|
# channel = str(cmd[1]).lower()
|
||||||
|
|
||||||
|
self.Irc.send2socket(f":{dnickname} SAMODE {channel} -o {dnickname}")
|
||||||
|
self.Irc.send2socket(f":{dnickname} PART {channel}")
|
||||||
|
|
||||||
|
for chan in self.VOTE_CHANNEL_DB:
|
||||||
|
if chan.channel_name == channel:
|
||||||
|
self.VOTE_CHANNEL_DB.remove(chan)
|
||||||
|
self.db_delete_vote_channel(chan.channel_name)
|
||||||
|
|
||||||
|
self.Logs.debug(f"Test logs ready")
|
||||||
|
except KeyError as ke:
|
||||||
|
self.Logs.error(f"Key Error : {ke}")
|
||||||
@@ -1,3 +1,3 @@
|
|||||||
{
|
{
|
||||||
"version": "4.1.0"
|
"version": "5.0.1"
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user