From 51f709e4a1cd191e9c1b25bc963205fa586d9f7c Mon Sep 17 00:00:00 2001 From: adator <85586985+adator85@users.noreply.github.com> Date: Thu, 20 Nov 2025 14:02:45 +0100 Subject: [PATCH] Final main file --- core/classes/interfaces/irpc_endpoint.py | 34 +++++++++++++ core/classes/modules/rpc/rpc.py | 64 +++++------------------- core/classes/modules/rpc/rpc_channel.py | 15 ++++-- core/classes/modules/rpc/rpc_command.py | 47 +++++++++++------ core/classes/modules/rpc/rpc_errors.py | 43 ++++++++++++++++ core/classes/modules/rpc/rpc_user.py | 35 +++++++++---- defender.py | 6 ++- 7 files changed, 159 insertions(+), 85 deletions(-) create mode 100644 core/classes/interfaces/irpc_endpoint.py create mode 100644 core/classes/modules/rpc/rpc_errors.py diff --git a/core/classes/interfaces/irpc_endpoint.py b/core/classes/interfaces/irpc_endpoint.py new file mode 100644 index 0000000..0f5d0b4 --- /dev/null +++ b/core/classes/interfaces/irpc_endpoint.py @@ -0,0 +1,34 @@ +import starlette.status as http_status_code +from typing import TYPE_CHECKING +from core.classes.modules.rpc.rpc_errors import JSONRPCErrorCode + +if TYPE_CHECKING: + from core.loader import Loader + +class IRPC: + + def __init__(self, loader: 'Loader'): + self.ctx = loader + self.http_status_code = http_status_code + self.response_model = { + "jsonrpc": "2.0", + "id": 123 + } + + def reset(self): + self.response_model = { + "jsonrpc": "2.0", + "id": 123 + } + + def create_error_response(self, error_code: JSONRPCErrorCode, details: dict = None) -> dict[str, str]: + """Create a JSON-RPC error!""" + response = { + "code": error_code.value, + "message": error_code.description(), + } + + if details: + response["data"] = details + + return response \ No newline at end of file diff --git a/core/classes/modules/rpc/rpc.py b/core/classes/modules/rpc/rpc.py index b26e34e..1b6646d 100644 --- a/core/classes/modules/rpc/rpc.py +++ b/core/classes/modules/rpc/rpc.py @@ -1,11 +1,12 @@ import base64 import json +import uvicorn +import core.classes.modules.rpc.rpc_errors as rpcerr +import starlette.status as http_status_code from starlette.applications import Starlette from starlette.responses import JSONResponse from starlette.requests import Request from starlette.routing import Route -import uvicorn -from enum import Enum from typing import TYPE_CHECKING, Any, Optional from core.classes.modules.rpc.rpc_user import RPCUser from core.classes.modules.rpc.rpc_channel import RPCChannel @@ -82,15 +83,17 @@ class JSonRpcServer: response_data['method'] = method rip = request.client.host rport = request.client.port - http_code = 200 + http_code = http_status_code.HTTP_200_OK if method in self.methods: - response_data['result'] = self.methods[method](**params) - return JSONResponse(response_data, http_code) + r: JSONResponse = self.methods[method](**params) + resp = json.loads(r.body) + resp['id'] = request_data.get('id', 123) + return JSONResponse(resp, r.status_code) - response_data['error'] = create_error_response(JSONRPCErrorCode.METHOD_NOT_FOUND) + response_data['error'] = rpcerr.create_error_response(rpcerr.JSONRPCErrorCode.METHOD_NOT_FOUND) self._ctx.Logs.debug(f'[RPC ERROR] {method} recieved from {rip}:{rport}') - http_code = 404 + http_code = http_status_code.HTTP_404_NOT_FOUND return JSONResponse(response_data, http_code) def authenticate(self, headers: dict, body: dict) -> JSONResponse: @@ -131,49 +134,6 @@ class JSonRpcServer: response_data = { 'jsonrpc': '2.0', 'id': request_data.get('id', 123), - 'error': create_error_response(JSONRPCErrorCode.AUTHENTICATION_ERROR) + 'error': rpcerr.create_error_response(rpcerr.JSONRPCErrorCode.AUTHENTICATION_ERROR) } - return JSONResponse(response_data) - - -class JSONRPCErrorCode(Enum): - PARSE_ERROR = -32700 # Syntax error in the request (malformed JSON) - INVALID_REQUEST = -32600 # Invalid Request (incorrect structure or missing fields) - METHOD_NOT_FOUND = -32601 # Method not found (the requested method does not exist) - INVALID_PARAMS = -32602 # Invalid Params (the parameters provided are incorrect) - INTERNAL_ERROR = -32603 # Internal Error (an internal server error occurred) - - # Custom application-specific errors (beyond standard JSON-RPC codes) - CUSTOM_ERROR = 1001 # Custom application-defined error (e.g., user not found) - AUTHENTICATION_ERROR = 1002 # Authentication failure (e.g., invalid credentials) - PERMISSION_ERROR = 1003 # Permission error (e.g., user does not have access to this method) - RESOURCE_NOT_FOUND = 1004 # Resource not found (e.g., the requested resource does not exist) - DUPLICATE_REQUEST = 1005 # Duplicate request (e.g., a similar request has already been processed) - - def description(self): - """Returns a description associated with each error code""" - descriptions = { - JSONRPCErrorCode.PARSE_ERROR: "The JSON request is malformed.", - JSONRPCErrorCode.INVALID_REQUEST: "The request is invalid (missing or incorrect fields).", - JSONRPCErrorCode.METHOD_NOT_FOUND: "The requested method could not be found.", - JSONRPCErrorCode.INVALID_PARAMS: "The parameters provided are invalid.", - JSONRPCErrorCode.INTERNAL_ERROR: "An internal error occurred on the server.", - JSONRPCErrorCode.CUSTOM_ERROR: "A custom error defined by the application.", - JSONRPCErrorCode.AUTHENTICATION_ERROR: "User authentication failed.", - JSONRPCErrorCode.PERMISSION_ERROR: "User does not have permission to access this method.", - JSONRPCErrorCode.RESOURCE_NOT_FOUND: "The requested resource could not be found.", - JSONRPCErrorCode.DUPLICATE_REQUEST: "The request is a duplicate or is already being processed.", - } - return descriptions.get(self, "Unknown error") - -def create_error_response(error_code: JSONRPCErrorCode, details: dict = None) -> dict[str, str]: - """Create a JSON-RPC error!""" - response = { - "code": error_code.value, - "message": error_code.description(), - } - - if details: - response["data"] = details - - return response \ No newline at end of file + return JSONResponse(response_data, http_status_code.HTTP_403_FORBIDDEN) diff --git a/core/classes/modules/rpc/rpc_channel.py b/core/classes/modules/rpc/rpc_channel.py index 4f39338..b1388eb 100644 --- a/core/classes/modules/rpc/rpc_channel.py +++ b/core/classes/modules/rpc/rpc_channel.py @@ -1,12 +1,17 @@ from typing import TYPE_CHECKING +from starlette.responses import JSONResponse +from core.classes.interfaces.irpc_endpoint import IRPC +from core.classes.modules.rpc.rpc_errors import JSONRPCErrorCode + if TYPE_CHECKING: from core.loader import Loader -class RPCChannel: +class RPCChannel(IRPC): def __init__(self, loader: 'Loader'): - self._Loader = loader - self._Channel = loader.Channel + super().__init__(loader) - def channel_list(self, **kwargs) -> list[dict]: - return [chan.to_dict() for chan in self._Channel.UID_CHANNEL_DB] \ No newline at end of file + def channel_list(self, **kwargs) -> JSONResponse: + self.reset() + self.response_model['result'] = [chan.to_dict() for chan in self.ctx.Channel.UID_CHANNEL_DB] + return JSONResponse(self.response_model) \ No newline at end of file diff --git a/core/classes/modules/rpc/rpc_command.py b/core/classes/modules/rpc/rpc_command.py index 230af53..ebf0691 100644 --- a/core/classes/modules/rpc/rpc_command.py +++ b/core/classes/modules/rpc/rpc_command.py @@ -1,29 +1,44 @@ -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING +from starlette.responses import JSONResponse +from core.classes.interfaces.irpc_endpoint import IRPC +from core.classes.modules.rpc.rpc_errors import JSONRPCErrorCode if TYPE_CHECKING: from core.loader import Loader -class RPCCommand: +class RPCCommand(IRPC): def __init__(self, loader: 'Loader'): - self._Loader = loader - self._Command = loader.Commands + super().__init__(loader) - def command_list(self, **kwargs) -> list[dict]: - return [command.to_dict() for command in self._Command.DB_COMMANDS] + def command_list(self, **kwargs) -> JSONResponse: + self.reset() + self.response_model['result'] = [command.to_dict() for command in self.ctx.Commands.DB_COMMANDS] + return JSONResponse(self.response_model) - def command_get_by_module(self, **kwargs) -> list[dict]: - module_name = kwargs.get('module_name', None) - if module_name is None: - return [] + def command_get_by_module(self, **kwargs) -> JSONResponse: + self.reset() + module_name: str = kwargs.get('module_name', '') - return [command.to_dict() for command in self._Command.DB_COMMANDS if command.module_name.lower() == module_name.lower()] + if not module_name: + self.response_model['error'] = self.create_error_response(JSONRPCErrorCode.INVALID_PARAMS, {'module_name': 'The param to use is module_name'}) + return JSONResponse(self.response_model, self.http_status_code.HTTP_405_METHOD_NOT_ALLOWED) + + self.response_model['result'] = [command.to_dict() for command in self.ctx.Commands.DB_COMMANDS if command.module_name.lower() == module_name.lower()] + return JSONResponse(self.response_model) + + def command_get_by_name(self, **kwargs) -> JSONResponse: + self.reset() - def command_get_by_name(self, **kwargs) -> dict: command_name: str = kwargs.get('command_name', '') if not command_name: - return dict() + self.response_model['error'] = self.create_error_response(JSONRPCErrorCode.INVALID_PARAMS, {'command_name': f'The param to use is command_name'}) + return JSONResponse(self.response_model, self.http_status_code.HTTP_405_METHOD_NOT_ALLOWED) - for command in self._Command.DB_COMMANDS: + command_to_return: list[dict] = [] + for command in self.ctx.Commands.DB_COMMANDS: if command.command_name.lower() == command_name.lower(): - return command.to_dict() - return dict() \ No newline at end of file + command_to_return.append(command.to_dict()) + + self.response_model['result'] = command_to_return + + return JSONResponse(self.response_model) \ No newline at end of file diff --git a/core/classes/modules/rpc/rpc_errors.py b/core/classes/modules/rpc/rpc_errors.py new file mode 100644 index 0000000..dc9fb90 --- /dev/null +++ b/core/classes/modules/rpc/rpc_errors.py @@ -0,0 +1,43 @@ +from enum import Enum + +class JSONRPCErrorCode(Enum): + PARSE_ERROR = -32700 # Syntax error in the request (malformed JSON) + INVALID_REQUEST = -32600 # Invalid Request (incorrect structure or missing fields) + METHOD_NOT_FOUND = -32601 # Method not found (the requested method does not exist) + INVALID_PARAMS = -32602 # Invalid Params (the parameters provided are incorrect) + INTERNAL_ERROR = -32603 # Internal Error (an internal server error occurred) + + # Custom application-specific errors (beyond standard JSON-RPC codes) + CUSTOM_ERROR = 1001 # Custom application-defined error (e.g., user not found) + AUTHENTICATION_ERROR = 1002 # Authentication failure (e.g., invalid credentials) + PERMISSION_ERROR = 1003 # Permission error (e.g., user does not have access to this method) + RESOURCE_NOT_FOUND = 1004 # Resource not found (e.g., the requested resource does not exist) + DUPLICATE_REQUEST = 1005 # Duplicate request (e.g., a similar request has already been processed) + + def description(self): + """Returns a description associated with each error code""" + descriptions = { + JSONRPCErrorCode.PARSE_ERROR: "The JSON request is malformed.", + JSONRPCErrorCode.INVALID_REQUEST: "The request is invalid (missing or incorrect fields).", + JSONRPCErrorCode.METHOD_NOT_FOUND: "The requested method could not be found.", + JSONRPCErrorCode.INVALID_PARAMS: "The parameters provided are invalid.", + JSONRPCErrorCode.INTERNAL_ERROR: "An internal error occurred on the server.", + JSONRPCErrorCode.CUSTOM_ERROR: "A custom error defined by the application.", + JSONRPCErrorCode.AUTHENTICATION_ERROR: "User authentication failed.", + JSONRPCErrorCode.PERMISSION_ERROR: "User does not have permission to access this method.", + JSONRPCErrorCode.RESOURCE_NOT_FOUND: "The requested resource could not be found.", + JSONRPCErrorCode.DUPLICATE_REQUEST: "The request is a duplicate or is already being processed.", + } + return descriptions.get(self, "Unknown error") + +def create_error_response(error_code: JSONRPCErrorCode, details: dict = None) -> dict[str, str]: + """Create a JSON-RPC error!""" + response = { + "code": error_code.value, + "message": error_code.description(), + } + + if details: + response["data"] = details + + return response \ No newline at end of file diff --git a/core/classes/modules/rpc/rpc_user.py b/core/classes/modules/rpc/rpc_user.py index bf8a323..667005a 100644 --- a/core/classes/modules/rpc/rpc_user.py +++ b/core/classes/modules/rpc/rpc_user.py @@ -1,15 +1,20 @@ from typing import TYPE_CHECKING, Optional +from starlette.responses import JSONResponse +from core.classes.interfaces.irpc_endpoint import IRPC +from core.classes.modules.rpc.rpc_errors import JSONRPCErrorCode + if TYPE_CHECKING: from core.loader import Loader from core.definition import MUser -class RPCUser: +class RPCUser(IRPC): def __init__(self, loader: 'Loader'): - self._ctx = loader + super().__init__(loader) - def user_list(self, **kwargs) -> list[dict]: - users = self._ctx.User.UID_DB.copy() + def user_list(self, **kwargs) -> JSONResponse: + self.reset() + users = self.ctx.User.UID_DB.copy() copy_users: list['MUser'] = [] for user in users: @@ -17,14 +22,24 @@ class RPCUser: copy_user.connexion_datetime = copy_user.connexion_datetime.strftime('%d-%m-%Y') copy_users.append(copy_user) - return [user.to_dict() for user in copy_users] + self.response_model['result'] = [user.to_dict() for user in copy_users] - def user_get(self, **kwargs) -> Optional[dict]: - uidornickname = kwargs.get('uid_or_nickname', None) - user = self._ctx.User.get_user(uidornickname) + return JSONResponse(self.response_model) + + def user_get(self, **kwargs) -> JSONResponse: + self.reset() + uidornickname = kwargs.get('uid_or_nickname', '') + + if not uidornickname: + self.response_model['error'] = self.create_error_response(JSONRPCErrorCode.INVALID_PARAMS, {'uid_or_nickname': 'The param to use is uid_or_nickname'}) + return JSONResponse(self.response_model, self.http_status_code.HTTP_405_METHOD_NOT_ALLOWED) + + user = self.ctx.User.get_user(uidornickname) if user: user_copy = user.copy() user_copy.connexion_datetime = user_copy.connexion_datetime.strftime('%d-%m-%Y') - return user_copy.to_dict() + self.response_model['result'] = user_copy.to_dict() + return JSONResponse(self.response_model) - return None \ No newline at end of file + self.response_model['result'] = 'User not found!' + return JSONResponse(self.response_model, self.http_status_code.HTTP_204_NO_CONTENT) \ No newline at end of file diff --git a/defender.py b/defender.py index c5787bd..e6aeb98 100644 --- a/defender.py +++ b/defender.py @@ -2,19 +2,21 @@ import asyncio from core import install ############################################# -# @Version : 6.3 # +# @Version : 6.4 # # Requierements : # # Python3.10 or higher # # SQLAlchemy, requests, psutil # # unrealircd-rpc-py, pyyaml # +# uvicorn, starlette, faker # # UnrealIRCD 6.2.2 or higher # ############################################# async def main(): + install.update_packages() from core.loader import Loader loader = Loader() await loader.start() await loader.Irc.run() if __name__ == "__main__": - asyncio.run(main(), debug=True) + asyncio.run(main())