Compare commits

42 Commits

Author SHA1 Message Date
adator
c371910066 Creating JSONRPC Server 2025-11-16 19:13:26 +01:00
adator
2e422c93e5 Base.py refactoring. Nothing fancy. 2025-11-16 13:35:09 +01:00
adator
ba989b7f26 Update debug message. 2025-11-16 13:22:33 +01:00
adator
fc01de34b2 Adding new debug messages when module exist or not 2025-11-16 13:21:57 +01:00
adator
a3dcc20a06 Handle SETHOST response to update the vhost of the user 2025-11-16 13:20:11 +01:00
adator
7ffc58d4ff Trigger a thread clean-up before running a new thread 2025-11-16 13:15:49 +01:00
adator
6a0d4e2286 Updating some translation, refactoring the code! 2025-11-11 03:49:16 +01:00
adator
7dd15f2dac Updating translation! 2025-11-11 03:48:37 +01:00
adator
10cad7cda6 Refactoring code! 2025-11-11 03:48:20 +01:00
adator
999072a88a Refactoring unreal6 code! 2025-11-11 03:47:02 +01:00
adator
8932e1441a Update docstring of the test module. 2025-11-10 23:38:19 +01:00
adator
9cee758b6f Remove unused imported library! 2025-11-10 23:13:17 +01:00
adator
511e0c0715 Introduce MOD_HEADER in all modules. impact modules.py, definition.py, unreal6 protocol. 2025-11-10 23:09:13 +01:00
adator
371c8fb5f1 Exclude modules.txt files from the commit 2025-11-10 23:08:18 +01:00
adator
401e785383 Remove deprecated class (abstractproperty). 2025-11-10 00:15:53 +01:00
adator
a7efede75e Introduce MOD_HEADER constante in all modules as mandatory constante. 2025-11-10 00:13:35 +01:00
adator
a1254c7a39 Refactoring the Code, Dispatch server responses to all modules including UID, avoid multi calls to get_user, get_nickname... 2025-11-09 23:39:19 +01:00
adator
c05990f862 Refactoring the Code! 2025-11-09 23:37:42 +01:00
adator
de2b5fa8e2 Refactoring the Code, comment the code to dispatch the server response to all modules 2025-11-09 23:36:58 +01:00
adator
371645149d Refactoring the Code, clean unsused methods. avoid multi calls to get_user, get_nickname ... 2025-11-09 23:35:27 +01:00
adator
a6cf11ae2a Update the log level when userobj is not found! 2025-11-09 23:33:17 +01:00
adator
445cbc27b0 exclude users.txt file 2025-11-09 23:32:21 +01:00
adator
f9eb374798 When fp is None, return False. log when login via fingerprint. 2025-11-09 20:53:30 +01:00
adator
17cb2ada5f Fix server response, fix ircd parser aswell, update first setup on base.py 2025-11-08 21:21:38 +01:00
adator
b52a57f95a Fix library path in settings.py 2025-11-07 22:46:09 +01:00
adator
1bfd95c291 refactoring code 2025-11-06 20:12:28 +01:00
adator
0c6c3cd6ac Refactoring the code, create new folder modules. 2025-11-02 22:52:27 +01:00
adator
0e6384c26c modify and move protocol interface to interfaces folder. refactoring all dependencies 2025-11-02 22:21:55 +01:00
adator
79c1b94a92 Update parse_quit, now it returns MUser object and the reason. 2025-11-02 21:28:44 +01:00
adator
5a1432c1e6 Update parse_uid, now it returns MUser object. 2025-11-02 21:17:15 +01:00
adator
34b5b4204e Update parse_privmsg, now it returns sender, reciever, channel objects and the message 2025-11-02 20:58:56 +01:00
adator
ff58cbb022 Adding serveur_protocol to the configuration exemple. 2025-11-02 00:16:42 +01:00
adator
6450418859 Update docstring 2025-11-02 00:16:04 +01:00
adator
9f2da13f88 Unload the module when the protocol is not unreal6 2025-11-02 00:15:43 +01:00
adator
0117e1dd3a Update the protocol inspircd to work with the protocol interfaces. 2025-11-02 00:14:36 +01:00
adator
deb76baf30 Update the version of defender 6.3.2>>6.3.3 2025-11-01 22:24:57 +01:00
adator
29f049b3c3 Update the protocol interface, no more __init__ constructor needed in the child class! 2025-11-01 22:20:49 +01:00
adator
fb41a13d0a Move mod_clone to use the module interface. 2025-11-01 22:11:15 +01:00
adator
769ab8b632 update rest of modules to fit requirements 2025-11-01 22:00:08 +01:00
adator
2fbe75b83e update module management 2025-11-01 17:39:16 +01:00
adator
8abae5df3e fix db_patch, the table name is no more hard coded! 2025-11-01 15:57:46 +01:00
adator
1a71a6eb4d 1st version of module interface! 2025-10-30 00:35:50 +01:00
42 changed files with 2064 additions and 1835 deletions

4
.gitignore vendored
View File

@@ -10,4 +10,6 @@ configuration.yaml
configuration_inspircd.json
configuration_unreal6.json
*.log
test.py
test.py
users.txt
modules.txt

View File

@@ -43,6 +43,9 @@ endif
clean:
ifeq ($(OS), Linux)
@export echo $DBUS_SESSION_BUS_ADDRESS && \
systemctl --user stop defender
$(info Defender has been stopped...)
@if [ -e .pyenv ]; then \
rm -rf .pyenv; \
echo "Virtual Env has been removed!"; \

View File

@@ -6,6 +6,7 @@ configuration:
SERVEUR_PASSWORD: "YOUR_LINK_PASSWORD"
SERVEUR_ID: "006"
SERVEUR_SSL: true
SERVEUR_PROTOCOL: "unreal6" # unreal6 or inspircd
SERVICE_NAME: "defender"
SERVICE_NICKNAME: "PyDefender"
@@ -39,8 +40,8 @@ configuration:
API_TIMEOUT: 2
PORTS_TO_SCAN: [3028 8080 1080 1085 4145 9050]
WHITELISTED_IP: ["127.0.0.1"]
PORTS_TO_SCAN: [3028, 8080, 1080, 1085, 4145, 9050]
WHITELISTED_IP: ["127.0.0.1", "192.168.1.1"]
GLINE_DURATION: "30"
DEBUG_LEVEL: 20

View File

