Mise en forme du code

This commit is contained in:
adator85
2024-03-20 20:33:50 +01:00
parent 667281ffb4
commit 681e0da041
4 changed files with 35 additions and 37 deletions

View File

@@ -32,7 +32,7 @@ class Base:
self.db_create_first_admin() # Créer un nouvel admin si la base de données est vide
def get_unixtime(self)->int:
def get_unixtime(self) -> int:
"""
Cette fonction retourne un UNIXTIME de type 12365456
Return: Current time in seconds since the Epoch (int)
@@ -40,7 +40,7 @@ class Base:
unixtime = int( time.time() )
return unixtime
def get_datetime(self)->str:
def get_datetime(self) -> str:
"""
Retourne une date au format string (24-12-2023 20:50:59)
"""
@@ -137,7 +137,7 @@ class Base:
(:createdOn, :user, :password, :hostname, :vhost, :level)"""
, mes_donnees)
pass
return None
def create_timer(self, time_to_wait: float, func: object, func_args: tuple = ()) -> None:
@@ -243,7 +243,7 @@ class Base:
self.running_sockets.remove(soc)
print(f"> Socket ==> closed {str(soc.fileno())}")
pass
return None
def db_init(self) -> tuple[Engine, Connection]:

View File

@@ -37,7 +37,7 @@ class Install:
print(f"===> Version of python : {python_version()} ==> OK")
return True
def checkDependencies(self) -> None:
"""### Verifie les dépendances si elles sont installées
- Test si les modules sont installés

View File

@@ -1,6 +1,6 @@
import ssl, re, importlib, sys, time, threading, socket
from datetime import datetime, timedelta
from typing import Union
from core.configuration import Config
from core.base import Base
@@ -130,7 +130,6 @@ class Irc:
Args:
writer (StreamWriter): permet l'envoi des informations au serveur.
"""
nickname = self.Config.SERVICE_NICKNAME
username = self.Config.SERVICE_USERNAME
realname = self.Config.SERVICE_REALNAME
@@ -151,8 +150,6 @@ class Irc:
unixtime = self.Base.get_unixtime()
# Envoyer un message d'identification
# strtobytes = bytes(":" + sid + " PASS :" + password + "\r\n", 'utf-8')
# self.IrcSocket.send(strtobytes)
writer.send(f":{sid} PASS :{password}\r\n".encode('utf-8'))
writer.send(f":{sid} PROTOCTL NICKv2 VHP UMODE2 NICKIP SJOIN SJOIN2 SJ3 NOQUIT TKLEXT MLOCK SID MTAGS\r\n".encode('utf-8'))
writer.send(f":{sid} PROTOCTL EAUTH={link},,,{service_name}-v{version}\r\n".encode('utf-8'))
@@ -163,12 +160,10 @@ class Irc:
writer.send(f":{sid} SJOIN {unixtime} {chan} + :{service_id}\r\n".encode('utf-8'))
writer.send(f":{sid} MODE {chan} +{cmodes}\r\n".encode('utf-8'))
writer.send(f":{service_id} SAMODE {chan} +{umodes} {nickname}\r\n".encode('utf-8'))
# writer.write(f"USER {nickname} {username} {username} {nickname} {username} :{username}\r\n".encode('utf-8'))
# writer.write(f"USER {username} {username} {username} :{username}\r\n".encode('utf-8'))
# writer.write(f"NICK {nickname}\r\n".encode('utf-8'))
def send2socket(self, send_message:str)->None:
return None
def send2socket(self, send_message:str) -> None:
"""Envoit les commandes à envoyer au serveur.
Args:
@@ -207,6 +202,7 @@ class Irc:
##############################################
# FIN CONNEXION IRC #
##############################################
def load_existing_modules(self) -> None:
"""Charge les modules qui existe déja dans la base de données
@@ -219,7 +215,7 @@ class Irc:
return None
def get_defender_uptime(self)->str:
def get_defender_uptime(self) -> str:
"""Savoir depuis quand Defender est connecté
Returns:
@@ -228,7 +224,7 @@ class Irc:
current_datetime = datetime.now()
diff_date = current_datetime - self.defender_connexion_datetime
uptime = timedelta(days=diff_date.days, seconds=diff_date.seconds)
return uptime
def heartbeat(self, beat:float) -> None:
@@ -379,7 +375,7 @@ class Irc:
return None
def update_db_uid(self, uid:str, newnickname:str) -> None:
# Récupérer l'ancien nickname
oldnickname = self.db_uid[uid]['nickname']
@@ -391,7 +387,7 @@ class Irc:
'umodes': self.db_uid[uid]['umodes'],
'vhost': self.db_uid[uid]['vhost']
}
# Modification du nickname dans la ligne UID
self.db_uid[uid]['nickname'] = newnickname
@@ -403,7 +399,7 @@ class Irc:
response = False
self.debug(f"{oldnickname} changed to {newnickname}")
return None
def delete_db_uid(self, uid:str) -> None:
@@ -430,8 +426,6 @@ class Irc:
umodes = self.db_uid[uid]['umodes']
vhost = self.db_uid[uid]['vhost']
level = int(level)
self.db_admin[uid] = {
'nickname': nickname,
@@ -481,7 +475,7 @@ class Irc:
"""
if channel in self.db_chan:
return False
response = True
# Ajouter un nouveau salon
self.db_chan.append(channel)
@@ -535,7 +529,7 @@ class Irc:
self.debug(response)
return response
def get_uid(self, uidornickname:str) -> str | None:
def get_uid(self, uidornickname:str) -> Union[str, None]:
uid_recherche = uidornickname
response = None
@@ -548,8 +542,8 @@ class Irc:
return response
def get_nickname(self, uidornickname:str) -> str | None:
def get_nickname(self, uidornickname:str) -> Union[str, None]:
nickname_recherche = uidornickname
response = None
@@ -563,11 +557,11 @@ class Irc:
return response
def is_cmd_allowed(self,nickname:str, cmd:str) -> bool:
# Vérifier si le user est identifié et si il a les droits
is_command_allowed = False
uid = self.get_uid(nickname)
if uid in self.db_admin:
admin_level = self.db_admin[uid]['level']
@@ -703,7 +697,7 @@ class Irc:
cmd.pop(0)
uid_who_quit = str(cmd[0]).replace(':', '')
self.delete_db_uid(uid_who_quit)
case 'PONG':
# ['@msgid=aTNJhp17kcPboF5diQqkUL;time=2023-12-28T20:35:58.411Z', ':irc.deb.biz.st', 'PONG', 'irc.deb.biz.st', ':Dev-PyDefender']
self.Base.execute_periodic_action()
@@ -785,13 +779,13 @@ class Irc:
if arg[0] == '\x01VERSION\x01':
self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01VERSION Service {self.Config.SERVICE_NICKNAME} V{self.Config.DEFENDER_VERSION}\x01')
return False
# Réponse a un TIME
if arg[0] == '\x01TIME\x01':
current_datetime = self.Base.get_datetime()
self.send2socket(f':{dnickname} NOTICE {user_trigger} :\x01TIME {current_datetime}\x01')
return False
# Réponse a un PING
if arg[0] == '\x01PING':
recieved_unixtime = int(arg[1].replace('\x01',''))
@@ -874,7 +868,7 @@ class Irc:
query = f"SELECT id, level FROM {self.Base.DB_SCHEMA['admins']} WHERE user = :user AND password = :password"
result = self.Base.db_execute_query(query, mes_donnees)
user_from_db = result.fetchone()
if not user_from_db is None:
uid_user = self.get_uid(user_to_log)
self.insert_db_admin(uid_user, user_from_db[1])
@@ -1102,7 +1096,7 @@ class Irc:
except IndexError:
self.debug('_hcmd die: out of index')
self.send2socket(f"QUIT Good bye")
case 'restart':
@@ -1158,9 +1152,9 @@ class Irc:
# .sentinel on
activation = str(cmd[1]).lower()
service_id = self.Config.SERVICE_ID
channel_to_dont_quit = [self.Config.SALON_JAIL, dchanlog]
if activation == 'on':
for chan in self.db_chan:
if not chan in channel_to_dont_quit:

