Update to the 3.0.0 V

This commit is contained in:
adator
2025-10-18 20:49:21 +02:00
parent fd9643eddc
commit a043a58f45
18 changed files with 435 additions and 415 deletions

View File

@@ -1,13 +1,13 @@
import logging
import asyncio
from unrealircd_rpc_py.objects.Definition import LiveRPCResult
import mods.jsonrpc.utils as utils
import mods.jsonrpc.threads as thds
from time import sleep
from types import SimpleNamespace
from typing import TYPE_CHECKING
from dataclasses import dataclass
from unrealircd_rpc_py.Live import LiveWebsocket, LiveUnixSocket
from unrealircd_rpc_py.Loader import Loader
from unrealircd_rpc_py.ConnectionFactory import ConnectionFactory
from unrealircd_rpc_py.LiveConnectionFactory import LiveConnectionFactory
if TYPE_CHECKING:
from core.irc import Irc
@@ -85,43 +85,24 @@ class Jsonrpc():
self.__load_module_configuration()
# End of mandatory methods you can start your customization #
self.UnrealIrcdRpcLive: LiveWebsocket = LiveWebsocket(
url=self.Config.JSONRPC_URL,
username=self.Config.JSONRPC_USER,
password=self.Config.JSONRPC_PASSWORD,
callback_object_instance=self,
callback_method_or_function_name='callback_sent_to_irc'
)
if self.UnrealIrcdRpcLive.get_error.code != 0:
self.Logs.error(f"{self.UnrealIrcdRpcLive.get_error.message} ({self.UnrealIrcdRpcLive.get_error.code})")
try:
self.Rpc = ConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
self.LiveRpc = LiveConnectionFactory(self.Config.DEBUG_LEVEL).get(self.Config.JSONRPC_METHOD)
self.Rpc.setup({'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD})
self.LiveRpc.setup({'url': self.Config.JSONRPC_URL, 'username': self.Config.JSONRPC_USER, 'password': self.Config.JSONRPC_PASSWORD,
'callback_object_instance' : self, 'callback_method_or_function_name': 'callback_sent_to_irc'})
if self.ModConfig.jsonrpc == 1:
self.Base.create_thread(func=self.Threads.thread_subscribe, func_args=(self, ), run_once=True)
return None
except Exception as err:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}ERROR{self.Config.COLORS.nogc}] {self.UnrealIrcdRpcLive.get_error.message}",
msg=f"[{self.Config.COLORS.red}JSONRPC ERROR{self.Config.COLORS.nogc}] {err.__str__()}",
channel=self.Config.SERVICE_CHANLOG
)
raise Exception(f"[LIVE-JSONRPC ERROR] {self.UnrealIrcdRpcLive.get_error.message}")
self.Rpc: Loader = Loader(
req_method=self.Config.JSONRPC_METHOD,
url=self.Config.JSONRPC_URL,
username=self.Config.JSONRPC_USER,
password=self.Config.JSONRPC_PASSWORD
)
if self.Rpc.get_error.code != 0:
self.Logs.error(f"{self.Rpc.get_error.message} ({self.Rpc.get_error.code})")
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{self.Config.COLORS.red}JSONRPC ERROR{self.Config.COLORS.nogc}] {self.Rpc.get_error.message}",
channel=self.Config.SERVICE_CHANLOG
)
raise Exception(f"[JSONRPC ERROR] {self.Rpc.get_error.message}")
if self.ModConfig.jsonrpc == 1:
self.Base.create_thread(func=self.Threads.thread_subscribe, func_args=(self, ), run_once=True)
return None
)
self.Logs.error(f"JSONRPC ERROR: {err.__str__()}")
def __create_tables(self) -> None:
"""Methode qui va créer la base de donnée si elle n'existe pas.
@@ -143,7 +124,7 @@ class Jsonrpc():
self.Base.db_execute_query(table_logs)
return None
def callback_sent_to_irc(self, response: SimpleNamespace) -> None:
def callback_sent_to_irc(self, response: LiveRPCResult) -> None:
dnickname = self.Config.SERVICE_NICKNAME
dchanlog = self.Config.SERVICE_CHANLOG
@@ -152,29 +133,19 @@ class Jsonrpc():
bold = self.Config.COLORS.bold
red = self.Config.COLORS.red
if self.UnrealIrcdRpcLive.get_error.code != 0:
if response.error.code != 0:
self.Protocol.send_priv_msg(nick_from=dnickname,
msg=f"[{bold}{red}JSONRPC ERROR{nogc}{bold}] {self.UnrealIrcdRpcLive.get_error.message}",
msg=f"[{bold}{red}JSONRPC ERROR{nogc}{bold}] {response.error.message} ({response.error.code})",
channel=dchanlog)
return None
if hasattr(response, 'error'):
if response.error.code != 0:
if isinstance(response.result, bool):
if response.result:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{bold}{red}JSONRPC{nogc}{bold}] JSONRPC Event activated on {self.Config.JSONRPC_URL}",
channel=dchanlog)
return None
if hasattr(response, 'result'):
if isinstance(response.result, bool):
if response.result:
self.Protocol.send_priv_msg(
nick_from=self.Config.SERVICE_NICKNAME,
msg=f"[{bold}{green}JSONRPC{nogc}{bold}] JSONRPC Event activated on {self.Config.JSONRPC_URL}",
channel=dchanlog)
return None
return None
level = response.result.level if hasattr(response.result, 'level') else ''
subsystem = response.result.subsystem if hasattr(response.result, 'subsystem') else ''
@@ -278,18 +249,13 @@ class Jsonrpc():
match option:
case 'get':
nickname = str(cmd[2])
uid_to_get = self.User.get_uid(nickname)
if uid_to_get is None:
return None
rpc = self.Rpc
UserInfo = rpc.User.get(uid_to_get)
if rpc.get_error.code != 0:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'{rpc.get_error.message}')
UserInfo = rpc.User.get(nickname)
if UserInfo.error.code != 0:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'{UserInfo.error.message}')
return None
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'UID : {UserInfo.id}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'NICKNAME : {UserInfo.name}')
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f'USERNAME : {UserInfo.user.username}')
@@ -321,9 +287,8 @@ class Jsonrpc():
case 'jrinstances':
try:
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"GC Collect: {self.MainUtils.run_python_garbage_collector()}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance LiveWebsock: {self.MainUtils.get_number_gc_objects(LiveWebsocket)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance LiveUnixSocket: {self.MainUtils.get_number_gc_objects(LiveUnixSocket)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance Loader: {self.MainUtils.get_number_gc_objects(Loader)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance LiveWebsock: {self.MainUtils.get_number_gc_objects(LiveConnectionFactory)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre d'instance ConnectionFactory: {self.MainUtils.get_number_gc_objects(ConnectionFactory)}")
self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Nombre de toute les instances: {self.MainUtils.get_number_gc_objects()}")
except Exception as err:
self.Logs.error(f"Unknown Error: {err}")

View File

@@ -5,24 +5,20 @@ if TYPE_CHECKING:
from mods.jsonrpc.mod_jsonrpc import Jsonrpc
def thread_subscribe(uplink: 'Jsonrpc') -> None:
response: dict[str, dict] = {}
snickname = uplink.Config.SERVICE_NICKNAME
schannel = uplink.Config.SERVICE_CHANLOG
uplink.is_streaming = True
response = asyncio.run(uplink.LiveRpc.subscribe(["all"]))
if uplink.UnrealIrcdRpcLive.get_error.code == 0:
uplink.is_streaming = True
response = asyncio.run(uplink.UnrealIrcdRpcLive.subscribe(["all"]))
else:
if response.error.code != 0:
uplink.Protocol.send_priv_msg(nick_from=snickname,
msg=f"[{uplink.Config.COLORS.red}JSONRPC ERROR{uplink.Config.COLORS.nogc}] {uplink.UnrealIrcdRpcLive.get_error.message}",
msg=f"[{uplink.Config.COLORS.red}JSONRPC ERROR{uplink.Config.COLORS.nogc}] {response.error.message}",
channel=schannel
)
if response is None:
return
code = response.get('error', {}).get('code', 0)
message = response.get('error', {}).get('message', None)
code = response.error.code
message = response.error.message
if code == 0:
uplink.Protocol.send_priv_msg(
@@ -39,18 +35,15 @@ def thread_subscribe(uplink: 'Jsonrpc') -> None:
def thread_unsubscribe(uplink: 'Jsonrpc') -> None:
response: dict[str, dict] = asyncio.run(uplink.UnrealIrcdRpcLive.unsubscribe())
response = asyncio.run(uplink.LiveRpc.unsubscribe())
uplink.Logs.debug("[JSONRPC UNLOAD] Unsubscribe from the stream!")
uplink.is_streaming = False
uplink.update_configuration('jsonrpc', 0)
snickname = uplink.Config.SERVICE_NICKNAME
schannel = uplink.Config.SERVICE_CHANLOG
if response is None:
return None
code = response.get('error', {}).get('code', 0)
message = response.get('error', {}).get('message', None)
code = response.error.code
message = response.error.message
if code != 0:
uplink.Protocol.send_priv_msg(