@@ -1,16 +1,12 @@
import importlib
import os
import re
import json
import sys
import time
import socket
import threading
import ipaddress
import ast
import requests
from pathlib import Path
from types import ModuleType
from dataclasses import fields
from typing import Any, Optional, TYPE_CHECKING
from base64 import b64decode, b64encode
@@ -30,7 +26,8 @@ class Base:
self.Utils = loader.Utils
self.logs = loader.Logs
self.check_for_new_version(True) # Verifier si une nouvelle version est disponible
# Check if new Defender version is available
self.check_for_new_version(True)
# Liste des timers en cours
self.running_timers: list[threading.Timer] = self.Settings.RUNNING_TIMERS
@@ -47,9 +44,17 @@ class Base:
# Création du lock
self.lock = self.Settings.LOCK
self.install: bool = False # Initialisation de la variable d'installation
self.engine, self.cursor = self.db_init() # Initialisation de la connexion a la base de données
self.__create_db() # Initialisation de la base de données
# Init install variable
self.install: bool = False
# Init database connection
self.engine, self.cursor = self.db_init()
# Create the database
# self.__create_db()
def init(self) -> None:
self.__create_db()
def __set_current_defender_version(self) -> None:
"""This will put the current version of Defender
@@ -325,7 +330,7 @@ class Base:
self.logs.error(f'Assertion Error -> {ae}')
return None
def create_thread(self, func:object, func_args: tuple = (), run_once:bool = False, daemon: bool = True) -> None:
def create_thread(self, func: object, func_args: tuple = (), run_once: bool = False, daemon: bool = True) -> None:
"""Create a new thread and store it into running_threads variable
Args:
@@ -334,6 +339,9 @@ class Base:
run_once (bool, optional): If you want to ensure that this method/function run once. Defaults to False.
"""
try:
# Clean unused threads first
self.garbage_collector_thread()
func_name = func.__name__
if run_once:
@@ -347,8 +355,8 @@ class Base:
self.running_threads.append(th)
self.logs.debug(f"-- Thread ID : {str(th.ident)} | Thread name : {th.name} | Running Threads : {len(threading.enumerate())}")
except AssertionError as ae:
self.logs.error(f'{ae}')
except Exception as err:
self.logs.error(err, exc_info=True)
def is_thread_alive(self, thread_name: str) -> bool:
"""Check if the thread is still running! using the is_alive method of Threads.
@@ -425,11 +433,11 @@ class Base:
if thread.name != 'heartbeat':
if not thread.is_alive():
self.running_threads.remove(thread)
self.logs.info(f"-- Thread {str(thread.name)} {str(thread.native_id)} removed")
self.logs.debug(f"-- Thread {str(thread.name)} {str(thread.native_id)} has been removed!")
# print(threading.enumerate())
except AssertionError as ae:
self.logs.error(f'Assertion Error -> {ae}')
except Exception as err:
self.logs.error(err, exc_info=True)
def garbage_collector_sockets(self) -> None:
@@ -485,7 +493,7 @@ class Base:
engine = create_engine(f'sqlite:///{full_path_db}.db', echo=False)
cursor = engine.connect()
self.logs.info("-- database connexion has been initiated")
self.logs.info("-- Database connexion has been initiated")
return engine, cursor
def __create_db(self) -> None:
@@ -539,6 +547,7 @@ class Base:
vhost TEXT,
password TEXT,
fingerprint TEXT,
language TEXT,
level INTEGER
)
'''
@@ -599,8 +608,8 @@ class Base:
def db_patch(self, table_name: str, column_name: str, column_type: str) -> bool:
if not self.db_is_column_exist(table_name, column_name):
patch = f"ALTER TABLE {self.Config.TABLE_ADMIN} ADD COLUMN {column_name} {column_type}"
update_row = f"UPDATE {self.Config.TABLE_ADMIN} SET language = 'EN' WHERE language is null"
patch = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}"
update_row = f"UPDATE {table_name} SET language = 'EN' WHERE language is null"
self.db_execute_query(patch)
self.db_execute_query(update_row)
self.logs.debug(f"The patch has been applied")

View File

@@ -0,0 +1,125 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional
from dataclasses import dataclass
if TYPE_CHECKING:
from core.irc import Irc
class IModule(ABC):
@abstractmethod
@dataclass
class ModConfModel:
"""The Model containing the module parameters
"""
def __init__(self, uplink: 'Irc') -> None:
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module (Mandatory)
self.Irc = uplink
# Add Loader object to the module (Mandatory)
self.Loader = uplink.Loader
# Add Protocol to the module (Mandatory)
self.Protocol = uplink.Protocol
# Add Global Configuration to the module (Mandatory)
self.Config = uplink.Config
# Add Settings to the module (Mandatory)
self.Settings = uplink.Settings
# Add Base object to the module (Mandatory)
self.Base = uplink.Base
# Add Main Utils (Mandatory)
self.MainUtils = uplink.Utils
# Add logs object to the module (Mandatory)
self.Logs = uplink.Loader.Logs
# Add User object to the module (Mandatory)
self.User = uplink.User
# Add Client object to the module (Mandatory)
self.Client = uplink.Client
# Add Admin object to the module (Mandatory)
self.Admin = uplink.Admin
# Add Channel object to the module (Mandatory)
self.Channel = uplink.Channel
# Add Reputation object to the module (Optional)
self.Reputation = uplink.Reputation
# Load the child classes
self.load()
# Inspect child classes
self.inspect_class()
self.create_tables()
# Sync the configuration with core configuration (Mandatory)
uplink.Base.db_sync_core_config(self.module_name, self.ModConfig)
# Log the module
self.Logs.debug(f'Loading Module {self.module_name} ...')
def update_configuration(self, param_key: str, param_value: str) -> None:
"""Update the local and core configuration
Args:
param_key (str): The parameter key
param_value (str): The parameter value
"""
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def inspect_class(self):
if not hasattr(self, 'ModConfig'):
raise AttributeError("The Module must init ModConfig attribute in the load method!")
if not hasattr(self, 'MOD_HEADER'):
raise NotImplementedError(f"You must declare the header of the module in {self.__class__.__name__}!")
@abstractmethod
def create_tables(self) -> None:
"""Method that will create the database if it does not exist.
A single Session for this class will be created, which will be used within this class/module.
Returns:
None: No return is expected
"""
@abstractmethod
def load(self) -> None:
"""This method is executed when the module is loaded or reloaded.
"""
@abstractmethod
def unload(self) -> None:
"""This method is executed when the module is unloaded or reloaded.
"""
@abstractmethod
def cmd(self, data: list) -> None:
"""When recieving server messages.
Args:
data (list): The recieved message
"""
@abstractmethod
def hcmds(self, user: str, channel: Optional[str], cmd: list[str], fullcmd: Optional[list[str]] = None) -> None:
"""These are the commands recieved from a client
Args:
user (str): The client
channel (str|None): The channel if available
cmd (list): The user command sent
fullcmd (list, optional): The full server message. Defaults to [].
"""

View File

@@ -3,13 +3,38 @@ from typing import Optional, TYPE_CHECKING
from core.classes.protocols.command_handler import CommandHandler
if TYPE_CHECKING:
from core.definition import MClient, MSasl
from core.definition import MClient, MSasl, MUser, MChannel
from core.irc import Irc
class IProtocol(ABC):
Handler: Optional[CommandHandler] = None
def __init__(self, uplink: 'Irc'):
self.name: Optional[str] = None
self.protocol_version: int = -1
self.known_protocol: set[str] = set()
self._Irc = uplink
self._Config = uplink.Config
self._Base = uplink.Base
self._Settings = uplink.Base.Settings
self._Utils = uplink.Loader.Utils
self._Logs = uplink.Loader.Logs
self._User = uplink.User
self._Channel = uplink.Channel
self.Handler = CommandHandler(uplink.Loader)
self.init_protocol()
self._Logs.info(f"[PROTOCOL] Protocol [{self.__class__.__name__}] loaded!")
@abstractmethod
def init_protocol(self):
"""Init protocol
"""
@abstractmethod
def get_ircd_protocol_poisition(self, cmd: list[str], log: bool = False) -> tuple[int, Optional[str]]:
"""Get the position of known commands
@@ -288,56 +313,51 @@ class IProtocol(ABC):
# ------------------------------------------------------------------------
@abstractmethod
def parse_uid(self, serverMsg: list[str]) -> dict[str, str]:
def parse_uid(self, server_msg: list[str]) -> Optional['MUser']:
"""Parse UID and return dictionary.
Args:
serverMsg (list[str]): The UID IRCD message
server_msg (list[str]): The UID IRCD message
Returns:
dict[str, str]: The response as dictionary.
Optional[MUser]: The MUser object or None
"""
@abstractmethod
def parse_quit(self, serverMsg: list[str]) -> dict[str, str]:
def parse_quit(self, server_msg: list[str]) -> tuple[Optional['MUser'], str]:
"""Parse quit and return dictionary.
>>> [':97KAAAAAB', 'QUIT', ':Quit:', 'this', 'is', 'my', 'reason', 'to', 'quit']
Args:
serverMsg (list[str]): The server message to parse
server_msg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
tuple[MUser, str]: The User Who Quit Object and the reason.
"""
@abstractmethod
def parse_nick(self, serverMsg: list[str]) -> dict[str, str]:
def parse_nick(self, server_msg: list[str]) -> tuple[Optional['MUser'], str, str]:
"""Parse nick changes and return dictionary.
>>> [':97KAAAAAC', 'NICK', 'testinspir', '1757360740']
Args:
serverMsg (list[str]): The server message to parse
server_msg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
tuple(MUser, newnickname(str), timestamp(str)): Tuple of the response.
>>> MUser, newnickname, timestamp
"""
@abstractmethod
def parse_privmsg(self, serverMsg: list[str]) -> dict[str, str]:
def parse_privmsg(self, server_msg: list[str]) -> tuple[Optional['MUser'], Optional['MUser'], Optional['MChannel'], str]:
"""Parse PRIVMSG message.
>>> [':97KAAAAAE', 'PRIVMSG', '#welcome', ':This', 'is', 'my', 'public', 'message']
Args:
serverMsg (list[str]): The server message to parse
server_msg (list[str]): The server message to parse
Returns:
dict[str, str]: The response as dictionary.
```python
response = {
"uid": '97KAAAAAE',
"channel": '#welcome',
"message": 'This is my public message'
}
```
tuple[MUser(Sender), MUser(Reciever), MChannel, str]: Sender user model, reciever user model, Channel model, messgae.
"""
# ------------------------------------------------------------------------
@@ -345,174 +365,174 @@ class IProtocol(ABC):
# ------------------------------------------------------------------------
@abstractmethod
def on_svs2mode(self, serverMsg: list[str]) -> None:
def on_svs2mode(self, server_msg: list[str]) -> None:
"""Handle svs2mode coming from a server
>>> [':00BAAAAAG', 'SVS2MODE', '001U01R03', '-r']
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_mode(self, serverMsg: list[str]) -> None:
def on_mode(self, server_msg: list[str]) -> None:
"""Handle mode coming from a server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_umode2(self, serverMsg: list[str]) -> None:
def on_umode2(self, server_msg: list[str]) -> None:
"""Handle umode2 coming from a server
>>> [':adator_', 'UMODE2', '-i']
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_quit(self, serverMsg: list[str]) -> None:
def on_quit(self, server_msg: list[str]) -> None:
"""Handle quit coming from a server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_squit(self, serverMsg: list[str]) -> None:
def on_squit(self, server_msg: list[str]) -> None:
"""Handle squit coming from a server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_protoctl(self, serverMsg: list[str]) -> None:
def on_protoctl(self, server_msg: list[str]) -> None:
"""Handle protoctl coming from a server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_nick(self, serverMsg: list[str]) -> None:
def on_nick(self, server_msg: list[str]) -> None:
"""Handle nick coming from a server
new nickname
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_sjoin(self, serverMsg: list[str]) -> None:
def on_sjoin(self, server_msg: list[str]) -> None:
"""Handle sjoin coming from a server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_part(self, serverMsg: list[str]) -> None:
def on_part(self, server_msg: list[str]) -> None:
"""Handle part coming from a server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_eos(self, serverMsg: list[str]) -> None:
def on_eos(self, server_msg: list[str]) -> None:
"""Handle EOS coming from a server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_reputation(self, serverMsg: list[str]) -> None:
def on_reputation(self, server_msg: list[str]) -> None:
"""Handle REPUTATION coming from a server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_uid(self, serverMsg: list[str]) -> None:
def on_uid(self, server_msg: list[str]) -> None:
"""Handle uid message coming from the server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_privmsg(self, serverMsg: list[str]) -> None:
def on_privmsg(self, server_msg: list[str]) -> None:
"""Handle PRIVMSG message coming from the server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_server_ping(self, serverMsg: list[str]) -> None:
def on_server_ping(self, server_msg: list[str]) -> None:
"""Send a PONG message to the server
Args:
serverMsg (list[str]): List of str coming from the server
server_msg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_server(self, serverMsg: list[str]) -> None:
def on_server(self, server_msg: list[str]) -> None:
"""_summary_
Args:
serverMsg (list[str]): _description_
server_msg (list[str]): _description_
"""
@abstractmethod
def on_version(self, serverMsg: list[str]) -> None:
def on_version(self, server_msg: list[str]) -> None:
"""Sending Server Version to the server
Args:
serverMsg (list[str]): List of str coming from the server
server_msg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_time(self, serverMsg: list[str]) -> None:
def on_time(self, server_msg: list[str]) -> None:
"""Sending TIME answer to a requestor
Args:
serverMsg (list[str]): List of str coming from the server
server_msg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_ping(self, serverMsg: list[str]) -> None:
def on_ping(self, server_msg: list[str]) -> None:
"""Sending a PING answer to requestor
Args:
serverMsg (list[str]): List of str coming from the server
server_msg (list[str]): List of str coming from the server
"""
@abstractmethod
def on_version_msg(self, serverMsg: list[str]) -> None:
def on_version_msg(self, server_msg: list[str]) -> None:
"""Handle version coming from the server
\n ex. /version Defender
Args:
serverMsg (list[str]): Original message from the server
server_msg (list[str]): Original message from the server
"""
@abstractmethod
def on_smod(self, serverMsg: list[str]) -> None:
def on_smod(self, server_msg: list[str]) -> None:
"""Handle SMOD message coming from the server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
"""
@abstractmethod
def on_sasl(self, serverMsg: list[str]) -> Optional['MSasl']:
def on_sasl(self, server_msg: list[str]) -> Optional['MSasl']:
"""Handle SASL coming from a server
Args:
serverMsg (list[str]): Original server message
server_msg (list[str]): Original server message
Returns:
@@ -530,18 +550,27 @@ class IProtocol(ABC):
"""
@abstractmethod
def on_md(self, serverMsg: list[str]) -> None:
def on_md(self, server_msg: list[str]) -> None:
"""Handle MD responses
[':001', 'MD', 'client', '001MYIZ03', 'certfp', ':d1235648...']
Args:
serverMsg (list[str]): The server reply
server_msg (list[str]): The server reply
"""
@abstractmethod
def on_kick(self, serverMsg: list[str]) -> None:
def on_kick(self, server_msg: list[str]) -> None:
"""When a user is kicked out from a channel
Eg. ['@unrealircd.org...', ':001', 'KICK', '#jsonrpc', '001ELW13T', ':Kicked', 'from', 'JSONRPC', 'User']
Args:
serverMsg (list[str]): The server message
server_msg (list[str]): The server message
"""
@abstractmethod
def on_sethost(self, server_msg: list[str]) -> None:
"""On SETHOST command
>>> [':001DN7305', 'SETHOST', ':netadmin.example.org']
Args:
server_msg (list[str]): _description_
"""

View File

@@ -93,18 +93,11 @@ class Admin:
Returns:
bool: True if the admin has been deleted
"""
for record in self.UID_ADMIN_DB:
if record.uid == uidornickname:
# If the admin exist, delete and do not go further
self.UID_ADMIN_DB.remove(record)
self.Logs.debug(f'UID ({record.uid}) has been deleted')
return True
if record.nickname.lower() == uidornickname.lower():
# If the admin exist, delete and do not go further
self.UID_ADMIN_DB.remove(record)
self.Logs.debug(f'nickname ({record.nickname}) has been deleted')
return True
admin_obj = self.get_admin(uidornickname)
if admin_obj:
self.UID_ADMIN_DB.remove(admin_obj)
self.Logs.debug(f'UID ({admin_obj.uid}) has been deleted')
return True
self.Logs.debug(f'The UID {uidornickname} was not deleted')
@@ -190,6 +183,9 @@ class Admin:
Returns:
bool: True if found
"""
if fp is None:
return False
query = f"SELECT user, level, language FROM {self.Config.TABLE_ADMIN} WHERE fingerprint = :fp"
data = {'fp': fp}
exe = self.Base.db_execute_query(query, data)
@@ -200,12 +196,13 @@ class Admin:
language = result[2]
user_obj = self.User.get_user(uidornickname)
if user_obj:
admin_obj = self.Definition.MAdmin(**user_obj.to_dict(),account=account, level=level, language=language)
admin_obj = self.Definition.MAdmin(**user_obj.to_dict(), account=account, level=level, language=language)
if self.insert(admin_obj):
self.Setting.current_admin = admin_obj
self.Logs.debug(f"[Fingerprint login] {user_obj.nickname} ({admin_obj.account}) has been logged in successfully!")
return True
return False
return False
def db_is_admin_exist(self, admin_nickname: str) -> bool:
"""Verify if the admin exist in the database!

View File

@@ -11,10 +11,13 @@ if TYPE_CHECKING:
REHASH_MODULES = [
'core.definition',
'core.utils',
'core.classes.config',
'core.classes.modules.config',
'core.base',
'core.classes.commands',
'core.classes.protocols.interface',
'core.classes.modules.commands',
'core.classes.modules.rpc',
'core.classes.interfaces.iprotocol',
'core.classes.interfaces.imodule',
'core.classes.protocols.command_handler',
'core.classes.protocols.factory',
'core.classes.protocols.unreal6',
'core.classes.protocols.inspircd'
@@ -32,10 +35,6 @@ def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -
for module in uplink.ModuleUtils.model_get_loaded_modules().copy():
uplink.ModuleUtils.unload_one_module(uplink, module.module_name)
uplink.ModuleUtils.model_clear() # Clear loaded modules.
uplink.User.UID_DB.clear() # Clear User Object
uplink.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
uplink.Client.CLIENT_DB.clear() # Clear Client object
uplink.Base.garbage_collector_thread()
uplink.Logs.debug(f'[{uplink.Config.SERVICE_NICKNAME} RESTART]: Reloading configuration!')
@@ -58,6 +57,11 @@ def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -
uplink.Protocol = uplink.Loader.PFactory.get()
uplink.Protocol.register_command()
uplink.ModuleUtils.model_clear() # Clear loaded modules.
uplink.User.UID_DB.clear() # Clear User Object
uplink.Channel.UID_CHANNEL_DB.clear() # Clear Channel Object
uplink.Client.CLIENT_DB.clear() # Clear Client object
uplink.init_service_user()
uplink.Utils.create_socket(uplink)
uplink.Protocol.send_link()
@@ -66,6 +70,8 @@ def restart_service(uplink: 'Irc', reason: str = "Restarting with no reason!") -
def rehash_service(uplink: 'Irc', nickname: str) -> None:
need_a_restart = ["SERVEUR_ID"]
uplink.Settings.set_cache('db_commands', uplink.Commands.DB_COMMANDS)
uplink.Loader.RpcServer.stop_server()
restart_flag = False
config_model_bakcup = uplink.Config
mods = REHASH_MODULES
@@ -77,7 +83,7 @@ def rehash_service(uplink: 'Irc', nickname: str) -> None:
channel=uplink.Config.SERVICE_CHANLOG
)
uplink.Utils = sys.modules['core.utils']
uplink.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).configuration_model
uplink.Config = uplink.Loader.Config = uplink.Loader.ConfModule.Configuration(uplink.Loader).configuration_model
uplink.Config.HSID = config_model_bakcup.HSID
uplink.Config.DEFENDER_INIT = config_model_bakcup.DEFENDER_INIT
uplink.Config.DEFENDER_RESTART = config_model_bakcup.DEFENDER_RESTART
@@ -110,6 +116,8 @@ def rehash_service(uplink: 'Irc', nickname: str) -> None:
# Reload Main Commands Module
uplink.Commands = uplink.Loader.CommandModule.Command(uplink.Loader)
uplink.Loader.RpcServer = uplink.Loader.RpcServerModule.JSONRPCServer(uplink.Loader)
uplink.Loader.RpcServer.start_server()
uplink.Commands.DB_COMMANDS = uplink.Settings.get_cache('db_commands')
uplink.Loader.Base = uplink.Loader.BaseModule.Base(uplink.Loader)

View File

@@ -0,0 +1,240 @@
import base64
import json
import logging
from enum import Enum
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import TYPE_CHECKING, Any, Optional
from core.classes.modules.rpc.rpc_user import RPCUser
from core.classes.modules.rpc.rpc_channel import RPCChannel
from core.classes.modules.rpc.rpc_command import RPCCommand
if TYPE_CHECKING:
from core.loader import Loader
ProxyLoader: Optional['Loader'] = None
class RPCRequestHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
pass
def do_POST(self):
logs = ProxyLoader.Logs
self.server_version = 'Defender6'
self.sys_version = ProxyLoader.Config.CURRENT_VERSION
content_length = int(self.headers['Content-Length'])
body = self.rfile.read(content_length)
request_data: dict = json.loads(body)
rip, rport = self.client_address
if not self.authenticate(request_data):
return None
response_data = {
'jsonrpc': '2.0',
'id': request_data.get('id', 123)
}
method = request_data.get("method")
params: dict[str, Any] = request_data.get("params", {})
response_data['method'] = method
http_code = 200
match method:
case 'user.list':
user = RPCUser(ProxyLoader)
response_data['result'] = user.user_list()
logs.debug(f'[RPC] {method} recieved from {rip}:{rport}')
del user
case 'user.get':
user = RPCUser(ProxyLoader)
uid_or_nickname = params.get('uid_or_nickname', None)
response_data['result'] = user.user_get(uid_or_nickname)
logs.debug(f'[RPC] {method} recieved from {rip}:{rport}')
del user
case 'channel.list':
channel = RPCChannel(ProxyLoader)
response_data['result'] = channel.channel_list()
logs.debug(f'[RPC] {method} recieved from {rip}:{rport}')
del channel
case 'command.list':
command = RPCCommand(ProxyLoader)
response_data['result'] = command.command_list()
logs.debug(f'[RPC] {method} recieved from {rip}:{rport}')
del command
case 'command.get.by.module':
command = RPCCommand(ProxyLoader)
module_name = params.get('name', None)
response_data['result'] = command.command_get_by_module(module_name)
logs.debug(f'[RPC] {method} recieved from {rip}:{rport}')
del command
case 'command.get.by.name':
command = RPCCommand(ProxyLoader)
command_name = params.get('name', None)
response_data['result'] = command.command_get_by_name(command_name)
logs.debug(f'[RPC] {method} recieved from {rip}:{rport}')
del command
case _:
response_data['error'] = create_error_response(JSONRPCErrorCode.METHOD_NOT_FOUND)
logs.debug(f'[RPC ERROR] {method} recieved from {rip}:{rport}')
http_code = 404
self.send_response(http_code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(response_data).encode('utf-8'))
return None
def do_GET(self):
self.server_version = 'Defender6'
self.sys_version = ProxyLoader.Config.CURRENT_VERSION
content_length = int(self.headers['Content-Length'])
body = self.rfile.read(content_length)
request_data: dict = json.loads(body)
if not self.authenticate(request_data):
return None
response_data = {'jsonrpc': '2.0', 'id': request_data.get('id', 321),
'error': create_error_response(JSONRPCErrorCode.INVALID_REQUEST)}
self.send_response(404)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(response_data).encode('utf-8'))
return None
def authenticate(self, request_data: dict) -> bool:
logs = ProxyLoader.Logs
auth = self.headers.get('Authorization', None)
if auth is None:
self.send_auth_error(request_data)
return False
# Authorization header format: Basic base64(username:password)
auth_type, auth_string = auth.split(' ', 1)
if auth_type.lower() != 'basic':
self.send_auth_error(request_data)
return False
try:
# Decode the base64-encoded username:password
decoded_credentials = base64.b64decode(auth_string).decode('utf-8')
username, password = decoded_credentials.split(":", 1)
# Check the username and password.
for rpcuser in ProxyLoader.Irc.Config.RPC_USERS:
if rpcuser.get('USERNAME', None) == username and rpcuser.get('PASSWORD', None) == password:
return True
self.send_auth_error(request_data)
return False
except Exception as e:
self.send_auth_error(request_data)
logs.error(e)
return False
def send_auth_error(self, request_data: dict) -> None:
response_data = {
'jsonrpc': '2.0',
'id': request_data.get('id', 123),
'error': create_error_response(JSONRPCErrorCode.AUTHENTICATION_ERROR)
}
self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic realm="Authorization Required"')
self.end_headers()
self.wfile.write(json.dumps(response_data).encode('utf-8'))
class JSONRPCServer:
def __init__(self, loader: 'Loader'):
global ProxyLoader
ProxyLoader = loader
self._Loader = loader
self._Base = loader.Base
self._Logs = loader.Logs
self.rpc_server: Optional[HTTPServer] = None
self.connected: bool = False
def start_server(self, server_class=HTTPServer, handler_class=RPCRequestHandler, *, hostname: str = 'localhost', port: int = 5000):
logging.getLogger('http.server').setLevel(logging.CRITICAL)
server_address = (hostname, port)
self.rpc_server = server_class(server_address, handler_class)
self._Logs.debug(f"Server ready on http://{hostname}:{port}...")
self._Base.create_thread(self.thread_start_rpc_server, (), True)
def thread_start_rpc_server(self) -> None:
self._Loader.Irc.Protocol.send_priv_msg(
self._Loader.Config.SERVICE_NICKNAME, "Defender RPC Server has started successfuly!", self._Loader.Config.SERVICE_CHANLOG
)
self.connected = True
self.rpc_server.serve_forever()
ProxyLoader.Logs.debug(f"RPC Server down!")
def stop_server(self):
self._Base.create_thread(self.thread_stop_rpc_server)
def thread_stop_rpc_server(self):
self.rpc_server.shutdown()
ProxyLoader.Logs.debug(f"RPC Server shutdown!")
self.rpc_server.server_close()
ProxyLoader.Logs.debug(f"RPC Server clean-up!")
self._Base.garbage_collector_thread()
self._Loader.Irc.Protocol.send_priv_msg(
self._Loader.Config.SERVICE_NICKNAME, "Defender RPC Server has stopped successfuly!", self._Loader.Config.SERVICE_CHANLOG
)
self.connected = False
class JSONRPCErrorCode(Enum):
PARSE_ERROR = -32700 # Syntax error in the request (malformed JSON)
INVALID_REQUEST = -32600 # Invalid Request (incorrect structure or missing fields)
METHOD_NOT_FOUND = -32601 # Method not found (the requested method does not exist)
INVALID_PARAMS = -32602 # Invalid Params (the parameters provided are incorrect)
INTERNAL_ERROR = -32603 # Internal Error (an internal server error occurred)
# Custom application-specific errors (beyond standard JSON-RPC codes)
CUSTOM_ERROR = 1001 # Custom application-defined error (e.g., user not found)
AUTHENTICATION_ERROR = 1002 # Authentication failure (e.g., invalid credentials)
PERMISSION_ERROR = 1003 # Permission error (e.g., user does not have access to this method)
RESOURCE_NOT_FOUND = 1004 # Resource not found (e.g., the requested resource does not exist)
DUPLICATE_REQUEST = 1005 # Duplicate request (e.g., a similar request has already been processed)
def description(self):
"""Returns a description associated with each error code"""
descriptions = {
JSONRPCErrorCode.PARSE_ERROR: "The JSON request is malformed.",
JSONRPCErrorCode.INVALID_REQUEST: "The request is invalid (missing or incorrect fields).",
JSONRPCErrorCode.METHOD_NOT_FOUND: "The requested method could not be found.",
JSONRPCErrorCode.INVALID_PARAMS: "The parameters provided are invalid.",
JSONRPCErrorCode.INTERNAL_ERROR: "An internal error occurred on the server.",
JSONRPCErrorCode.CUSTOM_ERROR: "A custom error defined by the application.",
JSONRPCErrorCode.AUTHENTICATION_ERROR: "User authentication failed.",
JSONRPCErrorCode.PERMISSION_ERROR: "User does not have permission to access this method.",
JSONRPCErrorCode.RESOURCE_NOT_FOUND: "The requested resource could not be found.",
JSONRPCErrorCode.DUPLICATE_REQUEST: "The request is a duplicate or is already being processed.",
}
return descriptions.get(self, "Unknown error")
def create_error_response(error_code: JSONRPCErrorCode, details: dict = None) -> dict[str, str]:
"""Create a JSON-RPC error!"""
response = {
"code": error_code.value,
"message": error_code.description(),
}
if details:
response["data"] = details
return response

View File

@@ -0,0 +1,12 @@
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from core.loader import Loader
class RPCChannel:
def __init__(self, loader: 'Loader'):
self._Loader = loader
self._Channel = loader.Channel
def channel_list(self) -> list[dict]:
return [chan.to_dict() for chan in self._Channel.UID_CHANNEL_DB]

View File

@@ -0,0 +1,21 @@
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from core.loader import Loader
class RPCCommand:
def __init__(self, loader: 'Loader'):
self._Loader = loader
self._Command = loader.Commands
def command_list(self) -> list[dict]:
return [command.to_dict() for command in self._Command.DB_COMMANDS]
def command_get_by_module(self, module_name: str) -> list[dict]:
return [command.to_dict() for command in self._Command.DB_COMMANDS if command.module_name.lower() == module_name.lower()]
def command_get_by_name(self, command_name: str) -> dict:
for command in self._Command.DB_COMMANDS:
if command.command_name.lower() == command_name.lower():
return command.to_dict()
return {}

View File

@@ -0,0 +1,30 @@
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from core.loader import Loader
from core.definition import MUser
class RPCUser:
def __init__(self, loader: 'Loader'):
self._Loader = loader
self._User = loader.User
def user_list(self) -> list[dict]:
users = self._User.UID_DB.copy()
copy_users: list['MUser'] = []
for user in users:
copy_user = user.copy()
copy_user.connexion_datetime = copy_user.connexion_datetime.strftime('%d-%m-%Y')
copy_users.append(copy_user)
return [user.to_dict() for user in copy_users]
def user_get(self, uidornickname: str) -> Optional[dict]:
user = self._User.get_user(uidornickname)
if user:
user_copy = user.copy()
user_copy.connexion_datetime = user_copy.connexion_datetime.strftime('%d-%m-%Y')
return user_copy.to_dict()
return None

View File

@@ -7,7 +7,7 @@ from typing import Any, Optional, TYPE_CHECKING
from core.definition import MSModule, MAdmin
if TYPE_CHECKING:
from core.classes.user import User
from core.classes.modules.user import User
class Settings:
"""This Class will never be reloaded.

View File

@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING, Optional
from .unreal6 import Unrealircd6
from .inspircd import Inspircd
from .interface import IProtocol
from ..interfaces.iprotocol import IProtocol
if TYPE_CHECKING:
from core.irc import Irc

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
from datetime import datetime
from json import dumps
from dataclasses import dataclass, field, asdict, fields
from dataclasses import dataclass, field, asdict, fields, replace
from typing import Literal, Any, Optional
from os import sep
@@ -14,6 +14,10 @@ class MainModel:
def to_json(self) -> str:
"""Return the object of a dataclass a json str."""
return dumps(self.to_dict())
def copy(self):
"""Return the object of a dataclass a json str."""
return replace(self)
def get_attributes(self) -> list[str]:
"""Return a list of attributes name"""
@@ -205,6 +209,9 @@ class MConfig(MainModel):
PASSWORD: str = "password"
"""The password of the admin of the service"""
RPC_USERS: list[dict] = field(default_factory=list)
"""The Defender rpc users"""
JSONRPC_URL: str = None
"""The RPC url, if local https://127.0.0.1:PORT/api should be fine"""
@@ -349,6 +356,14 @@ class MModule(MainModel):
class_name: str = None
class_instance: Optional[Any] = None
@dataclass
class DefenderModuleHeader(MainModel):
name: str = ''
version: str = ''
description: str = ''
author: str = ''
core_version: str = ''
@dataclass
class MSModule:
"""Server Modules model"""

View File

@@ -6,8 +6,8 @@ import time
from ssl import SSLSocket
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Optional, Union
from core.classes import rehash
from core.classes.protocols.interface import IProtocol
from core.classes.modules import rehash
from core.classes.interfaces.iprotocol import IProtocol
from core.utils import tr
if TYPE_CHECKING:
@@ -125,11 +125,13 @@ class Irc:
self.build_command(3, 'core', 'cert', 'Append your new fingerprint to your account!')
self.build_command(4, 'core', 'rehash', 'Reload the configuration file without restarting')
self.build_command(4, 'core', 'raw', 'Send a raw command directly to the IRC server')
self.build_command(4, 'core', 'print_vars', 'Print users in a file.')
self.build_command(4, 'core', 'start_rpc', 'Start defender jsonrpc server')
self.build_command(4, 'core', 'stop_rpc', 'Stop defender jsonrpc server')
# Define the IrcSocket object
self.IrcSocket: Optional[Union[socket.socket, SSLSocket]] = None
self.__create_table()
self.Base.create_thread(func=self.heartbeat, func_args=(self.beat, ))
##############################################
@@ -179,14 +181,21 @@ class Irc:
# 4072 max what the socket can grab
buffer_size = self.IrcSocket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
data_in_bytes = self.IrcSocket.recv(buffer_size)
data = data_in_bytes.splitlines(True)
count_bytes = len(data_in_bytes)
while count_bytes > 4070:
# If the received message is > 4070 then loop and add the value to the variable
eol = True
if data_in_bytes[-2:] != b"\r\n":
eol = False
while not eol:
new_data = self.IrcSocket.recv(buffer_size)
data_in_bytes += new_data
count_bytes = len(new_data)
if data_in_bytes[-2:] == eol:
eol = False
# while count_bytes > 4070:
# # If the received message is > 4070 then loop and add the value to the variable
# new_data = self.IrcSocket.recv(buffer_size)
# data_in_bytes += new_data
# count_bytes = len(new_data)
data = data_in_bytes.splitlines(True)
@@ -353,11 +362,6 @@ class Irc:
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} SASL {self.Settings.MAIN_SERVER_HOSTNAME} {s.client_uid} D F")
self.Protocol.send2socket(f":{self.Config.SERVEUR_LINK} 904 {s.username} :SASL authentication failed")
def __create_table(self) -> None:
"""## Create core tables
"""
pass
def get_defender_uptime(self) -> str:
"""Savoir depuis quand Defender est connecté
@@ -370,7 +374,7 @@ class Irc:
return uptime
def heartbeat(self, beat:float) -> None:
def heartbeat(self, beat: float) -> None:
"""Execute certaines commandes de nettoyage toutes les x secondes
x étant définit a l'initialisation de cette class (self.beat)
@@ -447,7 +451,7 @@ class Irc:
# Check if the user already exist
if not self.Admin.db_is_admin_exist(nickname):
mes_donnees = {'datetime': self.Utils.get_sdatetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level, 'language': 'EN'}
mes_donnees = {'datetime': self.Utils.get_sdatetime(), 'user': nickname, 'password': spassword, 'hostname': hostname, 'vhost': vhost, 'level': level, 'language': self.Config.LANG}
self.Base.db_execute_query(f'''INSERT INTO {self.Config.TABLE_ADMIN}
(createdOn, user, password, hostname, vhost, level, language) VALUES
(:datetime, :user, :password, :hostname, :vhost, :level, :language)
@@ -480,28 +484,26 @@ class Irc:
"""
try:
original_response: list[str] = data.copy()
RED = self.Config.COLORS.red
GREEN = self.Config.COLORS.green
NOGC = self.Config.COLORS.nogc
if len(original_response) < 2:
self.Logs.warning(f'Size ({str(len(original_response))}) - {original_response}')
return None
self.Logs.debug(f">> {self.Utils.hide_sensitive_data(original_response)}")
pos, parsed_protocol = self.Protocol.get_ircd_protocol_poisition(cmd=original_response, log=True)
modules = self.ModuleUtils.model_get_loaded_modules().copy()
for parsed in self.Protocol.Handler.get_ircd_commands():
if parsed.command_name.upper() == parsed_protocol:
parsed.func(original_response)
if len(original_response) > 2:
if original_response[2] != 'UID':
# Envoyer la commande aux classes dynamiquement chargées
for module in self.ModuleUtils.model_get_loaded_modules().copy():
for module in modules:
module.class_instance.cmd(original_response)
# if len(original_response) > 2:
# if original_response[2] != 'UID':
# # Envoyer la commande aux classes dynamiquement chargées
# for module in self.ModuleUtils.model_get_loaded_modules().copy():
# module.class_instance.cmd(original_response)
except IndexError as ie:
self.Logs.error(f"IndexError: {ie}")
except Exception as err:
@@ -519,13 +521,21 @@ class Irc:
Returns:
None: Nothing to return
"""
u = self.User.get_user(user)
"""The User Object"""
if u is None:
return None
fromuser = self.User.get_nickname(user) # Nickname qui a lancé la commande
uid = self.User.get_uid(user) # Récuperer le uid de l'utilisateur
c = self.Client.get_Client(u.uid)
"""The Client Object"""
fromuser = u.nickname
uid = u.uid
self.Settings.current_admin = self.Admin.get_admin(user) # set Current admin if any.
RED = self.Config.COLORS.red
GREEN = self.Config.COLORS.green
BLACK = self.Config.COLORS.black
NOGC = self.Config.COLORS.nogc
# Defender information
@@ -568,7 +578,7 @@ class Irc:
case 'deauth':
current_command = str(cmd[0]).upper()
uid_to_deauth = self.User.get_uid(fromuser)
uid_to_deauth = uid
self.delete_db_admin(uid_to_deauth)
self.Protocol.send_priv_msg(
@@ -581,11 +591,20 @@ class Irc:
return None
case 'firstauth':
# firstauth OWNER_NICKNAME OWNER_PASSWORD
current_nickname = self.User.get_nickname(fromuser)
current_uid = self.User.get_uid(fromuser)
# Syntax. /msg defender firstauth OWNER_NICKNAME OWNER_PASSWORD
# Check command
current_nickname = fromuser
current_uid = uid
current_command = str(cmd[0])
if current_nickname is None:
self.Logs.critical(f"This nickname [{fromuser}] don't exist")
return None
if len(cmd) < 3:
self.Protocol.send_notice(dnickname,fromuser, tr("Syntax. /msg %s %s [OWNER_NICKNAME] [OWNER_PASSWORD]", self.Config.SERVICE_NICKNAME, current_command))
return None
query = f"SELECT count(id) as c FROM {self.Config.TABLE_ADMIN}"
result = self.Base.db_execute_query(query)
result_db = result.fetchone()
@@ -596,11 +615,7 @@ class Irc:
nick_to=fromuser,
msg=tr("You can't use this command anymore ! Please use [%sauth] instead", self.Config.SERVICE_PREFIX)
)
return False
if current_nickname is None:
self.Logs.critical(f"This nickname [{fromuser}] don't exist")
return False
return None
# Credentials sent from the user
cmd_owner = str(cmd[1])
@@ -610,59 +625,33 @@ class Irc:
config_owner = self.Config.OWNER
config_password = self.Config.PASSWORD
if current_nickname != cmd_owner:
self.Logs.critical(f"The current nickname [{fromuser}] is different than the nickname sent [{cmd_owner}] !")
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"The current nickname [{fromuser}] is different than the nickname sent [{cmd_owner}] !"
)
return False
if current_nickname != config_owner:
self.Logs.critical(f"The current nickname [{current_nickname}] is different than the configuration owner [{config_owner}] !")
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"The current nickname [{current_nickname}] is different than the configuration owner [{config_owner}] !"
)
return False
if cmd_owner != config_owner:
self.Logs.critical(f"The nickname sent [{cmd_owner}] is different than the configuration owner [{config_owner}] !")
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"The nickname sent [{cmd_owner}] is different than the configuration owner [{config_owner}] !"
msg=tr("The nickname sent [%s] is different than the one set in the configuration file !", cmd_owner)
)
return False
return None
if cmd_owner == config_owner and cmd_password == config_password:
self.Base.db_create_first_admin()
self.insert_db_admin(current_uid, cmd_owner, 5, self.Config.LANG)
self.Protocol.send_priv_msg(
msg=f"[ {self.Config.COLORS.green}{str(current_command).upper()} ]{self.Config.COLORS.black} - {self.User.get_nickname(fromuser)} est désormais connecté a {dnickname}",
msg=tr("[%s %s %s] - %s is now connected to %s", GREEN, current_command.upper(), NOGC, fromuser, dnickname),
nick_from=dnickname,
channel=dchanlog
)
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"Connexion a {dnickname} réussie!"
)
self.Protocol.send_notice(dnickname, fromuser, tr("Successfuly connected to %s", dnickname))
else:
self.Protocol.send_priv_msg(
msg=f"[ {self.Config.COLORS.red}{str(current_command).upper()} ]{self.Config.COLORS.black} - {self.User.get_nickname(fromuser)} a tapé un mauvais mot de pass",
msg=tr("[ %s %s %s ] - %s provided a wrong password!", RED, current_command.upper(), NOGC, current_nickname),
nick_from=dnickname,
channel=dchanlog
)
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"Mot de passe incorrecte"
)
self.Protocol.send_notice(dnickname, fromuser, tr("Wrong password!"))
case 'auth':
# Syntax. !auth nickname password
@@ -670,22 +659,21 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} [nickname] [password]")
return None
current_command = cmd[0]
user_to_log = cmd[1]
password = cmd[2]
current_client = self.User.get_user(fromuser)
current_client = u
admin_obj = self.Admin.get_admin(fromuser)
if current_client is None:
# This case should never happen
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - Nickname {fromuser} is trying to connect to defender wrongly",
msg=f"[ {RED}{str(command).upper()} FAIL{NOGC} ] - Nickname {fromuser} is trying to connect to defender wrongly",
channel=dchanlog)
return None
if admin_obj:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - {fromuser} is already connected to {dnickname}",
msg=f"[ {GREEN}{str(command).upper()}{NOGC} ] - {fromuser} is already connected to {dnickname}",
channel=dchanlog)
self.Protocol.send_notice(dnickname, fromuser, tr("You are already connected to %s", dnickname))
return None
@@ -701,15 +689,15 @@ class Irc:
language = str(user_from_db[3])
self.insert_db_admin(current_client.uid, account, level, language)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {GREEN}{str(current_command).upper()}{NOGC} ] - {current_client.nickname} ({account}) est désormais connecté a {dnickname}",
msg=f"[ {GREEN}{str(command).upper()} SUCCESS{NOGC} ] - {current_client.nickname} ({account}) est désormais connecté a {dnickname}",
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Connexion a {dnickname} réussie!")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=tr("Successfuly connected to %s", dnickname))
return None
else:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {RED}{str(current_command).upper()}{NOGC} ] - {current_client.nickname} a tapé un mauvais mot de pass",
msg=f"[ {RED}{str(command).upper()} FAIL{NOGC} ] - {current_client.nickname} a tapé un mauvais mot de pass",
channel=dchanlog)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Mot de passe incorrecte")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=tr("Wrong password!"))
return None
case 'addaccess':
@@ -735,7 +723,7 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} addaccess [nickname] [level] [password]")
case 'editaccess':
# .editaccess [USER] [PASSWORD] [LEVEL]
# .editaccess [USER] [NEW_PASSWORD] [LEVEL]
try:
if len(cmd) < 3:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Right command : /msg {dnickname} editaccess [nickname] [NEWPASSWORD] [NEWLEVEL]")
@@ -750,8 +738,8 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no Admin access")
return None
current_user = self.User.get_nickname(fromuser)
current_uid = self.User.get_uid(fromuser)
current_user = fromuser
current_uid = uid
current_user_level = get_admin.level
user_new_level = int(cmd[3]) if len(cmd) == 4 else get_admin.level
@@ -815,8 +803,8 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"This user {fromuser} has no admin access")
return None
current_user = self.User.get_nickname(fromuser)
current_uid = self.User.get_uid(fromuser)
current_user = fromuser
current_uid = uid
current_user_level = get_admin.level
# Rechercher le user dans la base de données.
@@ -845,17 +833,37 @@ class Irc:
case 'cert':
# Syntax !cert
try:
if len(cmd) < 2:
self.Protocol.send_notice(dnickname, fromuser, f"Right command : /msg {dnickname} cert add")
self.Protocol.send_notice(dnickname, fromuser, f"Right command : /msg {dnickname} cert del")
return None
admin_obj = self.Admin.get_admin(fromuser)
if admin_obj:
if admin_obj.fingerprint is not None:
query = f'UPDATE {self.Config.TABLE_ADMIN} SET fingerprint = :fingerprint WHERE user = :user'
r = self.Base.db_execute_query(query, {'fingerprint': admin_obj.fingerprint, 'user': admin_obj.account})
if r.rowcount > 0:
self.Protocol.send_notice(dnickname, fromuser, f'[ {GREEN}CERT{NOGC} ] Your new fingerprint has been attached to your account. {admin_obj.fingerprint}')
else:
self.Protocol.send_notice(dnickname, fromuser, f'[ {RED}CERT{NOGC} ] Impossible to add your fingerprint.{admin_obj.fingerprint}')
else:
self.Protocol.send_notice(dnickname, fromuser, f'[ {RED}CERT{NOGC} ] There is no fingerprint to add.')
param = cmd[1] # add or del
match param:
case 'add':
if admin_obj:
if admin_obj.fingerprint is not None:
query = f'UPDATE {self.Config.TABLE_ADMIN} SET fingerprint = :fingerprint WHERE user = :user'
r = self.Base.db_execute_query(query, {'fingerprint': admin_obj.fingerprint, 'user': admin_obj.account})
if r.rowcount > 0:
self.Protocol.send_notice(dnickname, fromuser, f'[ {GREEN}CERT{NOGC} ] Your new fingerprint has been attached to your account. {admin_obj.fingerprint}')
else:
self.Protocol.send_notice(dnickname, fromuser, f'[ {RED}CERT{NOGC} ] Impossible to add your fingerprint.{admin_obj.fingerprint}')
else:
self.Protocol.send_notice(dnickname, fromuser, f'[ {RED}CERT{NOGC} ] There is no fingerprint to add.')
case 'del':
if admin_obj:
query = f"UPDATE {self.Config.TABLE_ADMIN} SET fingerprint = :fingerprint WHERE user =:user"
r = self.Base.db_execute_query(query, {'fingerprint': None, 'user': admin_obj.account})
if r.rowcount > 0:
self.Protocol.send_notice(dnickname, fromuser, f'[ {GREEN}CERT{NOGC} ] Your fingerprint has been removed from your account. {admin_obj.fingerprint}')
else:
self.Protocol.send_notice(dnickname, fromuser, f'[ {RED}CERT{NOGC} ] Impossible to remove your fingerprint.{admin_obj.fingerprint}')
case _:
self.Protocol.send_notice(dnickname, fromuser, f"Right command : /msg {dnickname} cert add")
self.Protocol.send_notice(dnickname, fromuser, f"Right command : /msg {dnickname} cert del")
return None
except Exception as e:
self.Logs.error(e)
@@ -883,7 +891,7 @@ class Irc:
)
return None
user_obj = self.User.get_user(fromuser)
user_obj = u
if user_obj is None:
self.Logs.error(f"Nickname ({fromuser}) doesn't exist, it is impossible to register this nickname")
@@ -938,8 +946,8 @@ class Irc:
account = str(cmd[1]) # account
encrypted_password = self.Loader.Utils.hash_password(cmd[2])
user_obj = self.User.get_user(fromuser)
client_obj = self.Client.get_Client(user_obj.uid)
user_obj = u
client_obj = c
if client_obj is not None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are already logged in")
@@ -979,19 +987,18 @@ class Irc:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} <account>")
return None
user_obj = self.User.get_user(fromuser)
user_obj = u
if user_obj is None:
self.Logs.error(f"The User [{fromuser}] is not available in the database")
return None
client_obj = self.Client.get_Client(user_obj.uid)
client_obj = c
if client_obj is None:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Nothing to logout. please login first")
return None
self.Protocol.send_svslogout(client_obj)
# self.Protocol.send_svsmode(nickname=fromuser, user_mode='-r')
self.Client.delete(user_obj.uid)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You have been logged out successfully")
@@ -1011,6 +1018,10 @@ class Irc:
case 'load':
try:
# Load a module ex: .load mod_defender
if len(cmd) < 2:
self.Protocol.send_notice(dnickname, fromuser, tr("Syntax. /msg %s %s MODULE_NAME", dnickname, command.upper()))
return None
mod_name = str(cmd[1])
self.ModuleUtils.load_one_module(self, mod_name, fromuser)
return None
@@ -1023,6 +1034,9 @@ class Irc:
# unload mod_defender
try:
# The module name. exemple: mod_defender
if len(cmd) < 2:
self.Protocol.send_notice(dnickname, fromuser, tr("Syntax. /msg %s %s MODULE_NAME", dnickname, command.upper()))
return None
module_name = str(cmd[1]).lower()
self.ModuleUtils.unload_one_module(self, module_name, False)
return None
@@ -1033,6 +1047,10 @@ class Irc:
# reload mod_defender
try:
# ==> mod_defender
if len(cmd) < 2:
self.Protocol.send_notice(dnickname, fromuser, tr("Syntax. /msg %s %s MODULE_NAME", dnickname, command.upper()))
return None
module_name = str(cmd[1]).lower()
self.ModuleUtils.reload_one_module(self, module_name, fromuser)
return None
@@ -1075,8 +1093,12 @@ class Irc:
case 'restart':
final_reason = ' '.join(cmd[1:])
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"{dnickname.capitalize()} is going to restart!")
self.Config.DEFENDER_RESTART = 1 # Set restart status to 1 saying that the service will restart
self.Config.DEFENDER_INIT = 1 # set init to 1 saying that the service will be re initiated
# Set restart status to 1 saying that the service will restart
self.Config.DEFENDER_RESTART = 1
# set init to 1 saying that the service will be re initiated
self.Config.DEFENDER_INIT = 1
case 'rehash':
rehash.rehash_service(self, fromuser)
@@ -1212,7 +1234,7 @@ class Irc:
self.Protocol.send_notice(
nick_from=dnickname,
nick_to=fromuser,
msg=f"{uptime}"
msg=uptime
)
return None
@@ -1233,5 +1255,30 @@ class Irc:
self.Protocol.send_raw(raw_command)
return None
case 'print_vars':
with open('users.txt', 'w') as fw:
i = 1
for u in self.User.UID_DB:
w = fw.write(u.to_dict().__str__() + "\n")
self.Logs.debug(f" {i} - chars written {w}")
i += 1
self.Protocol.send_priv_msg(dnickname, "Data written in users.txt file", dchanlog)
with open('modules.txt', 'w') as fw:
i = 1
for u in self.ModuleUtils.DB_MODULE_HEADERS:
w = fw.write(u.to_dict().__str__() + "\n")
self.Logs.debug(f" {i} - chars written {w}")
i += 1
self.Protocol.send_priv_msg(dnickname, "Data written in modules.txt file", dchanlog)
return None
case 'start_rpc':
self.Loader.RpcServer.start_server()
case 'stop_rpc':
self.Loader.RpcServer.stop_server()
case _:
pass