View File

@@ -1,4 +1,5 @@
from datetime import datetime
from typing import Union
import re, socket, psutil, requests, json
from core.irc import Irc
@@ -355,7 +356,7 @@ class Defender():
return uptime_minutes
def system_reputation(self, uid:str)->None:
def system_reputation(self, uid:str)-> None:
# Reputation security
# - Activation ou désactivation du système --> OK
# - Le user sera en mesure de changer la limite de la réputation --> OK
@@ -578,7 +579,7 @@ class Defender():
return matching_ports
def abuseipdb_scan(self, remote_ip:str) -> dict[str, any] | None:
def abuseipdb_scan(self, remote_ip:str) -> Union[dict[str, any], None]:
"""Analyse l'ip avec AbuseIpDB
Cette methode devra etre lancer toujours via un thread ou un timer.
Args:
@@ -635,7 +636,7 @@ class Defender():
except KeyError as ke:
self.Irc.debug(f"AbuseIpDb KeyError : {ke}")
def freeipapi_scan(self, remote_ip:str) -> dict[str, any] | None:
def freeipapi_scan(self, remote_ip:str) -> Union[dict[str, any], None]:
"""Analyse l'ip avec Freeipapi
Cette methode devra etre lancer toujours via un thread ou un timer.
Args:
@@ -807,6 +808,9 @@ class Defender():
if self.defConfig['abuseipdb_scan'] == 1:
self.Base.create_thread(self.abuseipdb_scan, (cmd[7], ))
if self.defConfig['freeipapi_scan'] == 1:
self.Base.create_thread(self.freeipapi_scan, (cmd[7], ))
case 'NICK':
# :0010BS24L NICK [NEWNICK] 1697917711
# Changement de nickname