24 Commits

Author SHA1 Message Date
adator
2f8b965b59 Merge pull request #36 from adator85/dev
Dev
2024-09-15 02:04:32 +02:00
adator
c7047ec3d6 V5.2.0 2024-09-15 02:03:38 +02:00
adator
eddba81cf0 remove mod_jsonrpc.py 2024-09-14 01:32:16 +02:00
adator
59e634951f V5.1.9 2024-09-14 01:25:13 +02:00
adator
3c043cefd8 Merge pull request #35 from adator85/dev
V5.1.8
2024-09-08 00:42:57 +02:00
adator
59a75cecd8 Merge pull request #34 from adator85/dev
V5.1.7
2024-09-03 00:21:32 +02:00
adator
71053437a7 Merge pull request #33 from adator85/dev
V5.1.6
2024-09-01 22:15:54 +02:00
adator
7796d05206 Merge pull request #32 from adator85/dev
Update vote kick commands
2024-09-01 18:55:57 +02:00
adator
5f2567f9e5 Merge pull request #31 from adator85/dev
mod_command update
2024-09-01 17:30:05 +02:00
adator
aaa1dd9a1a Merge pull request #30 from adator85/dev
adding Say command for clones
2024-09-01 16:40:25 +02:00
adator
a02f2f9a26 Merge pull request #29 from adator85/dev
update mod_clone module
2024-09-01 15:54:50 +02:00
adator
d73adb6f0b Merge pull request #28 from adator85/dev
update readme
2024-09-01 15:35:59 +02:00
adator
b812e64992 Merge pull request #27 from adator85/dev
Dev
2024-09-01 15:24:18 +02:00
adator
9bd1f68df2 Merge pull request #26 from adator85/dev
Dev
2024-09-01 14:59:38 +02:00
adator
f44b08bf36 Merge pull request #25 from adator85/dev
fix Installation
2024-08-29 01:36:38 +02:00
adator
1a19e1613a Merge pull request #24 from adator85/dev
Fix Bug installation
2024-08-29 01:31:19 +02:00
adator
cdc15b7b47 Merge pull request #23 from adator85/dev
Dev
2024-08-29 01:16:55 +02:00
adator
31fe9f62ec Merge pull request #22 from adator85/dev
Dev
2024-08-24 01:39:11 +02:00
adator
f0853e3afb Merge pull request #21 from adator85/dev
New Installation file created for unix system
2024-08-22 01:02:00 +02:00
adator
6dade09257 Merge pull request #20 from adator85/dev
README Update
2024-08-21 00:50:31 +02:00
adator
9533b010b2 Merge pull request #19 from adator85/dev
V5.0.4 - Delete a user when a user has been kicked
2024-08-20 02:24:37 +02:00
adator
824db73590 Merge pull request #18 from adator85/dev
Delete channel mode information
2024-08-20 02:14:31 +02:00
adator
96bf4b6f80 Merge pull request #17 from adator85/dev
Fix channel update
2024-08-20 02:08:09 +02:00
adator
922336363e Merge pull request #16 from adator85/dev
Dev
2024-08-20 01:56:04 +02:00
9 changed files with 133 additions and 264 deletions

6
.gitignore vendored
View File

@@ -2,7 +2,7 @@
db/
logs/
__pycache__/
mods/mod_jsonrpc.py
configuration.json
install.log
test.py
# mods/mod_ia.py
*.log
test.py

View File

@@ -218,7 +218,7 @@ class Base:
"""Supprime les modules de la base de données
Args:
cmd (str): le module a enregistrer
cmd (str): le module a supprimer
"""
insert_cmd_query = f"DELETE FROM {self.Config.table_module} WHERE module_name = :module_name"
mes_donnees = {'module_name': module_name}

View File

@@ -36,8 +36,6 @@ class Install:
if self.skip_install:
return None
print(f'Configuration loaded : {self.config}')
# Sinon tester les dependances python et les installer avec pip
if self.do_install():

View File

@@ -453,6 +453,7 @@ class Irc:
except ModuleNotFoundError as moduleNotFound:
self.Base.logs.error(f"MODULE_NOT_FOUND: {moduleNotFound}")
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :[ {self.Config.CONFIG_COLOR['rouge']}MODULE_NOT_FOUND{self.Config.CONFIG_COLOR['noire']} ]: {moduleNotFound}")
self.Base.db_delete_module(module_name)
except Exception as e:
self.Base.logs.error(f"Something went wrong with a module you want to load : {e}")
self.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :[ {self.Config.CONFIG_COLOR['rouge']}ERROR{self.Config.CONFIG_COLOR['noire']} ]: {e}")