View File

@@ -2,8 +2,16 @@ traduction:
# Message help
- orig: "Access denied!"
trad: "Accès refusé."
- orig: "Wrong password!"
trad: "Mot de passe incorrect!"
- orig: "%s - %sLoaded%s by %s on %s"
trad: "%s - %sChargé%s par %s le %s"
- orig: "Module %s loaded!"
trad: "Module %s chargé!"
- orig: "cmd method is not available in the module (%s)"
trad: "La méthode cmd n'est pas disponible dans le module (%s)"
- orig: "[%sMODULE ERROR%s] Module %s is facing issues! %s"
trad: "[%sMODULE ERREUR%s] Le module %s a rencontré une erreur! %s"
- orig: "%s - %sNot Loaded%s"
trad: "%s - %sNon chargé%s"
- orig: "Successfuly connected to %s"

View File

@@ -1,13 +1,14 @@
from logging import Logger
from core.classes.settings import global_settings
from core.classes import translation, user, admin, client, channel, reputation, settings, sasl
from core.classes.modules.settings import global_settings
from core.classes.modules import translation, user, admin, client, channel, reputation, settings, sasl
import core.logs as logs
import core.definition as df
import core.utils as utils
import core.base as base_mod
import core.module as module_mod
import core.classes.commands as commands_mod
import core.classes.config as conf_mod
import core.classes.modules.commands as commands_mod
import core.classes.modules.config as conf_mod
import core.classes.modules.rpc.rpc as rpc_mod
import core.irc as irc
import core.classes.protocols.factory as factory
@@ -26,6 +27,8 @@ class Loader:
self.LoggingModule: logs = logs
self.RpcServerModule: rpc_mod = rpc_mod
self.Utils: utils = utils
# Load Classes
@@ -69,4 +72,8 @@ class Loader:
self.PFactory: factory.ProtocolFactorty = factory.ProtocolFactorty(self.Irc)
self.RpcServer: rpc_mod.JSONRPCServer = rpc_mod.JSONRPCServer(self)
self.Base.init()
self.Logs.debug(self.Utils.tr("Loader %s success", __name__))

