mirror of
https://github.com/iio612/DEFENDER.git
synced 2026-02-13 11:14:23 +00:00
Final main file
This commit is contained in:
34
core/classes/interfaces/irpc_endpoint.py
Normal file
34
core/classes/interfaces/irpc_endpoint.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import starlette.status as http_status_code
|
||||
from typing import TYPE_CHECKING
|
||||
from core.classes.modules.rpc.rpc_errors import JSONRPCErrorCode
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
|
||||
class IRPC:
|
||||
|
||||
def __init__(self, loader: 'Loader'):
|
||||
self.ctx = loader
|
||||
self.http_status_code = http_status_code
|
||||
self.response_model = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": 123
|
||||
}
|
||||
|
||||
def reset(self):
|
||||
self.response_model = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": 123
|
||||
}
|
||||
|
||||
def create_error_response(self, error_code: JSONRPCErrorCode, details: dict = None) -> dict[str, str]:
|
||||
"""Create a JSON-RPC error!"""
|
||||
response = {
|
||||
"code": error_code.value,
|
||||
"message": error_code.description(),
|
||||
}
|
||||
|
||||
if details:
|
||||
response["data"] = details
|
||||
|
||||
return response
|
||||
@@ -1,11 +1,12 @@
|
||||
import base64
|
||||
import json
|
||||
import uvicorn
|
||||
import core.classes.modules.rpc.rpc_errors as rpcerr
|
||||
import starlette.status as http_status_code
|
||||
from starlette.applications import Starlette
|
||||
from starlette.responses import JSONResponse
|
||||
from starlette.requests import Request
|
||||
from starlette.routing import Route
|
||||
import uvicorn
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
from core.classes.modules.rpc.rpc_user import RPCUser
|
||||
from core.classes.modules.rpc.rpc_channel import RPCChannel
|
||||
@@ -82,15 +83,17 @@ class JSonRpcServer:
|
||||
response_data['method'] = method
|
||||
rip = request.client.host
|
||||
rport = request.client.port
|
||||
http_code = 200
|
||||
http_code = http_status_code.HTTP_200_OK
|
||||
|
||||
if method in self.methods:
|
||||
response_data['result'] = self.methods[method](**params)
|
||||
return JSONResponse(response_data, http_code)
|
||||
r: JSONResponse = self.methods[method](**params)
|
||||
resp = json.loads(r.body)
|
||||
resp['id'] = request_data.get('id', 123)
|
||||
return JSONResponse(resp, r.status_code)
|
||||
|
||||
response_data['error'] = create_error_response(JSONRPCErrorCode.METHOD_NOT_FOUND)
|
||||
response_data['error'] = rpcerr.create_error_response(rpcerr.JSONRPCErrorCode.METHOD_NOT_FOUND)
|
||||
self._ctx.Logs.debug(f'[RPC ERROR] {method} recieved from {rip}:{rport}')
|
||||
http_code = 404
|
||||
http_code = http_status_code.HTTP_404_NOT_FOUND
|
||||
return JSONResponse(response_data, http_code)
|
||||
|
||||
def authenticate(self, headers: dict, body: dict) -> JSONResponse:
|
||||
@@ -131,49 +134,6 @@ class JSonRpcServer:
|
||||
response_data = {
|
||||
'jsonrpc': '2.0',
|
||||
'id': request_data.get('id', 123),
|
||||
'error': create_error_response(JSONRPCErrorCode.AUTHENTICATION_ERROR)
|
||||
'error': rpcerr.create_error_response(rpcerr.JSONRPCErrorCode.AUTHENTICATION_ERROR)
|
||||
}
|
||||
return JSONResponse(response_data)
|
||||
|
||||
|
||||
class JSONRPCErrorCode(Enum):
|
||||
PARSE_ERROR = -32700 # Syntax error in the request (malformed JSON)
|
||||
INVALID_REQUEST = -32600 # Invalid Request (incorrect structure or missing fields)
|
||||
METHOD_NOT_FOUND = -32601 # Method not found (the requested method does not exist)
|
||||
INVALID_PARAMS = -32602 # Invalid Params (the parameters provided are incorrect)
|
||||
INTERNAL_ERROR = -32603 # Internal Error (an internal server error occurred)
|
||||
|
||||
# Custom application-specific errors (beyond standard JSON-RPC codes)
|
||||
CUSTOM_ERROR = 1001 # Custom application-defined error (e.g., user not found)
|
||||
AUTHENTICATION_ERROR = 1002 # Authentication failure (e.g., invalid credentials)
|
||||
PERMISSION_ERROR = 1003 # Permission error (e.g., user does not have access to this method)
|
||||
RESOURCE_NOT_FOUND = 1004 # Resource not found (e.g., the requested resource does not exist)
|
||||
DUPLICATE_REQUEST = 1005 # Duplicate request (e.g., a similar request has already been processed)
|
||||
|
||||
def description(self):
|
||||
"""Returns a description associated with each error code"""
|
||||
descriptions = {
|
||||
JSONRPCErrorCode.PARSE_ERROR: "The JSON request is malformed.",
|
||||
JSONRPCErrorCode.INVALID_REQUEST: "The request is invalid (missing or incorrect fields).",
|
||||
JSONRPCErrorCode.METHOD_NOT_FOUND: "The requested method could not be found.",
|
||||
JSONRPCErrorCode.INVALID_PARAMS: "The parameters provided are invalid.",
|
||||
JSONRPCErrorCode.INTERNAL_ERROR: "An internal error occurred on the server.",
|
||||
JSONRPCErrorCode.CUSTOM_ERROR: "A custom error defined by the application.",
|
||||
JSONRPCErrorCode.AUTHENTICATION_ERROR: "User authentication failed.",
|
||||
JSONRPCErrorCode.PERMISSION_ERROR: "User does not have permission to access this method.",
|
||||
JSONRPCErrorCode.RESOURCE_NOT_FOUND: "The requested resource could not be found.",
|
||||
JSONRPCErrorCode.DUPLICATE_REQUEST: "The request is a duplicate or is already being processed.",
|
||||
}
|
||||
return descriptions.get(self, "Unknown error")
|
||||
|
||||
def create_error_response(error_code: JSONRPCErrorCode, details: dict = None) -> dict[str, str]:
|
||||
"""Create a JSON-RPC error!"""
|
||||
response = {
|
||||
"code": error_code.value,
|
||||
"message": error_code.description(),
|
||||
}
|
||||
|
||||
if details:
|
||||
response["data"] = details
|
||||
|
||||
return response
|
||||
return JSONResponse(response_data, http_status_code.HTTP_403_FORBIDDEN)
|
||||
|
||||
@@ -1,12 +1,17 @@
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from starlette.responses import JSONResponse
|
||||
from core.classes.interfaces.irpc_endpoint import IRPC
|
||||
from core.classes.modules.rpc.rpc_errors import JSONRPCErrorCode
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
|
||||
class RPCChannel:
|
||||
class RPCChannel(IRPC):
|
||||
def __init__(self, loader: 'Loader'):
|
||||
self._Loader = loader
|
||||
self._Channel = loader.Channel
|
||||
super().__init__(loader)
|
||||
|
||||
def channel_list(self, **kwargs) -> list[dict]:
|
||||
return [chan.to_dict() for chan in self._Channel.UID_CHANNEL_DB]
|
||||
def channel_list(self, **kwargs) -> JSONResponse:
|
||||
self.reset()
|
||||
self.response_model['result'] = [chan.to_dict() for chan in self.ctx.Channel.UID_CHANNEL_DB]
|
||||
return JSONResponse(self.response_model)
|
||||
@@ -1,29 +1,44 @@
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from typing import TYPE_CHECKING
|
||||
from starlette.responses import JSONResponse
|
||||
from core.classes.interfaces.irpc_endpoint import IRPC
|
||||
from core.classes.modules.rpc.rpc_errors import JSONRPCErrorCode
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
|
||||
class RPCCommand:
|
||||
class RPCCommand(IRPC):
|
||||
def __init__(self, loader: 'Loader'):
|
||||
self._Loader = loader
|
||||
self._Command = loader.Commands
|
||||
super().__init__(loader)
|
||||
|
||||
def command_list(self, **kwargs) -> list[dict]:
|
||||
return [command.to_dict() for command in self._Command.DB_COMMANDS]
|
||||
def command_list(self, **kwargs) -> JSONResponse:
|
||||
self.reset()
|
||||
self.response_model['result'] = [command.to_dict() for command in self.ctx.Commands.DB_COMMANDS]
|
||||
return JSONResponse(self.response_model)
|
||||
|
||||
def command_get_by_module(self, **kwargs) -> list[dict]:
|
||||
module_name = kwargs.get('module_name', None)
|
||||
if module_name is None:
|
||||
return []
|
||||
def command_get_by_module(self, **kwargs) -> JSONResponse:
|
||||
self.reset()
|
||||
module_name: str = kwargs.get('module_name', '')
|
||||
|
||||
return [command.to_dict() for command in self._Command.DB_COMMANDS if command.module_name.lower() == module_name.lower()]
|
||||
if not module_name:
|
||||
self.response_model['error'] = self.create_error_response(JSONRPCErrorCode.INVALID_PARAMS, {'module_name': 'The param to use is module_name'})
|
||||
return JSONResponse(self.response_model, self.http_status_code.HTTP_405_METHOD_NOT_ALLOWED)
|
||||
|
||||
self.response_model['result'] = [command.to_dict() for command in self.ctx.Commands.DB_COMMANDS if command.module_name.lower() == module_name.lower()]
|
||||
return JSONResponse(self.response_model)
|
||||
|
||||
def command_get_by_name(self, **kwargs) -> JSONResponse:
|
||||
self.reset()
|
||||
|
||||
def command_get_by_name(self, **kwargs) -> dict:
|
||||
command_name: str = kwargs.get('command_name', '')
|
||||
if not command_name:
|
||||
return dict()
|
||||
self.response_model['error'] = self.create_error_response(JSONRPCErrorCode.INVALID_PARAMS, {'command_name': f'The param to use is command_name'})
|
||||
return JSONResponse(self.response_model, self.http_status_code.HTTP_405_METHOD_NOT_ALLOWED)
|
||||
|
||||
for command in self._Command.DB_COMMANDS:
|
||||
command_to_return: list[dict] = []
|
||||
for command in self.ctx.Commands.DB_COMMANDS:
|
||||
if command.command_name.lower() == command_name.lower():
|
||||
return command.to_dict()
|
||||
return dict()
|
||||
command_to_return.append(command.to_dict())
|
||||
|
||||
self.response_model['result'] = command_to_return
|
||||
|
||||
return JSONResponse(self.response_model)
|
||||
43
core/classes/modules/rpc/rpc_errors.py
Normal file
43
core/classes/modules/rpc/rpc_errors.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from enum import Enum
|
||||
|
||||
class JSONRPCErrorCode(Enum):
|
||||
PARSE_ERROR = -32700 # Syntax error in the request (malformed JSON)
|
||||
INVALID_REQUEST = -32600 # Invalid Request (incorrect structure or missing fields)
|
||||
METHOD_NOT_FOUND = -32601 # Method not found (the requested method does not exist)
|
||||
INVALID_PARAMS = -32602 # Invalid Params (the parameters provided are incorrect)
|
||||
INTERNAL_ERROR = -32603 # Internal Error (an internal server error occurred)
|
||||
|
||||
# Custom application-specific errors (beyond standard JSON-RPC codes)
|
||||
CUSTOM_ERROR = 1001 # Custom application-defined error (e.g., user not found)
|
||||
AUTHENTICATION_ERROR = 1002 # Authentication failure (e.g., invalid credentials)
|
||||
PERMISSION_ERROR = 1003 # Permission error (e.g., user does not have access to this method)
|
||||
RESOURCE_NOT_FOUND = 1004 # Resource not found (e.g., the requested resource does not exist)
|
||||
DUPLICATE_REQUEST = 1005 # Duplicate request (e.g., a similar request has already been processed)
|
||||
|
||||
def description(self):
|
||||
"""Returns a description associated with each error code"""
|
||||
descriptions = {
|
||||
JSONRPCErrorCode.PARSE_ERROR: "The JSON request is malformed.",
|
||||
JSONRPCErrorCode.INVALID_REQUEST: "The request is invalid (missing or incorrect fields).",
|
||||
JSONRPCErrorCode.METHOD_NOT_FOUND: "The requested method could not be found.",
|
||||
JSONRPCErrorCode.INVALID_PARAMS: "The parameters provided are invalid.",
|
||||
JSONRPCErrorCode.INTERNAL_ERROR: "An internal error occurred on the server.",
|
||||
JSONRPCErrorCode.CUSTOM_ERROR: "A custom error defined by the application.",
|
||||
JSONRPCErrorCode.AUTHENTICATION_ERROR: "User authentication failed.",
|
||||
JSONRPCErrorCode.PERMISSION_ERROR: "User does not have permission to access this method.",
|
||||
JSONRPCErrorCode.RESOURCE_NOT_FOUND: "The requested resource could not be found.",
|
||||
JSONRPCErrorCode.DUPLICATE_REQUEST: "The request is a duplicate or is already being processed.",
|
||||
}
|
||||
return descriptions.get(self, "Unknown error")
|
||||
|
||||
def create_error_response(error_code: JSONRPCErrorCode, details: dict = None) -> dict[str, str]:
|
||||
"""Create a JSON-RPC error!"""
|
||||
response = {
|
||||
"code": error_code.value,
|
||||
"message": error_code.description(),
|
||||
}
|
||||
|
||||
if details:
|
||||
response["data"] = details
|
||||
|
||||
return response
|
||||
@@ -1,15 +1,20 @@
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from starlette.responses import JSONResponse
|
||||
from core.classes.interfaces.irpc_endpoint import IRPC
|
||||
from core.classes.modules.rpc.rpc_errors import JSONRPCErrorCode
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.loader import Loader
|
||||
from core.definition import MUser
|
||||
|
||||
class RPCUser:
|
||||
class RPCUser(IRPC):
|
||||
def __init__(self, loader: 'Loader'):
|
||||
self._ctx = loader
|
||||
super().__init__(loader)
|
||||
|
||||
def user_list(self, **kwargs) -> list[dict]:
|
||||
users = self._ctx.User.UID_DB.copy()
|
||||
def user_list(self, **kwargs) -> JSONResponse:
|
||||
self.reset()
|
||||
users = self.ctx.User.UID_DB.copy()
|
||||
copy_users: list['MUser'] = []
|
||||
|
||||
for user in users:
|
||||
@@ -17,14 +22,24 @@ class RPCUser:
|
||||
copy_user.connexion_datetime = copy_user.connexion_datetime.strftime('%d-%m-%Y')
|
||||
copy_users.append(copy_user)
|
||||
|
||||
return [user.to_dict() for user in copy_users]
|
||||
self.response_model['result'] = [user.to_dict() for user in copy_users]
|
||||
|
||||
def user_get(self, **kwargs) -> Optional[dict]:
|
||||
uidornickname = kwargs.get('uid_or_nickname', None)
|
||||
user = self._ctx.User.get_user(uidornickname)
|
||||
return JSONResponse(self.response_model)
|
||||
|
||||
def user_get(self, **kwargs) -> JSONResponse:
|
||||
self.reset()
|
||||
uidornickname = kwargs.get('uid_or_nickname', '')
|
||||
|
||||
if not uidornickname:
|
||||
self.response_model['error'] = self.create_error_response(JSONRPCErrorCode.INVALID_PARAMS, {'uid_or_nickname': 'The param to use is uid_or_nickname'})
|
||||
return JSONResponse(self.response_model, self.http_status_code.HTTP_405_METHOD_NOT_ALLOWED)
|
||||
|
||||
user = self.ctx.User.get_user(uidornickname)
|
||||
if user:
|
||||
user_copy = user.copy()
|
||||
user_copy.connexion_datetime = user_copy.connexion_datetime.strftime('%d-%m-%Y')
|
||||
return user_copy.to_dict()
|
||||
self.response_model['result'] = user_copy.to_dict()
|
||||
return JSONResponse(self.response_model)
|
||||
|
||||
return None
|
||||
self.response_model['result'] = 'User not found!'
|
||||
return JSONResponse(self.response_model, self.http_status_code.HTTP_204_NO_CONTENT)
|
||||
@@ -2,19 +2,21 @@ import asyncio
|
||||
from core import install
|
||||
|
||||
#############################################
|
||||
# @Version : 6.3 #
|
||||
# @Version : 6.4 #
|
||||
# Requierements : #
|
||||
# Python3.10 or higher #
|
||||
# SQLAlchemy, requests, psutil #
|
||||
# unrealircd-rpc-py, pyyaml #
|
||||
# uvicorn, starlette, faker #
|
||||
# UnrealIRCD 6.2.2 or higher #
|
||||
#############################################
|
||||
|
||||
async def main():
|
||||
install.update_packages()
|
||||
from core.loader import Loader
|
||||
loader = Loader()
|
||||
await loader.start()
|
||||
await loader.Irc.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main(), debug=True)
|
||||
asyncio.run(main())
|
||||
|
||||
Reference in New Issue
Block a user