mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 11:14:23 +00:00
V6.2.1 Adding main module utils and rehash utils to manager reload/rehash/restart
This commit is contained in:
@@ -2,14 +2,20 @@
|
||||
Main utils library.
|
||||
'''
|
||||
import gc
|
||||
import ssl
|
||||
import socket
|
||||
from pathlib import Path
|
||||
from re import sub
|
||||
from typing import Literal, Optional, Any
|
||||
from re import match, sub
|
||||
import sys
|
||||
from typing import Literal, Optional, Any, TYPE_CHECKING
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from time import time
|
||||
from random import choice
|
||||
from hashlib import md5, sha3_512
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.irc import Irc
|
||||
|
||||
def convert_to_int(value: Any) -> Optional[int]:
|
||||
"""Convert a value to int
|
||||
|
||||
@@ -51,6 +57,48 @@ def get_datetime() -> datetime:
|
||||
"""
|
||||
return datetime.now()
|
||||
|
||||
def get_ssl_context() -> ssl.SSLContext:
|
||||
"""Generate the ssl context
|
||||
|
||||
Returns:
|
||||
SSLContext: The SSL Context
|
||||
"""
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
return ctx
|
||||
|
||||
def create_socket(uplink: 'Irc') -> None:
|
||||
"""Create a socket to connect SSL or Normal connection
|
||||
"""
|
||||
try:
|
||||
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
|
||||
connexion_information = (uplink.Config.SERVEUR_IP, uplink.Config.SERVEUR_PORT)
|
||||
|
||||
if uplink.Config.SERVEUR_SSL:
|
||||
# Create SSL Context object
|
||||
ssl_context = get_ssl_context()
|
||||
ssl_connexion = ssl_context.wrap_socket(soc, server_hostname=uplink.Config.SERVEUR_HOSTNAME)
|
||||
ssl_connexion.connect(connexion_information)
|
||||
uplink.IrcSocket = ssl_connexion
|
||||
uplink.Config.SSL_VERSION = uplink.IrcSocket.version()
|
||||
uplink.Logs.info(f"-- Connected using SSL : Version = {uplink.Config.SSL_VERSION}")
|
||||
else:
|
||||
soc.connect(connexion_information)
|
||||
uplink.IrcSocket = soc
|
||||
uplink.Logs.info("-- Connected in a normal mode!")
|
||||
|
||||
return None
|
||||
|
||||
except (ssl.SSLEOFError, ssl.SSLError) as soe:
|
||||
uplink.Logs.critical(f"[SSL ERROR]: {soe}")
|
||||
except OSError as oe:
|
||||
uplink.Logs.critical(f"[OS Error]: {oe}")
|
||||
if 'connection refused' in str(oe).lower():
|
||||
sys.exit(oe)
|
||||
except AttributeError as ae:
|
||||
uplink.Logs.critical(f"AttributeError: {ae}")
|
||||
|
||||
def run_python_garbage_collector() -> int:
|
||||
"""Run Python garbage collector
|
||||
|
||||
@@ -131,3 +179,19 @@ def clean_uid(uid: str) -> Optional[str]:
|
||||
parsed_UID = sub(pattern, '', uid)
|
||||
|
||||
return parsed_UID
|
||||
|
||||
def hide_sensitive_data(srvmsg: list[str]) -> list[str]:
|
||||
try:
|
||||
srv_msg = srvmsg.copy()
|
||||
privmsg_index = srv_msg.index('PRIVMSG')
|
||||
auth_index = privmsg_index + 2
|
||||
if match(r'^:{1}\W?(auth)$', srv_msg[auth_index]) is None:
|
||||
return srv_msg
|
||||
|
||||
for l in range(auth_index + 1, len(srv_msg)):
|
||||
srv_msg[l] = '*' * len(srv_msg[l])
|
||||
|
||||
return srv_msg
|
||||
|
||||
except ValueError:
|
||||
return srvmsg
|
||||
Reference in New Issue
Block a user