View File

@@ -6,7 +6,8 @@ import sys
import importlib
from types import ModuleType
from typing import TYPE_CHECKING, Optional
from core.definition import MModule
from core.definition import DefenderModuleHeader, MModule
from core.utils import tr
if TYPE_CHECKING:
from core.loader import Loader
@@ -15,6 +16,7 @@ if TYPE_CHECKING:
class Module:
DB_MODULES: list[MModule] = []
DB_MODULE_HEADERS: list[DefenderModuleHeader] = []
def __init__(self, loader: 'Loader') -> None:
self.__Loader = loader
@@ -31,18 +33,57 @@ class Module:
list[str]: List of all module names.
"""
base_path = Path('mods')
return [file.name.replace('.py', '') for file in base_path.rglob('mod_*.py')]
modules_available = [file.name.replace('.py', '') for file in base_path.rglob('mod_*.py')]
self.__Logs.debug(f"Modules available: {modules_available}")
return modules_available
def get_module_information(self, module_name: str) -> tuple[str, str, str]:
def get_module_information(self, module_name: str) -> tuple[Optional[str], Optional[str], Optional[str]]:
# module_name : mod_defender
if not module_name.lower().startswith('mod_'):
return None, None, None
module_name = module_name.lower()
module_name = module_name.lower() # --> mod_defender
module_folder = module_name.split('_')[1].lower() # --> defender
class_name = module_name.split('_')[1].capitalize() # --> Defender
self.__Logs.debug(f"Module information Folder: {module_folder}, Name: {module_name}, Class: {class_name}")
return module_folder, module_name, class_name
def get_module_header(self, module_name: str) -> Optional[DefenderModuleHeader]:
for mod_h in self.DB_MODULE_HEADERS:
if module_name.lower() == mod_h.name.lower():
self.__Logs.debug(f"Module Header found: {mod_h}")
return mod_h
return None
def create_module_header(self, module_header: dict[str, str]) -> bool:
"""Create a new module header into DB_MODULE_HEADERS
Args:
module_header (dict[str, str]): The module header
Returns:
bool: True if the module header has been created.
"""
mod_header = DefenderModuleHeader(**module_header)
if self.get_module_header(mod_header.name) is None:
self.__Logs.debug(f"[MOD_HEADER] The module header has been created! ({mod_header.name} v{mod_header.version})")
self.DB_MODULE_HEADERS.append(mod_header)
return True
return False
def delete_module_header(self, module_name: str) -> bool:
mod_header = self.get_module_header(module_name)
if mod_header is not None:
self.__Logs.debug(f"[MOD_HEADER] The module header has been deleted ({mod_header.name} v{mod_header.version})")
self.DB_MODULE_HEADERS.remove(mod_header)
return True
self.__Logs.debug(f"[MOD_HEADER ERROR] Impossible to remove the module header ({module_name})")
return False
def load_one_module(self, uplink: 'Irc', module_name: str, nickname: str, is_default: bool = False) -> bool:
module_folder, module_name, class_name = self.get_module_information(module_name)
@@ -66,14 +107,26 @@ class Module:
return self.reload_one_module(uplink, module_name, nickname)
# Charger le module
loaded_module = importlib.import_module(f'mods.{module_folder}.{module_name}')
my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe
create_instance_of_the_class = my_class(uplink) # Créer une nouvelle instance de la classe
try:
loaded_module = importlib.import_module(f'mods.{module_folder}.{module_name}')
my_class = getattr(loaded_module, class_name, None) # Récuperer le nom de classe
create_instance_of_the_class = my_class(uplink) # Créer une nouvelle instance de la classe
self.create_module_header(create_instance_of_the_class.MOD_HEADER)
except AttributeError as attr:
red = uplink.Config.COLORS.red
nogc = uplink.Config.COLORS.nogc
uplink.Protocol.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=tr("[%sMODULE ERROR%s] Module %s is facing issues! %s", red, nogc, module_name, attr),
channel=self.__Config.SERVICE_CHANLOG
)
self.__Logs.error(msg=attr, exc_info=True)
return False
if not hasattr(create_instance_of_the_class, 'cmd'):
uplink.Protocol.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f"Module {module_name} ne contient pas de méthode cmd",
msg=tr("cmd method is not available in the module (%s)", module_name),
channel=self.__Config.SERVICE_CHANLOG
)
self.__Logs.critical(f"The Module {module_name} has not been loaded because cmd method is not available")
@@ -86,11 +139,14 @@ class Module:
self.db_register_module(module_name, nickname, is_default)
uplink.Protocol.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f"Module {module_name} chargé",
msg=tr("Module %s loaded!", module_name),
channel=self.__Config.SERVICE_CHANLOG
)
self.__Logs.debug(f"Module {class_name} has been loaded")
return True
return False
def load_all_modules(self) -> bool:
...
@@ -113,6 +169,7 @@ class Module:
if self.is_module_exist_in_sys_module(module_name):
module_model = self.model_get_module(module_name)
if module_model:
self.delete_module_header(module_model.class_instance.MOD_HEADER['name'])
module_model.class_instance.unload()
else:
uplink.Protocol.send_priv_msg(
@@ -130,6 +187,7 @@ class Module:
importlib.reload(the_module)
my_class = getattr(the_module, class_name, None)
new_instance = my_class(uplink)
self.create_module_header(new_instance.MOD_HEADER)
module_model.class_instance = new_instance
# Créer le module dans la base de données
@@ -152,7 +210,7 @@ class Module:
return False
except (TypeError, AttributeError, KeyError, Exception) as err:
self.__Logs.error(f"[RELOAD MODULE ERROR]: {err}")
self.__Logs.error(f"[RELOAD MODULE ERROR]: {err}", exc_info=True)
uplink.Protocol.send_priv_msg(
nick_from=self.__Config.SERVICE_NICKNAME,
msg=f"[RELOAD MODULE ERROR]: {err}",
@@ -193,7 +251,9 @@ class Module:
"""Unload a module
Args:
mod_name (str): Module name ex mod_defender
uplink (Irc): The Irc instance
module_name (str): Module name ex mod_defender
keep_in_db (bool): Keep in database
Returns:
bool: True if success
@@ -215,6 +275,7 @@ class Module:
return False
if module:
self.delete_module_header(module.class_instance.MOD_HEADER['name'])
module.class_instance.unload()
self.DB_MODULES.remove(module)
@@ -241,7 +302,7 @@ class Module:
return False
except Exception as err:
self.__Logs.error(f"General Error: {err}")
self.__Logs.error(f"General Error: {err}", exc_info=True)
return False
def unload_all_modules(self) -> bool:
@@ -258,7 +319,9 @@ class Module:
"""
module_folder, module_name, class_name = self.get_module_information(module_name)
if "mods." + module_folder + "." + module_name in sys.modules:
self.__Logs.debug(f"[SYS MODULE] (mods.{module_folder}.{module_name}) found in sys.modules")
return True
self.__Logs.debug(f"[SYS MODULE] (mods.{module_folder}.{module_name}) not found in sys.modules")
return False
'''

View File

@@ -1,19 +1,18 @@
'''
"""
Main utils library.
'''
"""
import gc
import ssl
import socket
import sys
from pathlib import Path
from re import match, sub
from base64 import b64decode
from typing import Literal, Optional, Any, TYPE_CHECKING
from datetime import datetime, timedelta, timezone
from datetime import datetime
from time import time
from random import choice
from hashlib import md5, sha3_512
from core.classes.settings import global_settings
from core.classes.modules.settings import global_settings
if TYPE_CHECKING:
from core.irc import Irc
@@ -84,9 +83,9 @@ def get_unixtime() -> int:
Returns:
int: Current time in seconds since the Epoch (int)
"""
cet_offset = timezone(timedelta(hours=2))
now_cet = datetime.now(cet_offset)
unixtime_cet = int(now_cet.timestamp())
# cet_offset = timezone(timedelta(hours=2))
# now_cet = datetime.now(cet_offset)
# unixtime_cet = int(now_cet.timestamp())
return int(time())
def get_sdatetime() -> str:
@@ -142,9 +141,9 @@ def create_socket(uplink: 'Irc') -> None:
except OSError as oe:
uplink.Logs.critical(f"[OS Error]: {oe}")
if 'connection refused' in str(oe).lower():
sys.exit(oe)
sys.exit(oe.__str__())
if oe.errno == 10053:
sys.exit(oe)
sys.exit(oe.__str__())
except AttributeError as ae:
uplink.Logs.critical(f"AttributeError: {ae}")
@@ -178,7 +177,7 @@ def generate_random_string(lenght: int) -> str:
return randomize
def hash_password(password: str, algorithm: Literal["md5, sha3_512"] = 'md5') -> str:
def hash_password(password: str, algorithm: Literal["md5", "sha3_512"] = 'md5') -> str:
"""Return the crypted password following the selected algorithm
Args:
@@ -191,16 +190,16 @@ def hash_password(password: str, algorithm: Literal["md5, sha3_512"] = 'md5') ->
match algorithm:
case 'md5':
password = md5(password.encode()).hexdigest()
return password
hashed_password = md5(password.encode()).hexdigest()
return hashed_password
case 'sha3_512':
password = sha3_512(password.encode()).hexdigest()
return password
hashed_password = sha3_512(password.encode()).hexdigest()
return hashed_password
case _:
password = md5(password.encode()).hexdigest()
return password
hashed_password = md5(password.encode()).hexdigest()
return hashed_password
def get_all_modules() -> list[str]:
"""Get list of all main modules
@@ -225,9 +224,9 @@ def clean_uid(uid: str) -> Optional[str]:
return None
pattern = fr'[:|@|%|\+|~|\*]*'
parsed_UID = sub(pattern, '', uid)
parsed_uid = sub(pattern, '', uid)
return parsed_UID
return parsed_uid
def hide_sensitive_data(srvmsg: list[str]) -> list[str]:
try:

View File

@@ -10,7 +10,7 @@ from core import install
#############################################
try:
install.update_packages()
# install.update_packages()
from core.loader import Loader
loader = Loader()
loader.Irc.init_irc()

View File

@@ -1,90 +1,29 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional, Any
from core.classes.interfaces.imodule import IModule
import mods.clone.utils as utils
import mods.clone.threads as thds
import mods.clone.schemas as schemas
from mods.clone.clone_manager import CloneManager
if TYPE_CHECKING:
from core.irc import Irc
from faker import Faker
class Clone:
class Clone(IModule):
def __init__(self, irc_instance: 'Irc') -> None:
@dataclass
class ModConfModel(schemas.ModConfModel):
...
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
MOD_HEADER: dict[str, str] = {
'name':'Clone',
'version':'1.0.0',
'description':'Connect thousands of clones to your IRCD, by group. You can use them as security moderation.',
'author':'Defender Team',
'core_version':'Defender-6'
}
# Add Irc Object to the module (Mandatory)
self.Irc = irc_instance
# Add Irc Protocol Object to the module (Mandatory)
self.Protocol = irc_instance.Protocol
# Add Global Configuration to the module (Mandatory)
self.Config = irc_instance.Config
# Add Base object to the module (Mandatory)
self.Base = irc_instance.Base
# Add logs object to the module (Mandatory)
self.Logs = irc_instance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = irc_instance.User
# Add Channel object to the module (Mandatory)
self.Channel = irc_instance.Channel
# Add global definitions
self.Definition = irc_instance.Loader.Definition
# The Global Settings
self.Settings = irc_instance.Loader.Settings
self.Schemas = schemas
self.Utils = utils
self.Threads = thds
self.Faker: Optional['Faker'] = self.Utils.create_faker_object('en_GB')
self.Clone = CloneManager(self)
metadata = self.Settings.get_cache('UID_CLONE_DB')
if metadata is not None:
self.Clone.UID_CLONE_DB = metadata
self.Logs.debug(f"Cache Size = {self.Settings.get_cache_size()}")
# Créer les nouvelles commandes du module
self.Irc.build_command(1, self.module_name, 'clone', 'Connect, join, part, kill and say clones')
# Init the module (Mandatory)
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Créer les tables necessaire a votre module (ce n'es pas obligatoire)
self.__create_tables()
self.stop = False
# Load module configuration (Mandatory)
self.__load_module_configuration()
self.Channel.db_query_channel(action='add', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_sjoin(self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+o', nickname=self.Config.SERVICE_NICKNAME, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+nts', channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+k', channel_name=self.Config.CLONE_CHANNEL, params=self.Config.CLONE_CHANNEL_PASSWORD)
def __create_tables(self) -> None:
def create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
@@ -104,20 +43,28 @@ class Clone:
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
try:
# Variable qui va contenir les options de configuration du module Defender
self.ModConfig = self.Schemas.ModConfModel()
def load(self) -> None:
self.ModConfig = self.ModConfModel()
self.stop = False
self.Schemas = schemas
self.Utils = utils
self.Threads = thds
self.Faker: Optional['Faker'] = self.Utils.create_faker_object('en_GB')
self.Clone = CloneManager(self)
metadata = self.Settings.get_cache('UID_CLONE_DB')
# Sync the configuration with core configuration (Mandatory)
# self.Base.db_sync_core_config(self.module_name, self.ModConfig)
if metadata is not None:
self.Clone.UID_CLONE_DB = metadata
self.Logs.debug(f"Cache Size = {self.Settings.get_cache_size()}")
return None
# Créer les nouvelles commandes du module
self.Irc.build_command(1, self.module_name, 'clone', 'Connect, join, part, kill and say clones')
except TypeError as te:
self.Logs.critical(te)
self.Channel.db_query_channel(action='add', module_name=self.module_name, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_sjoin(self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+o', nickname=self.Config.SERVICE_NICKNAME, channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+nts', channel_name=self.Config.CLONE_CHANNEL)
self.Protocol.send_set_mode('+k', channel_name=self.Config.CLONE_CHANNEL, params=self.Config.CLONE_CHANNEL_PASSWORD)
def unload(self) -> None:
"""Cette methode sera executée a chaque désactivation ou

View File

@@ -176,15 +176,13 @@ def create_new_clone(uplink: 'Clone', faker_instance: 'Faker', group: str = 'Def
def handle_on_privmsg(uplink: 'Clone', srvmsg: list[str]) -> None:
parser = uplink.Protocol.parse_privmsg(srvmsg)
uid_sender = uplink.Irc.Utils.clean_uid(parser.get('uid_sender', None))
senderObj = uplink.User.get_user(uid_sender)
senderObj, recieverObj, channel, message = uplink.Protocol.parse_privmsg(srvmsg)
if senderObj is not None:
if senderObj.hostname in uplink.Config.CLONE_LOG_HOST_EXEMPT:
return
senderMsg = parser.get('message', None)
clone_obj = uplink.Clone.get_clone(parser.get('uid_reciever', None))
senderMsg = message
clone_obj = recieverObj
if clone_obj is None:
return

View File

@@ -1,13 +1,12 @@
from typing import Optional, TYPE_CHECKING
from dataclasses import dataclass
from core.classes.interfaces.imodule import IModule
import mods.command.utils as utils
if TYPE_CHECKING:
from core.irc import Irc
from core.definition import MUser
from sqlalchemy import CursorResult, Row, Sequence
class Command:
class Command(IModule):
@dataclass
class ModConfModel:
@@ -15,44 +14,52 @@ class Command:
"""
pass
def __init__(self, ircInstance: 'Irc') -> None:
MOD_HEADER: dict[str, str] = {
'name':'Command',
'version':'1.0.0',
'description':'Module contains all IRC commands',
'author':'Defender Team',
'core_version':'Defender-6'
}
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
def create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
# Add Irc Object to the module (Mandatory)
self.Irc = ircInstance
Returns:
None: Aucun retour n'es attendu
"""
# Add Loader Object to the module (Mandatory)
self.Loader = ircInstance.Loader
table_automode = '''CREATE TABLE IF NOT EXISTS command_automode (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_on TEXT,
updated_on TEXT,
nickname TEXT,
channel TEXT,
mode TEXT
)
'''
# Add Protocol object to the module (Mandatory)
self.Protocol = ircInstance.Protocol
# Add Global Configuration to the module (Mandatory)
self.Config = ircInstance.Config
# Add Base object to the module (Mandatory)
self.Base = ircInstance.Base
# Add main Utils to the module
self.MainUtils = ircInstance.Utils
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User
# Add Client object to the module (Mandatory)
self.Client = ircInstance.Client
# Add Channel object to the module (Mandatory)
self.Channel = ircInstance.Channel
self.Base.db_execute_query(table_automode)
return None
def load(self) -> None:
# Module Utils
self.mod_utils = utils
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel()
self.user_to_notice: str = ''
self.show_219: bool = True
# Register new commands into the protocol
new_cmds = {'403', '401', '006', '018', '219', '223'}
for c in new_cmds:
self.Irc.Protocol.known_protocol.add(c)
self.Irc.build_command(2, self.module_name, 'join', 'Join a channel')
self.Irc.build_command(2, self.module_name, 'assign', 'Assign a user to a role or task')
self.Irc.build_command(2, self.module_name, 'part', 'Leave a channel')
@@ -104,73 +111,6 @@ class Command:
self.Irc.build_command(2, self.module_name, 'klinelist', 'List all K-line bans')
self.Irc.build_command(3, self.module_name, 'map', 'Show the server network map')
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'-- Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Create you own tables (Mandatory)
self.__create_tables()
# Load module configuration and sync with core one (Mandatory)
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
self.user_to_notice: str = ''
self.show_219: bool = True
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
Returns:
None: Aucun retour n'es attendu
"""
table_automode = '''CREATE TABLE IF NOT EXISTS command_automode (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_on TEXT,
updated_on TEXT,
nickname TEXT,
channel TEXT,
mode TEXT
)
'''
self.Base.db_execute_query(table_automode)
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
try:
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel()
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
return None
except TypeError as te:
self.Logs.critical(te)
def __update_configuration(self, param_key: str, param_value: str):
"""Update the local and core configuration
Args:
param_key (str): The parameter key
param_value (str): The parameter value
"""
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def unload(self) -> None:
self.Irc.Commands.drop_command_by_module(self.module_name)
return None
@@ -186,10 +126,12 @@ class Command:
nogc = self.Config.COLORS.nogc
cmd = list(data).copy()
if len(cmd) < 2:
pos, parsed_cmd = self.Irc.Protocol.get_ircd_protocol_poisition(cmd=cmd, log=True)
if pos == -1:
return None
match cmd[1]:
match parsed_cmd:
# [':irc.deb.biz.st', '403', 'Dev-PyDefender', '#Z', ':No', 'such', 'channel']
case '403' | '401':
try:
@@ -260,22 +202,10 @@ class Command:
except Exception as err:
self.Logs.warning(f'Unknown Error: {str(err)}')
case _:
pass
if len(cmd) < 3:
return None
match cmd[2]:
case 'SJOIN':
# ['@msgid=yldTlbwAGbzCGUcCIHi3ku;time=2024-11-11T17:56:24.297Z', ':001', 'SJOIN', '1728815963', '#znc', ':001LQ0L0C']
# Check if the user has an automode
try:
if len(cmd) < 6:
return None
user_uid = self.User.clean_uid(cmd[5])
userObj: MUser = self.User.get_user(user_uid)
channel_name = cmd[4] if self.Channel.is_valid_channel(cmd[4]) else None
@@ -301,6 +231,9 @@ class Command:
except KeyError as ke:
self.Logs.error(f"Key Error: {err}")
case _:
pass
except Exception as err:
self.Logs.error(f"General Error: {err}")

View File

@@ -1,129 +1,26 @@
from typing import TYPE_CHECKING
from dataclasses import dataclass
from typing import Any
from core.classes.interfaces.imodule import IModule
import mods.defender.schemas as schemas
import mods.defender.utils as utils
import mods.defender.threads as thds
from core.utils import tr
if TYPE_CHECKING:
from core.irc import Irc
class Defender(IModule):
class Defender:
@dataclass
class ModConfModel(schemas.ModConfModel):
...
def __init__(self, irc_instance: 'Irc') -> None:
MOD_HEADER: dict[str, str] = {
'name':'Defender',
'version':'1.0.0',
'description':'Defender main module that uses the reputation security.',
'author':'Defender Team',
'core_version':'Defender-6'
}
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module (Mandatory)
self.Irc = irc_instance
# Add Loader Object to the module (Mandatory)
self.Loader = irc_instance.Loader
# Add server protocol Object to the module (Mandatory)
self.Protocol = irc_instance.Protocol
# Add Global Configuration to the module (Mandatory)
self.Config = irc_instance.Config
# Add Base object to the module (Mandatory)
self.Base = irc_instance.Base
# Add logs object to the module (Mandatory)
self.Logs = irc_instance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = irc_instance.User
# Add Channel object to the module (Mandatory)
self.Channel = irc_instance.Channel
# Add Settings object to save objects when reloading modules (Mandatory)
self.Settings = irc_instance.Settings
# Add Reputation object to the module (Optional)
self.Reputation = irc_instance.Reputation
# Add module schemas
self.Schemas = schemas
# Add utils functions
self.Utils = utils
# Create module commands (Mandatory)
self.Irc.build_command(0, self.module_name, 'code', 'Display the code or key for access')
self.Irc.build_command(1, self.module_name, 'info', 'Provide information about the channel or server')
self.Irc.build_command(1, self.module_name, 'autolimit', 'Automatically set channel user limits')
self.Irc.build_command(3, self.module_name, 'reputation', 'Check or manage user reputation')
self.Irc.build_command(3, self.module_name, 'proxy_scan', 'Scan users for proxy connections')
self.Irc.build_command(3, self.module_name, 'flood', 'Handle flood detection and mitigation')
self.Irc.build_command(3, self.module_name, 'status', 'Check the status of the server or bot')
self.Irc.build_command(3, self.module_name, 'timer', 'Set or manage timers')
self.Irc.build_command(3, self.module_name, 'show_reputation', 'Display reputation information')
self.Irc.build_command(3, self.module_name, 'sentinel', 'Monitor and guard the channel or server')
# Init the module (Mandatory)
self.__init_module()
# Log the module
self.Logs.debug(f'-- Module {self.module_name} V2 loaded ...')
def __init_module(self) -> None:
# Create you own tables if needed (Mandatory)
self.__create_tables()
# Load module configuration (Mandatory)
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
self.timeout = self.Config.API_TIMEOUT
# Listes qui vont contenir les ip a scanner avec les différentes API
self.Schemas.DB_ABUSEIPDB_USERS = []
self.Schemas.DB_FREEIPAPI_USERS = []
self.Schemas.DB_CLOUDFILT_USERS = []
self.Schemas.DB_PSUTIL_USERS = []
self.Schemas.DB_LOCALSCAN_USERS = []
# Variables qui indique que les threads sont en cours d'éxecutions
self.abuseipdb_isRunning:bool = True
self.freeipapi_isRunning:bool = True
self.cloudfilt_isRunning:bool = True
self.psutil_isRunning:bool = True
self.localscan_isRunning:bool = True
self.reputationTimer_isRunning:bool = True
self.autolimit_isRunning: bool = True
# Variable qui va contenir les users
self.flood_system = {}
# Contient les premieres informations de connexion
self.reputation_first_connexion = {'ip': '', 'score': -1}
# Laisser vide si aucune clé
self.abuseipdb_key = '13c34603fee4d2941a2c443cc5c77fd750757ca2a2c1b304bd0f418aff80c24be12651d1a3cfe674'
self.cloudfilt_key = 'r1gEtjtfgRQjtNBDMxsg'
# Démarrer les threads pour démarrer les api
self.Base.create_thread(func=thds.thread_freeipapi_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_cloudfilt_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_abuseipdb_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_local_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_psutil_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_apply_reputation_sanctions, func_args=(self, ))
if self.ModConfig.autolimit == 1:
self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, ))
if self.ModConfig.reputation == 1:
self.Protocol.send_sjoin(self.Config.SALON_JAIL)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.SALON_JAIL} +o {self.Config.SERVICE_NICKNAME}")
return None
def __create_tables(self) -> None:
def create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
@@ -146,20 +43,63 @@ class Defender:
# self.Base.db_execute_query(table_trusted)
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
def load(self):
# Add module schemas
self.Schemas = schemas
# Add utils functions
self.Utils = utils
# Variable qui va contenir les options de configuration du module Defender
self.ModConfig = self.Schemas.ModConfModel()
self.ModConfig: schemas.ModConfModel = self.ModConfModel()
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
# Create module commands (Mandatory)
self.Irc.build_command(0, self.module_name, 'code', 'Display the code or key for access')
self.Irc.build_command(1, self.module_name, 'info', 'Provide information about the channel or server')
self.Irc.build_command(1, self.module_name, 'autolimit', 'Automatically set channel user limits')
self.Irc.build_command(3, self.module_name, 'reputation', 'Check or manage user reputation')
self.Irc.build_command(3, self.module_name, 'proxy_scan', 'Scan users for proxy connections')
self.Irc.build_command(3, self.module_name, 'flood', 'Handle flood detection and mitigation')
self.Irc.build_command(3, self.module_name, 'status', 'Check the status of the server or bot')
self.Irc.build_command(3, self.module_name, 'show_reputation', 'Display reputation information')
self.Irc.build_command(3, self.module_name, 'sentinel', 'Monitor and guard the channel or server')
return None
self.timeout = self.Config.API_TIMEOUT
def __update_configuration(self, param_key: str, param_value: str):
# Listes qui vont contenir les ip a scanner avec les différentes API
self.Schemas.DB_ABUSEIPDB_USERS = self.Schemas.DB_FREEIPAPI_USERS = self.Schemas.DB_CLOUDFILT_USERS = []
self.Schemas.DB_PSUTIL_USERS = self.Schemas.DB_LOCALSCAN_USERS = []
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
# Variables qui indique que les threads sont en cours d'éxecutions
self.abuseipdb_isRunning = self.freeipapi_isRunning = self.cloudfilt_isRunning = True
self.psutil_isRunning = self.localscan_isRunning = self.reputationTimer_isRunning = True
self.autolimit_isRunning = True
# Variable qui va contenir les users
self.flood_system = {}
# Contient les premieres informations de connexion
self.reputation_first_connexion = {'ip': '', 'score': -1}
# Laisser vide si aucune clé
self.abuseipdb_key = '13c34603fee4d2941a2c443cc5c77fd750757ca2a2c1b304bd0f418aff80c24be12651d1a3cfe674'
self.cloudfilt_key = 'r1gEtjtfgRQjtNBDMxsg'
# Démarrer les threads pour démarrer les api
self.Base.create_thread(func=thds.thread_freeipapi_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_cloudfilt_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_abuseipdb_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_local_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_psutil_scan, func_args=(self, ))
self.Base.create_thread(func=thds.thread_apply_reputation_sanctions, func_args=(self, ))
if self.ModConfig.autolimit == 1:
self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, ))
if self.ModConfig.reputation == 1:
self.Protocol.send_sjoin(self.Config.SALON_JAIL)
self.Protocol.send2socket(f":{self.Config.SERVICE_NICKNAME} SAMODE {self.Config.SALON_JAIL} +o {self.Config.SERVICE_NICKNAME}")
def __onload(self):
@@ -207,9 +147,12 @@ class Defender:
return None
def insert_db_trusted(self, uid: str, nickname:str) -> None:
u = self.User.get_user(uid)
if u is None:
return None
uid = self.User.get_uid(uid)
nickname = self.User.get_nickname(nickname)
uid = u.uid
nickname = u.nickname
query = "SELECT id FROM def_trusted WHERE user = ?"
exec_query = self.Base.db_execute_query(query, {"user": nickname})
@@ -245,29 +188,6 @@ class Defender:
except Exception as err:
self.Logs.error(f"General Error: {err}")
def run_db_action_timer(self, wait_for: float = 0) -> None:
query = f"SELECT param_key FROM {self.Config.TABLE_CONFIG}"
res = self.Base.db_execute_query(query)
service_id = self.Config.SERVICE_ID
dchanlog = self.Config.SERVICE_CHANLOG
for param in res.fetchall():
if param[0] == 'reputation':
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f" ===> {param[0]}",
channel=dchanlog
)
else:
self.Protocol.send_priv_msg(
nick_from=service_id,
msg=f"{param[0]}",
channel=dchanlog
)
return None
def cmd(self, data: list[str]) -> None:
if not data or len(data) < 2:
@@ -298,7 +218,6 @@ class Defender:
return None
case 'SJOIN':
self.Utils.handle_on_sjoin(self, cmd)
return None
@@ -324,10 +243,13 @@ class Defender:
except Exception as err:
self.Logs.error(f"General Error: {err}", exc_info=True)
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
def hcmds(self, user: str, channel: Any, cmd: list, fullcmd: list = []) -> None:
u = self.User.get_user(user)
if u is None:
return None
command = str(cmd[0]).lower()
fromuser = user
fromuser = u.nickname
channel = fromchannel = channel if self.Channel.is_valid_channel(channel) else None
dnickname = self.Config.SERVICE_NICKNAME # Defender nickname
@@ -339,17 +261,6 @@ class Defender:
match command:
case 'timer':
try:
timer_sent = self.Base.int_if_possible(cmd[1])
timer_sent = int(timer_sent)
self.Base.create_timer(timer_sent, self.run_db_action_timer)
except TypeError as te:
self.Logs.error(f"Type Error -> {te}")
except ValueError as ve:
self.Logs.error(f"Value Error -> {ve}")
case 'show_reputation':
if not self.Reputation.UID_REPUTATION_DB:
@@ -363,8 +274,8 @@ class Defender:
case 'code':
try:
release_code = cmd[1]
jailed_nickname = self.User.get_nickname(fromuser)
jailed_UID = self.User.get_uid(fromuser)
jailed_nickname = u.nickname
jailed_UID = u.uid
get_reputation = self.Reputation.get_reputation(jailed_UID)
if get_reputation is None:
@@ -394,7 +305,7 @@ class Defender:
self.Protocol.send_sapart(nick_to_sapart=jailed_nickname, channel_name=jailed_salon)
self.Protocol.send_sajoin(nick_to_sajoin=jailed_nickname, channel_name=welcome_salon)
self.Protocol.send2socket(f":{link} REPUTATION {jailed_IP} {self.ModConfig.reputation_score_after_release}")
self.User.get_user(jailed_UID).score_connexion = reputation_seuil + 1
u.score_connexion = reputation_seuil + 1
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[{color_green} MOT DE PASS CORRECT {color_black}] : You have now the right to enjoy the network !",
nick_to=jailed_nickname)
@@ -431,7 +342,7 @@ class Defender:
match arg:
case 'on':
if self.ModConfig.autolimit == 0:
self.__update_configuration('autolimit', 1)
self.update_configuration('autolimit', 1)
self.autolimit_isRunning = True
self.Base.create_thread(func=thds.thread_autolimit, func_args=(self, ))
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Activated", channel=self.Config.SERVICE_CHANLOG)
@@ -440,7 +351,7 @@ class Defender:
case 'off':
if self.ModConfig.autolimit == 1:
self.__update_configuration('autolimit', 0)
self.update_configuration('autolimit', 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Deactivated", channel=self.Config.SERVICE_CHANLOG)
else:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[{self.Config.COLORS.red}AUTOLIMIT{self.Config.COLORS.nogc}] Already Deactivated", channel=self.Config.SERVICE_CHANLOG)
@@ -449,8 +360,8 @@ class Defender:
amount = int(cmd[2])
interval = int(cmd[3])
self.__update_configuration('autolimit_amount', amount)
self.__update_configuration('autolimit_interval', interval)
self.update_configuration('autolimit_amount', amount)
self.update_configuration('autolimit_interval', interval)
self.Protocol.send_priv_msg(
nick_from=dnickname,
msg=f"[{self.Config.COLORS.green}AUTOLIMIT{self.Config.COLORS.nogc}] Amount set to ({amount}) | Interval set to ({interval})",
@@ -489,7 +400,7 @@ class Defender:
return False
# self.update_db_configuration('reputation', 1)
self.__update_configuration(key, 1)
self.update_configuration(key, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}REPUTATION{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
@@ -515,7 +426,7 @@ class Defender:
)
return False
self.__update_configuration(key, 0)
self.update_configuration(key, 0)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -604,7 +515,7 @@ class Defender:
return False
# self.update_db_configuration(key, 1)
self.__update_configuration(key, 1)
self.update_configuration(key, 1)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -622,7 +533,7 @@ class Defender:
return False
# self.update_db_configuration(key, 0)
self.__update_configuration(key, 0)
self.update_configuration(key, 0)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -635,7 +546,7 @@ class Defender:
key = 'reputation_seuil'
# self.update_db_configuration(key, reputation_seuil)
self.__update_configuration(key, reputation_seuil)
self.update_configuration(key, reputation_seuil)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -647,7 +558,7 @@ class Defender:
case 'timer':
reputation_timer = int(cmd[3])
key = 'reputation_timer'
self.__update_configuration(key, reputation_timer)
self.update_configuration(key, reputation_timer)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -659,7 +570,7 @@ class Defender:
case 'score_after_release':
reputation_score_after_release = int(cmd[3])
key = 'reputation_score_after_release'
self.__update_configuration(key, reputation_score_after_release)
self.update_configuration(key, reputation_score_after_release)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -671,7 +582,7 @@ class Defender:
case 'security_group':
reputation_sg = int(cmd[3])
key = 'reputation_sg'
self.__update_configuration(key, reputation_sg)
self.update_configuration(key, reputation_sg)
self.Protocol.send_priv_msg(
nick_from=dnickname,
@@ -733,7 +644,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -741,7 +652,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -751,7 +662,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -759,7 +670,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -769,7 +680,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -777,7 +688,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -787,7 +698,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -795,7 +706,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -805,7 +716,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Already activated", channel=dchanlog)
return None
self.__update_configuration(option, 1)
self.update_configuration(option, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_green}PROXY_SCAN {option.upper()}{color_black} ] : Activated by {fromuser}", channel=dchanlog)
elif action == 'off':
@@ -813,7 +724,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Already Deactivated", channel=dchanlog)
return None
self.__update_configuration(option, 0)
self.update_configuration(option, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {color_red}PROXY_SCAN {option.upper()}{color_black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -846,7 +757,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Already activated", channel=dchanlog)
return False
self.__update_configuration(key, 1)
self.update_configuration(key, 1)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Activated by {fromuser}", channel=dchanlog)
@@ -855,7 +766,7 @@ class Defender:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.red}FLOOD{self.Config.COLORS.black} ] : Already Deactivated", channel=dchanlog)
return False
self.__update_configuration(key, 0)
self.update_configuration(key, 0)
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Deactivated by {fromuser}", channel=dchanlog)
@@ -867,7 +778,7 @@ class Defender:
case 'flood_message':
key = 'flood_message'
set_value = int(cmd[3])
self.__update_configuration(key, set_value)
self.update_configuration(key, set_value)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood message set to {set_value} by {fromuser}",
@@ -876,7 +787,7 @@ class Defender:
case 'flood_time':
key = 'flood_time'
set_value = int(cmd[3])
self.__update_configuration(key, set_value)
self.update_configuration(key, set_value)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood time set to {set_value} by {fromuser}",
@@ -885,7 +796,7 @@ class Defender:
case 'flood_timer':
key = 'flood_timer'
set_value = int(cmd[3])
self.__update_configuration(key, set_value)
self.update_configuration(key, set_value)
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[ {self.Config.COLORS.green}FLOOD{self.Config.COLORS.black} ] : Flood timer set to {set_value} by {fromuser}",
@@ -922,6 +833,7 @@ class Defender:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_message ==> {self.ModConfig.flood_message}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_time ==> {self.ModConfig.flood_time}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' flood_timer ==> {self.ModConfig.flood_timer}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f' [{color_green if self.ModConfig.flood == 1 else color_red}Sentinel{nogc}] ==> {self.ModConfig.sentinel}')
except KeyError as ke:
self.Logs.error(f"Key Error : {ke}")
@@ -963,14 +875,27 @@ class Defender:
channel_to_dont_quit = [self.Config.SALON_JAIL, self.Config.SERVICE_CHANLOG]
if activation == 'on':
result = self.Base.db_execute_query(f"SELECT distinct channel_name FROM {self.Config.TABLE_CHANNEL}")
channels = result.fetchall()
channel_in_db = [channel[0] for channel in channels]
channel_to_dont_quit.extend(channel_in_db)
self.update_configuration('sentinel', 1)
for chan in self.Channel.UID_CHANNEL_DB:
if chan.name not in channel_to_dont_quit:
self.Protocol.send_join_chan(uidornickname=dnickname, channel=chan.name)
self.Protocol.send_priv_msg(dnickname, f"Sentinel mode activated on {channel}", channel=chan.name)
return None
if activation == 'off':
result = self.Base.db_execute_query(f"SELECT distinct channel_name FROM {self.Config.TABLE_CHANNEL}")
channels = result.fetchall()
channel_in_db = [channel[0] for channel in channels]
channel_to_dont_quit.extend(channel_in_db)
self.update_configuration('sentinel', 0)
for chan in self.Channel.UID_CHANNEL_DB:
if chan.name not in channel_to_dont_quit:
self.Protocol.send_part_chan(uidornickname=dnickname, channel=chan.name)
self.Protocol.send_priv_msg(dnickname, f"Sentinel mode deactivated on {channel}", channel=chan.name)
self.join_saved_channels()
return None

View File

@@ -20,6 +20,7 @@ class ModConfModel(MainModel):
autolimit: int = 0
autolimit_amount: int = 3
autolimit_interval: int = 3
sentinel: int = 0
@dataclass
class FloodUser(MainModel):

View File

@@ -62,6 +62,11 @@ def handle_on_mode(uplink: 'Defender', srvmsg: list[str]):
def handle_on_privmsg(uplink: 'Defender', srvmsg: list[str]):
# ['@mtag....',':python', 'PRIVMSG', '#defender', ':zefzefzregreg', 'regg', 'aerg']
sender, reciever, channel, message = uplink.Protocol.parse_privmsg(srvmsg)
if uplink.ModConfig.sentinel == 1 and channel.name != uplink.Config.SERVICE_CHANLOG:
uplink.Protocol.send_priv_msg(uplink.Config.SERVICE_NICKNAME, f"{sender.nickname} say on {channel.name}: {' '.join(message)}", uplink.Config.SERVICE_CHANLOG)
action_on_flood(uplink, srvmsg)
return None
@@ -147,8 +152,13 @@ def handle_on_nick(uplink: 'Defender', srvmsg: list[str]):
confmodel (ModConfModel): The Module Configuration
"""
p = uplink.Protocol
parser = p.parse_nick(srvmsg)
uid = uplink.Loader.Utils.clean_uid(parser.get('uid', None))
u, new_nickname, timestamp = p.parse_nick(srvmsg)
if u is None:
uplink.Logs.error(f"[USER OBJ ERROR {timestamp}] - {srvmsg}")
return None
uid = u.uid
confmodel = uplink.ModConfig
get_reputation = uplink.Reputation.get_reputation(uid)
@@ -161,7 +171,7 @@ def handle_on_nick(uplink: 'Defender', srvmsg: list[str]):
# Update the new nickname
oldnick = get_reputation.nickname
newnickname = parser.get('newnickname', None)
newnickname = new_nickname
get_reputation.nickname = newnickname
# If ban in all channel is ON then unban old nickname an ban the new nickname
@@ -179,14 +189,17 @@ def handle_on_quit(uplink: 'Defender', srvmsg: list[str]):
srvmsg (list[str]): The Server MSG
"""
p = uplink.Protocol
parser = p.parse_quit(srvmsg)
userobj, reason = p.parse_quit(srvmsg)
confmodel = uplink.ModConfig
if userobj is None:
uplink.Logs.debug(f"This UID do not exist anymore: {srvmsg}")
return None
ban_all_chan = uplink.Base.int_if_possible(confmodel.reputation_ban_all_chan)
final_UID = uplink.Loader.Utils.clean_uid(str(parser.get('uid', None)))
jail_salon = uplink.Config.SALON_JAIL
service_id = uplink.Config.SERVICE_ID
get_user_reputation = uplink.Reputation.get_reputation(final_UID)
get_user_reputation = uplink.Reputation.get_reputation(userobj.uid)
if get_user_reputation is not None:
final_nickname = get_user_reputation.nickname
@@ -195,7 +208,7 @@ def handle_on_quit(uplink: 'Defender', srvmsg: list[str]):
p.send2socket(f":{service_id} MODE {chan.name} -b {final_nickname}!*@*")
uplink.Logs.debug(f"Mode -b {final_nickname} on channel {chan.name}")
uplink.Reputation.delete(final_UID)
uplink.Reputation.delete(userobj.uid)
uplink.Logs.debug(f"Client {get_user_reputation.nickname} has been removed from Reputation local DB")
def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
@@ -207,7 +220,7 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
uplink (Defender): The Defender instance
srvmsg (list[str]): The Server MSG
"""
parser_uid = uplink.Protocol.parse_uid(srvmsg)
_User = uplink.Protocol.parse_uid(srvmsg)
gconfig = uplink.Config
irc = uplink.Irc
confmodel = uplink.ModConfig
@@ -217,10 +230,8 @@ def handle_on_uid(uplink: 'Defender', srvmsg: list[str]):
return None
# Get User information
_User = irc.User.get_user(parser_uid.get('uid', None))
if _User is None:
irc.Logs.warning(f'This UID: [{parser_uid.get("uid", None)}] is not available please check why')
irc.Logs.warning(f'Error when parsing UID', exc_info=True)
return
# If user is not service or IrcOp then scan them
@@ -385,7 +396,7 @@ def action_apply_reputation_santions(uplink: 'Defender') -> None:
salon_jail = gconfig.SALON_JAIL
uid_to_clean = []
if reputation_flag == 0 or reputation_timer == 0:
if reputation_flag == 0 or reputation_timer == 0 or not irc.Reputation.UID_REPUTATION_DB:
return None
for user in irc.Reputation.UID_REPUTATION_DB:

View File

@@ -1,18 +1,13 @@
import logging
import asyncio
from unrealircd_rpc_py.objects.Definition import LiveRPCResult
from core.classes.interfaces.imodule import IModule
import mods.jsonrpc.utils as utils
import mods.jsonrpc.threads as thds
from time import sleep
from typing import TYPE_CHECKING
from dataclasses import dataclass
from unrealircd_rpc_py.ConnectionFactory import ConnectionFactory
from unrealircd_rpc_py.LiveConnectionFactory import LiveConnectionFactory
if TYPE_CHECKING:
from core.irc import Irc
class Jsonrpc():
class Jsonrpc(IModule):
@dataclass
class ModConfModel:
@@ -20,120 +15,13 @@ class Jsonrpc():
"""
jsonrpc: int = 0
def __init__(self, ircInstance: 'Irc') -> None:
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module (Mandatory)
self.Irc = ircInstance
# Add Protocol to the module (Mandatory)
self.Protocol = ircInstance.Protocol
# Add Global Configuration to the module (Mandatory)
self.Config = ircInstance.Config
# Add Base object to the module (Mandatory)
self.Base = ircInstance.Base
# Add Main Utils (Mandatory)
self.MainUtils = ircInstance.Utils
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User
# Add Channel object to the module (Mandatory)
self.Channel = ircInstance.Channel
# Is RPC Active?
self.is_streaming = False
# Module Utils
self.Utils = utils
# Module threads
self.Threads = thds
# Run Garbage collector.
self.Base.create_timer(10, self.MainUtils.run_python_garbage_collector)
# Create module commands (Mandatory)
self.Irc.build_command(1, self.module_name, 'jsonrpc', 'Activate the JSON RPC Live connection [ON|OFF]')
self.Irc.build_command(1, self.module_name, 'jruser', 'Get Information about a user using JSON RPC')
self.Irc.build_command(1, self.module_name, 'jrinstances', 'Get number of instances')
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.module_name} loaded ...')
def __init_module(self) -> None:
logging.getLogger('websockets').setLevel(logging.WARNING)
logging.getLogger('unrealircd-rpc-py').setLevel(logging.CRITICAL)
logging.getLogger('unrealircd-liverpc-py').setLevel(logging.CRITICAL)
# Create you own tables (Mandatory)
# self.__create_tables()
# Load module configuration and sync with core one (Mandatory)
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
try:
self.Rpc = ConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
self.LiveRpc = LiveConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
sync_unixsocket = {'path_to_socket_file': self.Config.JSONRPC_PATH_TO_SOCKET_FILE}
sync_http = {'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD}
live_unixsocket = {'path_to_socket_file': self.Config.JSONRPC_PATH_TO_SOCKET_FILE,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'}
live_http = {'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'}
sync_param = sync_unixsocket if self.Config.JSONRPC_METHOD == 'unixsocket' else sync_http
live_param = live_unixsocket if self.Config.JSONRPC_METHOD == 'unixsocket' else live_http
self.Rpc.setup(sync_param)
self.LiveRpc.setup(live_param)
if self.ModConfig.jsonrpc == 1:
self.Base.create_thread(func=self.Threads.thread_subscribe, func_args=(self, ), run_once=True)
return None
except Exception as err:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}JSONRPC ERROR{self.Config.COLORS.nogc}] {err.__str__()}",
channel=self.Config.SERVICE_CHANLOG
)
self.Logs.error(f"JSONRPC ERROR: {err.__str__()}")
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
Returns:
None: Aucun retour n'es attendu
"""
table_logs = '''CREATE TABLE IF NOT EXISTS test_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
self.Base.db_execute_query(table_logs)
return None
MOD_HEADER: dict[str, str] = {
'name':'JsonRPC',
'version':'1.0.0',
'description':'Module using the unrealircd-rpc-py library',
'author':'Defender Team',
'core_version':'Defender-6'
}
def callback_sent_to_irc(self, response: LiveRPCResult) -> None:
@@ -169,31 +57,74 @@ class Jsonrpc():
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
try:
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel(jsonrpc=0)
def create_tables(self) -> None:
return None
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
def load(self) -> None:
logging.getLogger('websockets').setLevel(logging.WARNING)
logging.getLogger('unrealircd-rpc-py').setLevel(logging.CRITICAL)
logging.getLogger('unrealircd-liverpc-py').setLevel(logging.CRITICAL)
self.ModConfig = self.ModConfModel(jsonrpc=0)
if self.Config.SERVEUR_PROTOCOL != 'unreal6':
self.Loader.ModuleUtils.unload_one_module(self.Irc, self.module_name, False)
return None
except TypeError as te:
self.Logs.critical(te)
# Is RPC Active?
self.is_streaming = False
# Module Utils
self.Utils = utils
def update_configuration(self, param_key: str, param_value: str) -> None:
"""Update the local and core configuration
# Module threads
self.Threads = thds
Args:
param_key (str): The parameter key
param_value (str): The parameter value
"""
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
# Run Garbage collector.
self.Base.create_timer(10, self.MainUtils.run_python_garbage_collector)
# Create module commands (Mandatory)
self.Irc.build_command(1, self.module_name, 'jsonrpc', 'Activate the JSON RPC Live connection [ON|OFF]')
self.Irc.build_command(1, self.module_name, 'jruser', 'Get Information about a user using JSON RPC')
self.Irc.build_command(1, self.module_name, 'jrinstances', 'Get number of instances')
try:
self.Rpc = ConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
self.LiveRpc = LiveConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
sync_unixsocket = {'path_to_socket_file': self.Config.JSONRPC_PATH_TO_SOCKET_FILE}
sync_http = {'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD}
live_unixsocket = {'path_to_socket_file': self.Config.JSONRPC_PATH_TO_SOCKET_FILE,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'}
live_http = {'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'}
sync_param = sync_unixsocket if self.Config.JSONRPC_METHOD == 'unixsocket' else sync_http
live_param = live_unixsocket if self.Config.JSONRPC_METHOD == 'unixsocket' else live_http
self.Rpc.setup(sync_param)
self.LiveRpc.setup(live_param)
if self.ModConfig.jsonrpc == 1:
self.Base.create_thread(func=self.Threads.thread_subscribe, func_args=(self, ), run_once=True)
return None
except Exception as err:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}JSONRPC ERROR{self.Config.COLORS.nogc}] {err.__str__()}",
channel=self.Config.SERVICE_CHANLOG
)
self.Logs.error(f"JSONRPC ERROR: {err.__str__()}")
def unload(self) -> None:
if self.Config.SERVEUR_PROTOCOL != 'unreal6':
self.Loader.ModuleUtils.unload_one_module(self.Irc, self.module_name, False)
return None
if self.is_streaming:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,