View File

@@ -4,6 +4,7 @@ from typing import Union
import re, socket, psutil, requests, json, time
from sys import exit
from core.irc import Irc
from core.Model import User
# Le module crée devra réspecter quelques conditions
# 1. Le nom de la classe devra toujours s'appeler comme le module. Exemple => nom de class Defender | nom du module mod_defender
@@ -107,12 +108,19 @@ class Defender():
# self.join_saved_channels()
self.timeout = self.Config.API_TIMEOUT
self.abuseipdb_UserModel: list[User.UserModel] = []
self.freeipapi_UserModel: list[User.UserModel] = []
self.cloudfilt_UserModel: list[User.UserModel] = []
self.psutil_UserModel: list[User.UserModel] = []
self.localscan_UserModel: list[User.UserModel] = []
# Listes qui vont contenir les ip a scanner avec les différentes API
self.freeipapi_remote_ip:list = []
self.cloudfilt_remote_ip:list = []
self.abuseipdb_remote_ip:list = []
self.psutil_remote_ip:list = []
self.localscan_remote_ip:list = []
# self.freeipapi_remote_ip:list = []
# self.cloudfilt_remote_ip:list = []
# self.abuseipdb_remote_ip:list = []
# self.psutil_remote_ip:list = []
# self.localscan_remote_ip:list = []
# Variables qui indique que les threads sont en cours d'éxecutions
self.abuseipdb_isRunning:bool = True
@@ -605,23 +613,33 @@ class Defender():
return None
def scan_ports(self, remote_ip: str) -> None:
def scan_ports(self, userModel: User.UserModel) -> None:
"""local_scan
Args:
remote_ip (str): _description_
"""
User = userModel
remote_ip = User.remote_ip
username = User.username
hostname = User.hostname
nickname = User.nickname
if remote_ip in self.Config.WHITELISTED_IP:
return None
for port in self.Config.PORTS_TO_SCAN:
newSocket = ''
newSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
newSocket.settimeout(0.5)
try:
newSocket = ''
newSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM or socket.SOCK_NONBLOCK)
newSocket.settimeout(0.5)
connection = (remote_ip, self.Base.int_if_possible(port))
newSocket.connect(connection)
self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :[ {self.Config.CONFIG_COLOR['rouge']}PROXY_SCAN{self.Config.CONFIG_COLOR['noire']} ] : Port [{str(port)}] ouvert sur l'adresse ip [{remote_ip}]")
fullname = f'{nickname}!{username}@{hostname}'
self.Irc.send2socket(f":{self.Config.SERVICE_NICKNAME} PRIVMSG {self.Config.SERVICE_CHANLOG} :[ {self.Config.CONFIG_COLOR['rouge']}PROXY_SCAN{self.Config.CONFIG_COLOR['noire']} ] {fullname} ({remote_ip}) : Port [{str(port)}] ouvert sur l'adresse ip [{remote_ip}]")
# print(f"=======> Le port {str(port)} est ouvert !!")
self.Base.running_sockets.append(newSocket)
# print(newSocket)
@@ -637,21 +655,19 @@ class Defender():
# newSocket.shutdown(socket.SHUT_RDWR)
newSocket.close()
self.Logs.info('=======> Fermeture de la socket')
pass
def thread_local_scan(self) -> None:
try:
while self.localscan_isRunning:
list_to_remove:list = []
for ip in self.localscan_remote_ip:
self.scan_ports(ip)
list_to_remove.append(ip)
for user in self.localscan_UserModel:
self.scan_ports(user)
list_to_remove.append(user)
time.sleep(1)
for ip_to_remove in list_to_remove:
self.localscan_remote_ip.remove(ip_to_remove)
for user_model in list_to_remove:
self.localscan_UserModel.remove(user_model)
time.sleep(1)
@@ -659,7 +675,7 @@ class Defender():
except ValueError as ve:
self.Logs.warning(f"thread_local_scan Error : {ve}")
def get_ports_connexion(self, remote_ip: str) -> list[int]:
def get_ports_connexion(self, userModel: User.UserModel) -> list[int]:
"""psutil_scan for Linux
Args:
@@ -669,13 +685,20 @@ class Defender():
list[int]: list of ports
"""
try:
User = userModel
remote_ip = User.remote_ip
username = User.username
hostname = User.hostname
nickname = User.nickname
if remote_ip in self.Config.WHITELISTED_IP:
return None
connections = psutil.net_connections(kind='inet')
fullname = f'{nickname}!{username}@{hostname}'
matching_ports = [conn.raddr.port for conn in connections if conn.raddr and conn.raddr.ip == remote_ip]
self.Logs.info(f"Connexion of {remote_ip} using ports : {str(matching_ports)}")
self.Logs.info(f"Connexion of {fullname} ({remote_ip}) using ports : {str(matching_ports)}")
return matching_ports
@@ -688,13 +711,13 @@ class Defender():
while self.psutil_isRunning:
list_to_remove:list = []
for ip in self.psutil_remote_ip:
self.get_ports_connexion(ip)
list_to_remove.append(ip)
for user in self.psutil_UserModel:
self.get_ports_connexion(user)
list_to_remove.append(user)
time.sleep(1)
for ip_to_remove in list_to_remove:
self.psutil_remote_ip.remove(ip_to_remove)
for user_model in list_to_remove:
self.psutil_UserModel.remove(user_model)
time.sleep(1)
@@ -702,16 +725,22 @@ class Defender():
except ValueError as ve:
self.Logs.warning(f"thread_psutil_scan Error : {ve}")
def abuseipdb_scan(self, remote_ip:str) -> Union[dict[str, any], None]:
def abuseipdb_scan(self, userModel: User.UserModel) -> Union[dict[str, any], None]:
"""Analyse l'ip avec AbuseIpDB
Cette methode devra etre lancer toujours via un thread ou un timer.
Args:
remote_ip (_type_): l'ip a analyser
userModel (UserModel): l'objet User qui contient l'ip
Returns:
dict[str, any] | None: les informations du provider
keys : 'score', 'country', 'isTor', 'totalReports'
"""
User = userModel
remote_ip = User.remote_ip
username = User.username
hostname = User.hostname
nickname = User.nickname
if remote_ip in self.Config.WHITELISTED_IP:
return None
if self.ModConfig.abuseipdb_scan == 0:
@@ -731,11 +760,12 @@ class Defender():
'Key': self.abuseipdb_key
}
response = requests.request(method='GET', url=url, headers=headers, params=querystring, timeout=self.timeout)
# Formatted output
decodedResponse = json.loads(response.text)
try:
response = requests.request(method='GET', url=url, headers=headers, params=querystring, timeout=self.timeout)
# Formatted output
decodedResponse = json.loads(response.text)
if not 'data' in decodedResponse:
return None
@@ -751,7 +781,10 @@ class Defender():
color_red = self.Config.CONFIG_COLOR['rouge']
color_black = self.Config.CONFIG_COLOR['noire']
self.Irc.send2socket(f":{service_id} PRIVMSG {service_chanlog} :[ {color_red}ABUSEIPDB_SCAN{color_black} ] : Connexion de {remote_ip} ==> Score: {str(result['score'])} | Country : {result['country']} | Tor : {str(result['isTor'])} | Total Reports : {str(result['totalReports'])}")
# pseudo!ident@host
fullname = f'{nickname}!{username}@{hostname}'
self.Irc.send2socket(f":{service_id} PRIVMSG {service_chanlog} :[ {color_red}ABUSEIPDB_SCAN{color_black} ] : Connexion de {fullname} ({remote_ip}) ==> Score: {str(result['score'])} | Country : {result['country']} | Tor : {str(result['isTor'])} | Total Reports : {str(result['totalReports'])}")
if result['isTor']:
self.Irc.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} This server do not allow Tor connexions {str(result['isTor'])} - Detected by Abuseipdb")
@@ -767,20 +800,22 @@ class Defender():
self.Logs.error(f"AbuseIpDb Timeout : {rt}")
except requests.ConnectionError as ce:
self.Logs.error(f"AbuseIpDb Connection Error : {ce}")
except Exception as err:
self.Logs.error(f"General Error Abuseipdb : {err}")
def thread_abuseipdb_scan(self) -> None:
try:
while self.abuseipdb_isRunning:
list_to_remove:list = []
for ip in self.abuseipdb_remote_ip:
self.abuseipdb_scan(ip)
list_to_remove.append(ip)
list_to_remove: list = []
for user in self.abuseipdb_UserModel:
self.abuseipdb_scan(user)
list_to_remove.append(user)
time.sleep(1)
for ip_to_remove in list_to_remove:
self.abuseipdb_remote_ip.remove(ip_to_remove)
for user_model in list_to_remove:
self.abuseipdb_UserModel.remove(user_model)
time.sleep(1)
@@ -788,7 +823,7 @@ class Defender():
except ValueError as ve:
self.Logs.error(f"thread_abuseipdb_scan Error : {ve}")
def freeipapi_scan(self, remote_ip:str) -> Union[dict[str, any], None]:
def freeipapi_scan(self, userModel: User.UserModel) -> Union[dict[str, any], None]:
"""Analyse l'ip avec Freeipapi
Cette methode devra etre lancer toujours via un thread ou un timer.
Args:
@@ -798,6 +833,12 @@ class Defender():
dict[str, any] | None: les informations du provider
keys : 'countryCode', 'isProxy'
"""
User = userModel
remote_ip = User.remote_ip
username = User.username
hostname = User.hostname
nickname = User.nickname
if remote_ip in self.Config.WHITELISTED_IP:
return None
if self.ModConfig.freeipapi_scan == 0:
@@ -814,11 +855,12 @@ class Defender():
'Accept': 'application/json',
}
response = requests.request(method='GET', url=url, headers=headers, timeout=self.timeout)
# Formatted output
decodedResponse = json.loads(response.text)
try:
response = requests.request(method='GET', url=url, headers=headers, timeout=self.timeout)
# Formatted output
decodedResponse = json.loads(response.text)
status_code = response.status_code
if status_code == 429:
self.Logs.warning(f'Too Many Requests - The rate limit for the API has been exceeded.')
@@ -832,7 +874,10 @@ class Defender():
'isProxy': decodedResponse['isProxy'] if 'isProxy' in decodedResponse else None
}
self.Irc.send2socket(f":{service_id} PRIVMSG {service_chanlog} :[ {color_red}FREEIPAPI_SCAN{color_black} ] : Connexion de {remote_ip} ==> Proxy: {str(result['isProxy'])} | Country : {str(result['countryCode'])}")
# pseudo!ident@host
fullname = f'{nickname}!{username}@{hostname}'
self.Irc.send2socket(f":{service_id} PRIVMSG {service_chanlog} :[ {color_red}FREEIPAPI_SCAN{color_black} ] : Connexion de {fullname} ({remote_ip}) ==> Proxy: {str(result['isProxy'])} | Country : {str(result['countryCode'])}")
if result['isProxy']:
self.Irc.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} This server do not allow proxy connexions {str(result['isProxy'])} - detected by freeipapi")
@@ -841,20 +886,22 @@ class Defender():
return result
except KeyError as ke:
self.Logs.error(f"FREEIPAPI_SCAN KeyError : {ke}")
except Exception as err:
self.Logs.error(f"General Error Freeipapi : {err}")
def thread_freeipapi_scan(self) -> None:
try:
while self.freeipapi_isRunning:
list_to_remove:list = []
for ip in self.freeipapi_remote_ip:
self.freeipapi_scan(ip)
list_to_remove.append(ip)
list_to_remove: list[User.UserModel] = []
for user in self.freeipapi_UserModel:
self.freeipapi_scan(user)
list_to_remove.append(user)
time.sleep(1)
for ip_to_remove in list_to_remove:
self.freeipapi_remote_ip.remove(ip_to_remove)
for user_model in list_to_remove:
self.freeipapi_UserModel.remove(user_model)
time.sleep(1)
@@ -862,7 +909,7 @@ class Defender():
except ValueError as ve:
self.Logs.error(f"thread_freeipapi_scan Error : {ve}")
def cloudfilt_scan(self, remote_ip:str) -> Union[dict[str, any], None]:
def cloudfilt_scan(self, userModel: User.UserModel) -> Union[dict[str, any], None]:
"""Analyse l'ip avec cloudfilt
Cette methode devra etre lancer toujours via un thread ou un timer.
Args:
@@ -872,6 +919,12 @@ class Defender():
dict[str, any] | None: les informations du provider
keys : 'countryCode', 'isProxy'
"""
User = userModel
remote_ip = User.remote_ip
username = User.username
hostname = User.hostname
nickname = User.nickname
if remote_ip in self.Config.WHITELISTED_IP:
return None
if self.ModConfig.cloudfilt_scan == 0:
@@ -891,11 +944,10 @@ class Defender():
'key': self.cloudfilt_key
}
response = requests.post(url=url, data=data)
# Formatted output
decodedResponse = json.loads(response.text)
try:
response = requests.post(url=url, data=data)
# Formatted output
decodedResponse = json.loads(response.text)
status_code = response.status_code
if status_code != 200:
self.Logs.warning(f'Error connecting to cloudfilt API | Code: {str(status_code)}')
@@ -908,7 +960,10 @@ class Defender():
'host': decodedResponse['host'] if 'host' in decodedResponse else None
}
self.Irc.send2socket(f":{service_id} PRIVMSG {service_chanlog} :[ {color_red}CLOUDFILT_SCAN{color_black} ] : Connexion de {str(remote_ip)} ==> Host: {str(result['host'])} | country: {str(result['countryiso'])} | listed: {str(result['listed'])} | listed by : {str(result['listed_by'])}")
# pseudo!ident@host
fullname = f'{nickname}!{username}@{hostname}'
self.Irc.send2socket(f":{service_id} PRIVMSG {service_chanlog} :[ {color_red}CLOUDFILT_SCAN{color_black} ] : Connexion de {fullname} ({remote_ip}) ==> Host: {str(result['host'])} | country: {str(result['countryiso'])} | listed: {str(result['listed'])} | listed by : {str(result['listed_by'])}")
if result['listed']:
self.Irc.send2socket(f":{service_id} GLINE +*@{remote_ip} {self.Config.GLINE_DURATION} You connexion is listed as dangerous {str(result['listed'])} {str(result['listed_by'])} - detected by cloudfilt")
@@ -926,13 +981,13 @@ class Defender():
while self.cloudfilt_isRunning:
list_to_remove:list = []
for ip in self.cloudfilt_remote_ip:
self.cloudfilt_scan(ip)
list_to_remove.append(ip)
for user in self.cloudfilt_UserModel:
self.cloudfilt_scan(user)
list_to_remove.append(user)
time.sleep(1)
for ip_to_remove in list_to_remove:
self.cloudfilt_remote_ip.remove(ip_to_remove)
for user_model in list_to_remove:
self.cloudfilt_UserModel.remove(user_model)
time.sleep(1)
@@ -966,22 +1021,6 @@ class Defender():
if not self.Base.is_valid_ip(cmd[2]):
return None
# self.Base.scan_ports(cmd[2])
if self.ModConfig.local_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP:
self.localscan_remote_ip.append(cmd[2])
if self.ModConfig.psutil_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP:
self.psutil_remote_ip.append(cmd[2])
if self.ModConfig.abuseipdb_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP:
self.abuseipdb_remote_ip.append(cmd[2])
if self.ModConfig.freeipapi_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP:
self.freeipapi_remote_ip.append(cmd[2])
if self.ModConfig.cloudfilt_scan == 1 and not cmd[2] in self.Config.WHITELISTED_IP:
self.cloudfilt_remote_ip.append(cmd[2])
# Possibilité de déclancher les bans a ce niveau.
except IndexError as ie:
self.Logs.error(f'cmd reputation: index error: {ie}')
@@ -1005,6 +1044,15 @@ class Defender():
# Get User information
_User = self.User.get_User(str(cmd[7]))
# If user is not service or IrcOp then scan them
if not re.match(fr'^.*[S|o?].*$', _User.umodes):
self.abuseipdb_UserModel.append(_User) if self.ModConfig.abuseipdb_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
self.freeipapi_UserModel.append(_User) if self.ModConfig.freeipapi_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
self.cloudfilt_UserModel.append(_User) if self.ModConfig.cloudfilt_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
self.psutil_UserModel.append(_User) if self.ModConfig.psutil_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
self.localscan_UserModel.append(_User) if self.ModConfig.local_scan == 1 and not _User.remote_ip in self.Config.WHITELISTED_IP else None
if _User is None:
self.Logs.critical(f'This UID: [{cmd[7]}] is not available please check why')
return None
@@ -1582,9 +1630,10 @@ class Defender():
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : NICKNAME : {UserObject.nickname}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : USERNAME : {UserObject.username}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : HOSTNAME : {UserObject.hostname}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : IP : {UserObject.remote_ip}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : REPUTATION : {UserObject.score_connexion}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : VHOST : {UserObject.vhost}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : IP : {UserObject.remote_ip}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : WebIrc : {UserObject.isWebirc}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : REPUTATION : {UserObject.score_connexion}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : MODES : {UserObject.umodes}')
self.Irc.send2socket(f':{dnickname} NOTICE {fromuser} : CONNECTION TIME : {UserObject.connexion_datetime}')
else:

View File

@@ -1,179 +0,0 @@
from dataclasses import dataclass
from core.irc import Irc
from unrealircd_rpc_py.Live import Live
class Jsonrpc():
@dataclass
class ModConfModel:
"""The Model containing the module parameters
"""
param_exemple1: str
param_exemple2: int
def __init__(self, ircInstance:Irc) -> None:
# Module name (Mandatory)
self.module_name = 'mod_' + str(self.__class__.__name__).lower()
# Add Irc Object to the module (Mandatory)
self.Irc = ircInstance
# Add Global Configuration to the module (Mandatory)
self.Config = ircInstance.Config
# Add Base object to the module (Mandatory)
self.Base = ircInstance.Base
# Add logs object to the module (Mandatory)
self.Logs = ircInstance.Base.logs
# Add User object to the module (Mandatory)
self.User = ircInstance.User
# Add Channel object to the module (Mandatory)
self.Channel = ircInstance.Channel
# Create module commands (Mandatory)
self.commands_level = {
1: ['jsonrpc']
}
# Init the module
self.__init_module()
# Log the module
self.Logs.debug(f'Module {self.module_name} loaded ...')
def __init_module(self) -> None:
# Insert module commands into the core one (Mandatory)
self.__set_commands(self.commands_level)
# Create you own tables (Mandatory)
# self.__create_tables()
# Load module configuration and sync with core one (Mandatory)
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
return None
def __set_commands(self, commands:dict[int, list[str]]) -> None:
"""### Rajoute les commandes du module au programme principal
Args:
commands (list): Liste des commandes du module
"""
for level, com in commands.items():
for c in commands[level]:
if not c in self.Irc.commands:
self.Irc.commands_level[level].append(c)
self.Irc.commands.append(c)
return None
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module
Args:
database_name (str): Nom de la base de données ( pas d'espace dans le nom )
Returns:
None: Aucun retour n'es attendu
"""
table_logs = '''CREATE TABLE IF NOT EXISTS test_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT,
server_msg TEXT
)
'''
self.Base.db_execute_query(table_logs)
return None
def callback_sent_to_irc(self, json_response: str):
dnickname = self.Config.SERVICE_NICKNAME
dchanlog = self.Config.SERVICE_CHANLOG
self.Irc.send2socket(f":{dnickname} PRIVMSG {dchanlog} :{json_response}")
pass
def thread_start_jsonrpc(self):
liveRpc = Live(path_to_socket_file='/home/adator/IRC/unrealircd6.1.2.1/data/rpc.socket',
callback_object_instance=self,
callback_method_name='callback_sent_to_irc'
)
liveRpc.execute_async_method()
pass
def __load_module_configuration(self) -> None:
"""### Load Module Configuration
"""
try:
# Build the default configuration model (Mandatory)
self.ModConfig = self.ModConfModel(param_exemple1='param value 1', param_exemple2=1)
# Sync the configuration with core configuration (Mandatory)
#self.Base.db_sync_core_config(self.module_name, self.ModConfig)
return None
except TypeError as te:
self.Logs.critical(te)
def __update_configuration(self, param_key: str, param_value: str):
"""Update the local and core configuration
Args:
param_key (str): The parameter key
param_value (str): The parameter value
"""
self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value)
def unload(self) -> None:
return None
def cmd(self, data:list) -> None:
return None
def _hcmds(self, user:str, channel: any, cmd: list, fullcmd: list = []) -> None:
command = str(cmd[0]).lower()
dnickname = self.Config.SERVICE_NICKNAME
fromuser = user
fromchannel = str(channel) if not channel is None else None
match command:
case 'jsonrpc':
self.Base.create_thread(self.thread_start_jsonrpc, run_once=True)
pass
case 'ia':
try:
self.Base.create_thread(self.thread_ask_ia, ('',))
self.Irc.send2socket(f":{dnickname} NOTICE {fromuser} : This is a notice to the sender ...")
self.Irc.send2socket(f":{dnickname} PRIVMSG {fromuser} : This is private message to the sender ...")
if not fromchannel is None:
self.Irc.send2socket(f":{dnickname} PRIVMSG {fromchannel} : This is channel message to the sender ...")
# How to update your module configuration
self.__update_configuration('param_exemple2', 7)
# Log if you want the result
self.Logs.debug(f"Test logs ready")
except Exception as err:
self.Logs.error(f"Unknown Error: {err}")

Binary file not shown.

View File

@@ -1,3 +1,3 @@
{
"version": "5.1.9"
"version": "5.2.0"
}