View File

@@ -1,75 +1,28 @@
from typing import TYPE_CHECKING
from dataclasses import dataclass, fields
from typing import Any
from core.classes.interfaces.imodule import IModule
from dataclasses import dataclass
if TYPE_CHECKING:
from core.irc import Irc
class Test():
class Test(IModule):
@dataclass
class ModConfModel:
"""The Model containing the module parameters
"""The Model containing the module parameters (Mandatory)
you can leave it without params.
just use pass | if you leave it empty, in the load() method just init empty object ==> self.ModConfig = ModConfModel()
"""
param_exemple1: str
param_exemple2: int
def __init__(self, ircInstance: 'Irc') -> None:
MOD_HEADER: dict[str, str] = {
'name':'Test',
'version':'1.0.0',
'description':'The test module',
'author':'Defender Team',
'core_version':'Defender-6'
}
"""Module Header (Mandatory)"""
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module (Mandatory)
self.Irc = ircInstance
# Add Loader Object to the module (Mandatory)
self.Loader = ircInstance.Loader
# Add server protocol Object to the module (Mandatory)
self.Protocol = ircInstance.Protocol
# Add Global Configuration to the module (Mandatory)
self.Config = ircInstance.Config
# Add Base object to the module (Mandatory)
self.Base = ircInstance.Base
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Loader.Logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User
# Add Channel object to the module (Mandatory)
self.Channel = ircInstance.Channel
# Add Reputation object to the module (Optional)
self.Reputation = ircInstance.Reputation
# Create module commands (Mandatory)
self.Irc.build_command(0, self.module_name, 'test-command', 'Execute a test command')
self.Irc.build_command(1, self.module_name, 'test_level_1', 'Execute a level 1 test command')
self.Irc.build_command(2, self.module_name, 'test_level_2', 'Execute a level 2 test command')
self.Irc.build_command(3, self.module_name, 'test_level_3', 'Execute a level 3 test command')
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Create you own tables (Mandatory)
self.__create_tables()
# Load module configuration and sync with core one (Mandatory)
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
return None
def __create_tables(self) -> None:
def create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
@@ -86,69 +39,69 @@ class Test():
)
'''
self.Base.db_execute_query(table_logs)
# self.Base.db_execute_query(table_logs)
return None
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
def load(self) -> None:
"""### Load Module Configuration (Mandatory)
"""
try:
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel(param_exemple1='param value 1', param_exemple2=1)
# Sync the configuration with core configuration (Mandatory)
self.Base.db_sync_core_config(self.module_name, self.ModConfig)
# Create module commands (Mandatory)
self.Irc.build_command(0, self.module_name, 'test-command', 'Execute a test command')
self.Irc.build_command(1, self.module_name, 'test_level_1', 'Execute a level 1 test command')
self.Irc.build_command(2, self.module_name, 'test_level_2', 'Execute a level 2 test command')
self.Irc.build_command(3, self.module_name, 'test_level_3', 'Execute a level 3 test command')
return None
except TypeError as te:
self.Logs.critical(te)
def __update_configuration(self, param_key: str, param_value: str):
"""Update the local and core configuration
Args:
param_key (str): The parameter key
param_value (str): The parameter value
"""
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel(param_exemple1='str', param_exemple2=1)
def unload(self) -> None:
"""### This method is called when you unload or you reload the module (Mandatory)"""
self.Irc.Commands.drop_command_by_module(self.module_name)
return None
def cmd(self, data:list) -> None:
try:
cmd = list(data).copy()
def cmd(self, data: list[str]) -> None:
"""All messages coming from the IRCD server will be handled using this method (Mandatory)
Args:
data (list): Messages coming from the IRCD server.
"""
cmd = list(data).copy()
try:
return None
except KeyError as ke:
self.Logs.error(f"Key Error: {ke}")
except IndexError as ie:
self.Logs.error(f"{ie} / {cmd} / length {str(len(cmd))}")
except Exception as err:
self.Logs.error(f"General Error: {err}")
def hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
def hcmds(self, user: str, channel: Any, cmd: list, fullcmd: list = []) -> None:
"""All messages coming from the user commands (Mandatory)
Args:
user (str): The user who send the request.
channel (Any): The channel from where is coming the message (could be None).
cmd (list): The messages coming from the IRCD server.
fullcmd (list, optional): The full messages coming from the IRCD server. Defaults to [].
"""
u = self.User.get_user(user)
c = self.Channel.get_channel(channel) if self.Channel.is_valid_channel(channel) else None
if u is None:
return None
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
fromuser = user
fromchannel = str(channel) if not channel is None else None
match command:
case 'test-command':
try:
self.Protocol.send_notice(nick_from=dnickname, nick_to=u.nickname, msg="This is a notice to the sender ...")
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"This is private message to the sender ...", nick_to=u.nickname)
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="This is a notice to the sender ...")
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"This is private message to the sender ...", nick_to=fromuser)
if not fromchannel is None:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"This is private message to the sender ...", channel=fromchannel)
if c is not None:
self.Protocol.send_priv_msg(nick_from=dnickname, msg=f"This is private message to the sender ...", channel=c.name)
# How to update your module configuration
self.__update_configuration('param_exemple2', 7)
self.update_configuration('param_exemple2', 7)
self.update_configuration('param_exemple1', 'my_value')
# Log if you want the result
self.Logs.debug(f"Test logs ready")

View File

@@ -1,95 +1,37 @@
"""
File : mod_votekick.py
Version : 1.0.0
Version : 1.0.2
Description : Manages votekick sessions for multiple channels.
Handles activation, ongoing vote checks, and cleanup.
Author : adator
Created : 2025-08-16
Last Updated: 2025-08-16
Last Updated: 2025-11-01
-----------------------------------------
"""
from dataclasses import dataclass
import re
from core.classes.interfaces.imodule import IModule
import mods.votekick.schemas as schemas
import mods.votekick.utils as utils
from mods.votekick.votekick_manager import VotekickManager
import mods.votekick.threads as thds
from typing import TYPE_CHECKING, Any, Optional
from typing import Any, Optional
if TYPE_CHECKING:
from core.irc import Irc
class Votekick(IModule):
@dataclass
class ModConfModel(schemas.VoteChannelModel):
...
class Votekick:
MOD_HEADER: dict[str, str] = {
'name':'votekick',
'version':'1.0.2',
'description':'Channel Democraty',
'author':'Defender Team',
'core_version':'Defender-6'
}
def __init__(self, uplink: 'Irc') -> None:
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module
self.Irc = uplink
# Add Loader Object to the module (Mandatory)
self.Loader = uplink.Loader
# Add server protocol Object to the module (Mandatory)
self.Protocol = uplink.Protocol
# Add Global Configuration to the module
self.Config = uplink.Config
# Add Base object to the module
self.Base = uplink.Base
# Add logs object to the module
self.Logs = uplink.Logs
# Add User object to the module
self.User = uplink.User
# Add Channel object to the module
self.Channel = uplink.Channel
# Add Utils.
self.Utils = uplink.Utils
# Add Utils module
self.ModUtils = utils
# Add Schemas module
self.Schemas = schemas
# Add Threads module
self.Threads = thds
# Add VoteKick Manager
self.VoteKickManager = VotekickManager(self)
metadata = uplink.Loader.Settings.get_cache('VOTEKICK')
if metadata is not None:
self.VoteKickManager.VOTE_CHANNEL_DB = metadata
# self.VOTE_CHANNEL_DB = metadata
# Créer les nouvelles commandes du module
self.Irc.build_command(1, self.module_name, 'vote', 'The kick vote module')
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'-- Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Add admin object to retrieve admin users
self.Admin = self.Irc.Admin
self.__create_tables()
self.ModUtils.join_saved_channels(self)
return None
def __create_tables(self) -> None:
def create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
@@ -115,10 +57,37 @@ class Votekick:
self.Base.db_execute_query(table_vote)
return None
def load(self) -> None:
self.ModConfig = self.ModConfModel()
# Add VoteKick Manager
self.VoteKickManager = VotekickManager(self)
# Add Utils module
self.ModUtils = utils
# Add Schemas module
self.Schemas = schemas
# Add Threads module
self.Threads = thds
self.ModUtils.join_saved_channels(self)
metadata = self.Settings.get_cache('VOTEKICK')
if metadata is not None:
self.VoteKickManager.VOTE_CHANNEL_DB = metadata
# Créer les nouvelles commandes du module
self.Irc.build_command(1, self.module_name, 'vote', 'The kick vote module')
def unload(self) -> None:
try:
# Cache the local DB with current votes.
self.Loader.Settings.set_cache('VOTEKICK', self.VoteKickManager.VOTE_CHANNEL_DB)
if self.VoteKickManager.VOTE_CHANNEL_DB:
self.Settings.set_cache('VOTEKICK', self.VoteKickManager.VOTE_CHANNEL_DB)
for chan in self.VoteKickManager.VOTE_CHANNEL_DB:
self.Protocol.send_part_chan(uidornickname=self.Config.SERVICE_ID, channel=chan.channel_name)

View File

@@ -11,7 +11,7 @@ class VotekickManager:
def __init__(self, uplink: 'Votekick'):
self.uplink = uplink
self.Logs = uplink.Logs
self.Utils = uplink.Utils
self.Utils = uplink.MainUtils
def activate_new_channel(self, channel_name: str) -> bool:
"""Activate a new channel in the votekick systeme

View File

@@ -1,5 +1,5 @@
{
"version": "6.3.2",
"version": "6.3.3",
"requests": "2.32.5",
"psutil": "7.1.2",