From 8cac3316fb409cdfe77e1373353423647972db02 Mon Sep 17 00:00:00 2001 From: adator <85586985+adator85@users.noreply.github.com> Date: Mon, 10 Mar 2025 20:47:53 +0100 Subject: [PATCH] V6.1.5 i forgot --- core/classes/admin.py | 21 +- core/classes/channel.py | 1 + core/classes/client.py | 2 +- core/classes/protocols/unreal6.py | 92 ++++++-- core/classes/settings.py | 2 + core/definition.py | 14 +- core/irc.py | 156 +------------ mods/mod_clone.py | 2 +- mods/mod_command.py | 17 +- mods/mod_defender.py | 2 +- mods/mod_jsonrpc.py | 2 +- mods/mod_nickserv.py | 367 ++++++++++++++++++++++++++++++ mods/mod_test.py | 2 +- mods/mod_votekick.py | 2 +- mods/mod_weather.py | 258 +++++++++++++++++++++ version.json | 2 +- 16 files changed, 755 insertions(+), 187 deletions(-) create mode 100644 mods/mod_nickserv.py create mode 100644 mods/mod_weather.py diff --git a/core/classes/admin.py b/core/classes/admin.py index 3f4e61c..e2e3269 100644 --- a/core/classes/admin.py +++ b/core/classes/admin.py @@ -37,6 +37,9 @@ class Admin: result = False + if not self.is_exist(uid): + return result + for record in self.UID_ADMIN_DB: if record.uid == uid: # If the admin exist, update and do not go further @@ -46,7 +49,7 @@ class Admin: return result if not result: - self.Logs.critical(f'The new nickname {newNickname} was not updated, uid = {uid}') + self.Logs.critical(f'Admin: The new nickname {newNickname} was not updated, uid = {uid}') return result @@ -124,4 +127,18 @@ class Admin: if record.uid == uidornickname: nickname = record.nickname self.Logs.debug(f'The value {uidornickname} -- {nickname}') - return nickname \ No newline at end of file + return nickname + + def is_exist(self, uidornickname: str) -> bool: + """Check if this uid or nickname is logged in as an admin + + Args: + uidornickname (str): The UID or the Nickname + + Returns: + bool: True if the Nickname or UID is an admin + """ + if self.get_Admin(uidornickname) is None: + return False + else: + return True \ No newline at end of file diff --git a/core/classes/channel.py b/core/classes/channel.py index 608b051..70c1493 100644 --- a/core/classes/channel.py +++ b/core/classes/channel.py @@ -90,6 +90,7 @@ class Channel: if self.Base.clean_uid(userid) == self.Base.clean_uid(uid): chanObj.uids.remove(userid) result = True + self.Logs.debug(f"The UID: {userid} has been removed from the {channel_name}") self.clean_channel() diff --git a/core/classes/client.py b/core/classes/client.py index fd54ff0..3be5374 100644 --- a/core/classes/client.py +++ b/core/classes/client.py @@ -219,7 +219,7 @@ class Client: account_to_check_result = account_to_check_query.fetchone() if account_to_check_result: - self.Logs.error(f"Account ({account}) already exist") + self.Logs.debug(f"Account ({account}) already exist") return True return False diff --git a/core/classes/protocols/unreal6.py b/core/classes/protocols/unreal6.py index 8cc16bf..37a70dc 100644 --- a/core/classes/protocols/unreal6.py +++ b/core/classes/protocols/unreal6.py @@ -245,6 +245,33 @@ class Unrealircd6: self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[self.__Config.SERVICE_ID])) return None + def send_svs_nick(self, oldnickname: str, newnickname: str) -> None: + + unixtime = self.__Base.get_unixtime() + self.send2socket(f':{self.__Config.SERVEUR_ID} SVSNICK {oldnickname} {newnickname} {unixtime}') + + user_obj = self.__Irc.User.get_User(oldnickname) + self.__Irc.User.update_nickname(user_obj.uid, newnickname) + + return None + + def send_sjoin(self, uid_to_join: str, channel: str) -> None: + """UID will join a channel with pre defined umodes + + Args: + channel (str): Channel to join + """ + if not self.__Irc.Channel.Is_Channel(channel): + self.__Base.logs.error(f"The channel [{channel}] is not valid") + return None + + self.send2socket(f":{self.__Config.SERVEUR_ID} SJOIN {self.__Base.get_unixtime()} {channel} {self.__Config.SERVICE_UMODES} :{uid_to_join}") + self.send2socket(f":{self.__Config.SERVICE_ID} MODE {channel} {self.__Config.SERVICE_UMODES} {uid_to_join}") + + # Add defender to the channel uids list + self.__Irc.Channel.insert(self.__Irc.Loader.Definition.MChannel(name=channel, uids=[uid_to_join])) + return None + def send_sapart(self, nick_to_sapart: str, channel_name: str) -> None: """_summary_ @@ -326,7 +353,26 @@ class Unrealircd6: except Exception as err: self.__Base.logs.error(f"{__name__} - General Error: {err}") - def send_quit(self, uid: str, reason: str, print_log: True) -> None: + def send_svs2mode(self, service_uid: str, nickname: str, user_mode: str) -> None: + try: + + user_obj = self.__Irc.User.get_User(uidornickname=nickname) + service_uid = service_uid + + if user_obj is None: + # User not exist: leave + return None + + self.send2socket(f':{service_uid} SVS2MODE {nickname} {user_mode}') + + # Update new mode + self.__Irc.User.update_mode(user_obj.uid, user_mode) + + return None + except Exception as err: + self.__Base.logs.error(f"{__name__} - General Error: {err}") + + def send_quit(self, uid: str, reason: str, print_log: bool = True) -> None: """Send quit message - Delete uid from User object - Delete uid from Clone object @@ -469,6 +515,22 @@ class Unrealircd6: self.send2socket(f":{self.__Config.SERVICE_NICKNAME} MODE {channel_name} {channel_mode}") return None + def send_topic_chan(self, channel_name: str, topic_msg: str) -> None: + """Set a channel topic + + Args: + channel_name (str): Channel name starting with # + topic_msg (str): The message of the topic + """ + + if self.__Irc.Channel.Is_Channel(channel_name) is None: + self.__Base.logs.error(f"The channel {channel_name} is not valid") + return None + + self.send2socket(f":{self.__Config.SERVICE_NICKNAME} TOPIC {channel_name} :{topic_msg}") + + return None + def send_raw(self, raw_command: str) -> None: self.send2socket(f":{self.__Config.SERVICE_NICKNAME} {raw_command}") @@ -715,7 +777,7 @@ class Unrealircd6: # ['@unrealircd.org/geoip=FR;unrealircd.org/userhost=50d6492c@80.214.73.44;unrealircd.org/userip=50d6492c@80.214.73.44;msgid=YSIPB9q4PcRu0EVfC9ci7y-/mZT0+Gj5FLiDSZshH5NCw;time=2024-08-15T15:35:53.772Z', # ':001EPFBRD', 'PART', '#welcome', ':WEB', 'IRC', 'Paris'] - uid = str(serverMsg[1]).lstrip(':') + uid = str(serverMsg[1]).replace(':', '') channel = str(serverMsg[3]).lower() self.__Irc.Channel.delete_user_from_channel(channel, uid) @@ -839,29 +901,23 @@ class Unrealircd6: isWebirc = True if 'webirc' in serverMsg[0] else False isWebsocket = True if 'websocket' in serverMsg[0] else False - uid = str(serverMsg[8]) nickname = str(serverMsg[3]) + hopcount = str(serverMsg[4]) + timestamp = str(serverMsg[5]) username = str(serverMsg[6]) hostname = str(serverMsg[7]) + uid = str(serverMsg[8]) + servicestamp = str(serverMsg[9]) umodes = str(serverMsg[10]) vhost = str(serverMsg[11]) - - if not 'S' in umodes: - remote_ip = self.__Base.decode_ip(str(serverMsg[13])) - else: - remote_ip = '127.0.0.1' - - # extract realname + cloacked_host = str(serverMsg[12]) + remote_ip = self.__Base.decode_ip(str(serverMsg[13])) if 'S' in umodes else '127.0.0.1' realname = ' '.join(serverMsg[14:]).lstrip(':') # Extract Geoip information pattern = r'^.*geoip=cc=(\S{2}).*$' geoip_match = match(pattern, serverMsg[0]) - - if geoip_match: - geoip = geoip_match.group(1) - else: - geoip = None + geoip = geoip_match.group(1) if geoip_match else None score_connexion = self.__Irc.first_score @@ -874,6 +930,10 @@ class Unrealircd6: hostname=hostname, umodes=umodes, vhost=vhost, + hopcount=hopcount, + timestamp=timestamp, + servicestamp=servicestamp, + cloacked_host=cloacked_host, isWebirc=isWebirc, isWebsocket=isWebsocket, remote_ip=remote_ip, @@ -940,7 +1000,7 @@ class Unrealircd6: fromchannel = str(cmd[2]).lower() if self.__Irc.Channel.Is_Channel(cmd[2]) else None self.__Irc.hcmds(user_trigger, fromchannel, arg, cmd) - if cmd[2] == self.__Config.SERVICE_ID: + if cmd[2] == self.__Config.SERVICE_ID or cmd[2] == self.__Settings.NICKSERV_UID: pattern = fr'^:.*?:(.*)$' hcmds = search(pattern, ' '.join(cmd)) diff --git a/core/classes/settings.py b/core/classes/settings.py index 5132a45..af9ad58 100644 --- a/core/classes/settings.py +++ b/core/classes/settings.py @@ -13,3 +13,5 @@ class Settings: PROTOCTL_USER_MODES: list[str] = [] PROTOCTL_PREFIX: list[str] = [] + + NICKSERV_UID: str = None diff --git a/core/definition.py b/core/definition.py index fea388e..d25d198 100644 --- a/core/definition.py +++ b/core/definition.py @@ -6,14 +6,18 @@ from os import sep @dataclass class MClient: """Model Client for registred nickname""" - uid: str = None account: str = None + uid: str = None nickname: str = None username: str = None realname: str = None hostname: str = None umodes: str = None vhost: str = None + hopcount: str = None + timestamp: str = None + servicestamp: str = None + cloacked_host: str = None isWebirc: bool = False isWebsocket: bool = False remote_ip: str = None @@ -32,6 +36,10 @@ class MUser: hostname: str = None umodes: str = None vhost: str = None + hopcount: str = None + timestamp: str = None + servicestamp: str = None + cloacked_host: str = None isWebirc: bool = False isWebsocket: bool = False remote_ip: str = None @@ -50,6 +58,10 @@ class MAdmin: hostname: str = None umodes: str = None vhost: str = None + hopcount: str = None + timestamp: str = None + servicestamp: str = None + cloacked_host: str = None isWebirc: bool = False isWebsocket: bool = False remote_ip: str = None diff --git a/core/irc.py b/core/irc.py index f2ef8ff..01d0842 100644 --- a/core/irc.py +++ b/core/irc.py @@ -97,7 +97,7 @@ class Irc: self.build_command(0, 'core', 'uptime', 'Give you since when the service is connected') self.build_command(0, 'core', 'firstauth', 'First authentication of the Service') self.build_command(0, 'core', 'register', f'Register your nickname /msg {self.Config.SERVICE_NICKNAME} REGISTER ') - self.build_command(0, 'core', 'identify', f'Identify yourself with your password /msg {self.Config.SERVICE_NICKNAME} IDENTIFY ') + # self.build_command(0, 'core', 'identify', f'Identify yourself with your password /msg {self.Config.SERVICE_NICKNAME} IDENTIFY ') self.build_command(0, 'core', 'logout', 'Reverse the effect of the identify command') self.build_command(1, 'core', 'load', 'Load an existing module') self.build_command(1, 'core', 'unload', 'Unload a module') @@ -321,7 +321,7 @@ class Irc: except AssertionError as ae: self.Logs.error(f"Assertion error : {ae}") - def unload(self) -> None: + def unload(self, reloading: bool = False) -> None: # This is only to reference the method return None @@ -616,7 +616,7 @@ class Irc: if 'mods.' + module_name in sys.modules: self.Logs.info('Unload the module ...') - self.loaded_classes[class_name].unload() + self.loaded_classes[class_name].unload(reloading=True) self.Logs.info('Module Already Loaded ... reloading the module ...') the_module = sys.modules['mods.' + module_name] importlib.reload(the_module) @@ -854,7 +854,7 @@ class Irc: self.Logs.debug(f"** handle {parsed_protocol}") case 'PART': - self.Protocol.on_part(serverMsg=parsed_protocol) + self.Protocol.on_part(serverMsg=original_response) self.Logs.debug(f"** handle {parsed_protocol}") case 'VERSION': @@ -1233,154 +1233,6 @@ class Irc: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Impossible de supprimer l'utilisateur.") self.Logs.warning(f":{dnickname} NOTICE {fromuser} : Impossible de supprimer l'utilisateur.") - case 'register': - # Register PASSWORD EMAIL - try: - - if len(cmd) < 3: - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg=f'/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ' - ) - return None - - password = cmd[1] - email = cmd[2] - - if not self.Base.is_valid_email(email_to_control=email): - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg='The email is not valid. You must provide a valid email address (first.name@email.extension)' - ) - return None - - user_obj = self.User.get_User(fromuser) - - if user_obj is None: - self.Logs.error(f"Nickname ({fromuser}) doesn't exist, it is impossible to register this nickname") - return None - - # If the account already exist. - if self.Client.db_is_account_exist(fromuser): - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg=f"Your account already exist, please try to login instead /msg {self.Config.SERVICE_NICKNAME} IDENTIFY " - ) - return None - - # If the account doesn't exist then insert into database - data_to_record = { - 'createdOn': self.Base.get_datetime(), 'account': fromuser, - 'nickname': user_obj.nickname, 'hostname': user_obj.hostname, 'vhost': user_obj.vhost, 'realname': user_obj.realname, 'email': email, - 'password': self.Base.crypt_password(password=password), 'level': 0 - } - - insert_to_db = self.Base.db_execute_query(f""" - INSERT INTO {self.Config.TABLE_CLIENT} - (createdOn, account, nickname, hostname, vhost, realname, email, password, level) - VALUES - (:createdOn, :account, :nickname, :hostname, :vhost, :realname, :email, :password, :level) - """, data_to_record) - - if insert_to_db.rowcount > 0: - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg=f"You have register your nickname successfully" - ) - - return None - - except ValueError as ve: - self.Logs.error(f"Value Error : {ve}") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" {self.Config.SERVICE_PREFIX}{command.upper()} ") - - case 'identify': - # Identify ACCOUNT PASSWORD - try: - if len(cmd) < 3: - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg=f'/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ' - ) - return None - - account = str(cmd[1]) # account - encrypted_password = self.Base.crypt_password(cmd[2]) - user_obj = self.User.get_User(fromuser) - client_obj = self.Client.get_Client(user_obj.uid) - - if client_obj is not None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are already logged in") - return None - - db_query = f"SELECT account FROM {self.Config.TABLE_CLIENT} WHERE account = :account AND password = :password" - db_param = {'account': account, 'password': encrypted_password} - exec_query = self.Base.db_execute_query( - db_query, - db_param - ) - result_query = exec_query.fetchone() - if result_query: - account = result_query[0] - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You are now logged in") - client = self.Loader.Definition.MClient( - uid=user_obj.uid, account=account, nickname=fromuser, - username=user_obj.username, realname=user_obj.realname, hostname=user_obj.hostname, umodes=user_obj.umodes, vhost=user_obj.vhost, - isWebirc=user_obj.isWebirc, isWebsocket=user_obj.isWebsocket, remote_ip=user_obj.remote_ip, score_connexion=user_obj.score_connexion, - geoip=user_obj.geoip, connexion_datetime=user_obj.connexion_datetime - ) - self.Client.insert(client) - self.Protocol.send_svs_mode(nickname=fromuser, user_mode='+r') - else: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"Wrong password or account") - - return None - - except ValueError as ve: - self.Logs.error(f"Value Error: {ve}") - self.Protocol.send_notice( - nick_from=dnickname, - nick_to=fromuser, - msg=f'/msg {self.Config.SERVICE_NICKNAME} {command.upper()} ' - ) - - except Exception as err: - self.Logs.error(f"General Error: {err}") - - case 'logout': - try: - # LOGOUT - if len(cmd) < 2: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} ") - return None - - user_obj = self.User.get_User(fromuser) - if user_obj is None: - self.Logs.error(f"The User [{fromuser}] is not available in the database") - return None - - client_obj = self.Client.get_Client(user_obj.uid) - - if client_obj is None: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="Nothing to logout. please login first") - return None - - self.Protocol.send_svs_mode(nickname=fromuser, user_mode='-r') - self.Client.delete(user_obj.uid) - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"You have been logged out successfully") - - except ValueError as ve: - self.Logs.error(f"Value Error: {ve}") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} ") - except Exception as err: - self.Logs.error(f"General Error: {err}") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} ") - case 'help': self.generate_help_menu(nickname=fromuser) diff --git a/mods/mod_clone.py b/mods/mod_clone.py index 0231255..6a1355e 100644 --- a/mods/mod_clone.py +++ b/mods/mod_clone.py @@ -111,7 +111,7 @@ class Clone(): except TypeError as te: self.Logs.critical(te) - def unload(self) -> None: + def unload(self, reloading: bool = False) -> None: """Cette methode sera executée a chaque désactivation ou rechargement de module """ diff --git a/mods/mod_command.py b/mods/mod_command.py index f3b8874..72c3ca8 100644 --- a/mods/mod_command.py +++ b/mods/mod_command.py @@ -164,15 +164,14 @@ class Command: """ self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value) - def unload(self) -> None: + def unload(self, reloading: bool = False) -> None: return None def cmd(self, data: list[str]) -> None: try: - # service_id = self.Config.SERVICE_ID + dnickname = self.Config.SERVICE_NICKNAME - # dchanlog = self.Config.SERVICE_CHANLOG red = self.Config.COLORS.red green = self.Config.COLORS.green bold = self.Config.COLORS.bold @@ -303,7 +302,7 @@ class Command: service_id = self.Config.SERVICE_ID dchanlog = self.Config.SERVICE_CHANLOG self.user_to_notice = uidornickname - fromuser = uidornickname + fromuser = self.User.get_nickname(uidornickname) fromchannel = channel_name match command: @@ -853,9 +852,9 @@ class Command: case 'topic': try: - if len(cmd) == 1: - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} TOPIC #channel THE_TOPIC_MESSAGE") - self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f" /msg {dnickname} TOPIC #channel THE_TOPIC_MESSAGE") + # TOPIC <#CHANNEL> + if len(cmd) < 2: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} {command.upper()} <#channel> THE_TOPIC_MESSAGE") return None chan = str(cmd[1]) @@ -864,10 +863,10 @@ class Command: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {dnickname} TOPIC #channel THE_TOPIC_MESSAGE") return None - topic_msg = ' '.join(cmd[2:]).strip() + topic_msg: str = ' '.join(cmd[2:]).strip() if topic_msg: - self.Protocol.send2socket(f':{dnickname} TOPIC {chan} :{topic_msg}') + self.Protocol.send_topic_chan(channel_name=chan, topic_msg=topic_msg) else: self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg="You need to specify the topic") diff --git a/mods/mod_defender.py b/mods/mod_defender.py index e0efaed..c7065c0 100644 --- a/mods/mod_defender.py +++ b/mods/mod_defender.py @@ -200,7 +200,7 @@ class Defender(): self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value) - def unload(self) -> None: + def unload(self, reloading: bool = False) -> None: """Cette methode sera executée a chaque désactivation ou rechargement de module """ diff --git a/mods/mod_jsonrpc.py b/mods/mod_jsonrpc.py index 8e19c18..c6e2584 100644 --- a/mods/mod_jsonrpc.py +++ b/mods/mod_jsonrpc.py @@ -182,7 +182,7 @@ class Jsonrpc(): """ self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value) - def unload(self) -> None: + def unload(self, reloading: bool = False) -> None: if self.UnrealIrcdRpcLive.Error.code != -1: self.UnrealIrcdRpcLive.unsubscribe() return None diff --git a/mods/mod_nickserv.py b/mods/mod_nickserv.py new file mode 100644 index 0000000..bdc34d7 --- /dev/null +++ b/mods/mod_nickserv.py @@ -0,0 +1,367 @@ +import socket +import json +import time +import re +import psutil +import requests +from dataclasses import dataclass +from datetime import datetime +from typing import Union, TYPE_CHECKING +from core.classes import user +import core.definition as df + +if TYPE_CHECKING: + from core.irc import Irc + + +class Nickserv(): + + @dataclass + class ModConfModel: + active: bool = False + + def __init__(self, ircInstance: 'Irc') -> None: + + # Module name (Mandatory) + self.module_name = 'mod_' + str(self.__class__.__name__).lower() + + # Add Irc Object to the module (Mandatory) + self.Irc = ircInstance + + # Add Loader Object to the module (Mandatory) + self.Loader = ircInstance.Loader + + # Add server protocol Object to the module (Mandatory) + self.Protocol = ircInstance.Protocol + + # Add Global Configuration to the module (Mandatory) + self.Config = ircInstance.Config + + # Add Base object to the module (Mandatory) + self.Base = ircInstance.Base + + # Add logs object to the module (Mandatory) + self.Logs = ircInstance.Base.logs + + # Add User object to the module (Mandatory) + self.User = ircInstance.User + + # Add Client object to the module (Mandatory) + self.Client = ircInstance.Client + + # Add Channel object to the module (Mandatory) + self.Channel = ircInstance.Channel + + # Add Reputation object to the module (Optional) + self.Reputation = ircInstance.Reputation + + # Check if NickServ already exist + # for user_obj in self.User.UID_DB: + # if user_obj.nickname.lower() == 'nickserv': + # self.Logs.warning(f"The NickServ service already exist, impossible to load 2 NickServ services") + # return None + + # Create module commands (Mandatory) + self.Irc.build_command(0, self.module_name, 'register', f'Register your nickname /msg NickServ REGISTER ') + self.Irc.build_command(0, self.module_name, 'identify', f'Identify yourself with your password /msg NickServ IDENTIFY ') + self.Irc.build_command(0, self.module_name, 'logout', 'Reverse the effect of the identify command') + + # Init the module (Mandatory) + self.__init_module() + + # Log the module + self.Logs.debug(f'-- Module {self.module_name} loaded ...') + + def __init_module(self) -> None: + + # Create you own tables if needed (Mandatory) + self.__create_tables() + + # Load module configuration (Mandatory) + self.__load_module_configuration() + # End of mandatory methods you can start your customization # + + self.build_nickserv_information() + + return None + + def __create_tables(self) -> None: + """Methode qui va créer la base de donnée si elle n'existe pas. + Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module + Args: + database_name (str): Nom de la base de données ( pas d'espace dans le nom ) + + Returns: + None: Aucun retour n'es attendu + """ + + # table_autoop = '''CREATE TABLE IF NOT EXISTS defender_autoop ( + # id INTEGER PRIMARY KEY AUTOINCREMENT, + # datetime TEXT, + # nickname TEXT, + # channel TEXT + # ) + # ''' + + # self.Base.db_execute_query(table_autoop) + # self.Base.db_execute_query(table_config) + # self.Base.db_execute_query(table_trusted) + return None + + def __load_module_configuration(self) -> None: + """### Load Module Configuration + """ + try: + # Variable qui va contenir les options de configuration du module Defender + self.ModConfig = self.ModConfModel(active=True) + + # Sync the configuration with core configuration (Mandatory) + self.Base.db_sync_core_config(self.module_name, self.ModConfig) + + return None + + except TypeError as te: + self.Logs.critical(te) + + def __update_configuration(self, param_key: str, param_value: str): + + self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value) + + def build_nickserv_information(self) -> None: + + self.NICKSERV_NICKNAME = 'NickServ' + + if self.User.is_exist(self.NICKSERV_NICKNAME): + # If NickServ already exist just return None + self.NICKSERV_UID = self.User.get_uid(self.NICKSERV_NICKNAME) + self.Irc.Settings.NICKSERV_UID = self.NICKSERV_UID + return None + + self.NICKSERV_UID = self.Config.SERVEUR_ID + self.Base.get_random(6) + nickserv_uid = self.NICKSERV_UID + self.Irc.Settings.NICKSERV_UID = self.NICKSERV_UID + + self.Protocol.send_uid( + nickname=self.NICKSERV_NICKNAME, username='NickServ', hostname='nickserv.deb.biz.st', uid=nickserv_uid, umodes='+ioqBS', + vhost='nickserv.deb.biz.st', remote_ip='127.0.0.1', realname='Nick Service' + ) + + new_user_obj = self.Loader.Definition.MUser(uid=nickserv_uid, nickname=self.NICKSERV_NICKNAME, username='NickServ', realname='Nick Service', + hostname='nickserv.deb.biz.st', umodes='+ioqBS', vhost='nickserv.deb.biz.st', remote_ip='127.0.0.1') + + self.User.insert(new_user_obj) + self.Protocol.send_sjoin(nickserv_uid, self.Config.SERVICE_CHANLOG) + return None + + def timer_force_change_nickname(self, nickname: str) -> None: + + client_obj = self.Client.get_Client(nickname) + + if client_obj is None: + self.Protocol.send_svs_nick(nickname, self.Base.get_random(8)) + + return None + + def unload(self, reloading: bool = False) -> None: + """Cette methode sera executée a chaque désactivation ou + rechargement de module + """ + if reloading: + pass + else: + # NickServ will be disconnected + self.Protocol.send_quit(self.NICKSERV_UID, f'Stopping {self.module_name} module') + + del self.NICKSERV_NICKNAME + del self.NICKSERV_UID + + return None + + def cmd(self, data: list[str]) -> None: + try: + service_id = self.Config.SERVICE_ID # Defender serveur id + original_serv_response = list(data).copy() + + parsed_protocol = self.Protocol.parse_server_msg(data.copy()) + + match parsed_protocol: + + case 'UID': + try: + # ['@...', ':001', 'UID', 'adator', '0', '1733865747', '...', '192.168.1.10', '00118YU01', '0', '+iotwxz', 'netadmin.irc.dev.local', '745A32E5.421A4718.F33AB65A.IP', 'wKgBCg==', ':...'] + get_uid = str(original_serv_response[8]) + user_obj = self.User.get_User(get_uid) + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=user_obj.nickname, + msg='NickServ is here to protect your nickname' + ) + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=user_obj.nickname, + msg='To register a nickname tape /msg NickServ register password email' + ) + + if self.Client.db_is_account_exist(user_obj.nickname): + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=user_obj.nickname, msg='The nickname is protected please login to keep it') + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=user_obj.nickname, msg=f'/msg {self.NICKSERV_NICKNAME} IDENTIFY ') + self.Base.create_timer(60, self.timer_force_change_nickname, (user_obj.nickname, )) + + except IndexError as ie: + self.Logs.error(f'cmd reputation: index error: {ie}') + + case None: + self.Logs.debug(f"** TO BE HANDLE {original_serv_response} {__name__}") + + except KeyError as ke: + self.Logs.error(f"{ke} / {original_serv_response} / length {str(len(original_serv_response))}") + except IndexError as ie: + self.Logs.error(f"{ie} / {original_serv_response} / length {str(len(original_serv_response))}") + except Exception as err: + self.Logs.error(f"General Error: {err}") + + def hcmds(self, user: str, channel: str, cmd: list, fullcmd: list = []) -> None: + + command = str(cmd[0]).lower() + fromuser = user + fromchannel = channel if self.Channel.Is_Channel(channel) else None + dnickname = self.Config.SERVICE_NICKNAME # Defender nickname + dchanlog = self.Config.SERVICE_CHANLOG # Defender chan log + dumodes = self.Config.SERVICE_UMODES # Les modes de Defender + service_id = self.Config.SERVICE_ID # Defender serveur id + + match command: + + case 'register': + # Register PASSWORD EMAIL + try: + + if len(cmd) < 3: + self.Protocol.send_notice( + nick_from=self.NICKSERV_NICKNAME, + nick_to=fromuser, + msg=f'/msg {self.NICKSERV_NICKNAME} {command.upper()} ' + ) + return None + + password = cmd[1] + email = cmd[2] + + if not self.Base.is_valid_email(email_to_control=email): + self.Protocol.send_notice( + nick_from=self.NICKSERV_NICKNAME, + nick_to=fromuser, + msg='The email is not valid. You must provide a valid email address (first.name@email.extension)' + ) + return None + + user_obj = self.User.get_User(fromuser) + + if user_obj is None: + self.Logs.error(f"Nickname ({fromuser}) doesn't exist, it is impossible to register this nickname") + return None + + # If the account already exist. + if self.Client.db_is_account_exist(fromuser): + self.Protocol.send_notice( + nick_from=self.NICKSERV_NICKNAME, + nick_to=fromuser, + msg=f"Your account already exist, please try to login instead /msg {self.NICKSERV_NICKNAME} IDENTIFY " + ) + return None + + # If the account doesn't exist then insert into database + data_to_record = { + 'createdOn': self.Base.get_datetime(), 'account': fromuser, + 'nickname': user_obj.nickname, 'hostname': user_obj.hostname, 'vhost': user_obj.vhost, 'realname': user_obj.realname, 'email': email, + 'password': self.Base.crypt_password(password=password), 'level': 0 + } + + insert_to_db = self.Base.db_execute_query(f""" + INSERT INTO {self.Config.TABLE_CLIENT} + (createdOn, account, nickname, hostname, vhost, realname, email, password, level) + VALUES + (:createdOn, :account, :nickname, :hostname, :vhost, :realname, :email, :password, :level) + """, data_to_record) + + if insert_to_db.rowcount > 0: + self.Protocol.send_notice( + nick_from=self.NICKSERV_NICKNAME, + nick_to=fromuser, + msg=f"You have register your nickname successfully" + ) + + return None + + except ValueError as ve: + self.Logs.error(f"Value Error : {ve}") + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=fromuser, msg=f" {self.Config.SERVICE_PREFIX}{command.upper()} ") + + case 'identify': + try: + # IDENTIFY + if len(cmd) < 3: + self.Protocol.send_notice( + nick_from=dnickname, + nick_to=fromuser, + msg=f'/msg {self.NICKSERV_NICKNAME} {command.upper()} ' + ) + return None + + account = str(cmd[1]) # account + encrypted_password = self.Base.crypt_password(cmd[2]) + user_obj = self.User.get_User(fromuser) + client_obj = self.Client.get_Client(user_obj.uid) + + if client_obj is not None: + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=fromuser, msg=f"You are already logged in") + return None + + db_query = f"SELECT account FROM {self.Config.TABLE_CLIENT} WHERE account = :account AND password = :password" + db_param = {'account': account, 'password': encrypted_password} + exec_query = self.Base.db_execute_query( + db_query, + db_param + ) + result_query = exec_query.fetchone() + if result_query: + account = result_query[0] + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=fromuser, msg=f"You are now logged in") + user_obj_dict = self.User.get_User_AsDict(fromuser) + client = self.Loader.Definition.MClient( + account=account, + **user_obj_dict + ) + self.Client.insert(client) + self.Protocol.send_svs2mode(service_uid=self.NICKSERV_UID, nickname=fromuser, user_mode='+r') + else: + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=fromuser, msg=f"Wrong password or account") + except TypeError as te: + self.Logs.error(f"Type Error -> {te}") + except ValueError as ve: + self.Logs.error(f"Value Error -> {ve}") + + case 'logout': + try: + # LOGOUT + if len(cmd) < 2: + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=fromuser, msg=f"/msg {self.NICKSERV_NICKNAME} {command.upper()} ") + return None + + user_obj = self.User.get_User(fromuser) + if user_obj is None: + self.Logs.error(f"The User [{fromuser}] is not available in the database") + return None + + client_obj = self.Client.get_Client(user_obj.uid) + + if client_obj is None: + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=fromuser, msg="Nothing to logout. please login first") + return None + + self.Protocol.send_svs2mode(service_uid=self.NICKSERV_UID, nickname=fromuser, user_mode='-r') + self.Client.delete(user_obj.uid) + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=fromuser, msg=f"You have been logged out successfully") + + except ValueError as ve: + self.Logs.error(f"Value Error: {ve}") + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=fromuser, msg=f"/msg {self.NICKSERV_NICKNAME} {command.upper()} ") + except Exception as err: + self.Logs.error(f"General Error: {err}") + self.Protocol.send_notice(nick_from=self.NICKSERV_NICKNAME, nick_to=fromuser, msg=f"/msg {self.NICKSERV_NICKNAME} {command.upper()} ") diff --git a/mods/mod_test.py b/mods/mod_test.py index 275f845..9352a1b 100644 --- a/mods/mod_test.py +++ b/mods/mod_test.py @@ -113,7 +113,7 @@ class Test(): """ self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value) - def unload(self) -> None: + def unload(self, reloading: bool = False) -> None: return None diff --git a/mods/mod_votekick.py b/mods/mod_votekick.py index 54b7d03..507d3ae 100644 --- a/mods/mod_votekick.py +++ b/mods/mod_votekick.py @@ -101,7 +101,7 @@ class Votekick(): self.Base.db_execute_query(table_vote) return None - def unload(self) -> None: + def unload(self, reloading: bool = False) -> None: try: for chan in self.VOTE_CHANNEL_DB: self.Protocol.send_part_chan(uidornickname=self.Config.SERVICE_ID, channel=chan.channel_name) diff --git a/mods/mod_weather.py b/mods/mod_weather.py new file mode 100644 index 0000000..cfbc464 --- /dev/null +++ b/mods/mod_weather.py @@ -0,0 +1,258 @@ +from urllib import request +import requests +from dataclasses import dataclass +from datetime import datetime +from typing import Union, TYPE_CHECKING +from core.classes import user +import core.definition as df + +if TYPE_CHECKING: + from core.irc import Irc + + +class Weather(): + + @dataclass + class ModConfModel: + active: bool = False + + def __init__(self, ircInstance: 'Irc') -> None: + + # Module name (Mandatory) + self.module_name = 'mod_' + str(self.__class__.__name__).lower() + + # Add Irc Object to the module (Mandatory) + self.Irc = ircInstance + + # Add Loader Object to the module (Mandatory) + self.Loader = ircInstance.Loader + + # Add server protocol Object to the module (Mandatory) + self.Protocol = ircInstance.Protocol + + # Add Global Configuration to the module (Mandatory) + self.Config = ircInstance.Config + + # Add Base object to the module (Mandatory) + self.Base = ircInstance.Base + + # Add logs object to the module (Mandatory) + self.Logs = ircInstance.Base.logs + + # Add User object to the module (Mandatory) + self.User = ircInstance.User + + # Add Client object to the module (Mandatory) + self.Client = ircInstance.Client + + # Add Channel object to the module (Mandatory) + self.Channel = ircInstance.Channel + + # Add Reputation object to the module (Optional) + self.Reputation = ircInstance.Reputation + + # Create module commands (Mandatory) + self.Irc.build_command(0, self.module_name, 'meteo', 'Get the meteo of a current city') + + # Init the module (Mandatory) + self.__init_module() + + # Log the module + self.Logs.debug(f'-- Module {self.module_name} loaded ...') + + def __init_module(self) -> None: + + # Create you own tables if needed (Mandatory) + self.__create_tables() + + # Load module configuration (Mandatory) + self.__load_module_configuration() + # End of mandatory methods you can start your customization # + + self.from_user = None + self.from_channel = None + + return None + + def __create_tables(self) -> None: + """Methode qui va créer la base de donnée si elle n'existe pas. + Une Session unique pour cette classe sera crée, qui sera utilisé dans cette classe / module + Args: + database_name (str): Nom de la base de données ( pas d'espace dans le nom ) + + Returns: + None: Aucun retour n'es attendu + """ + + # table_autoop = '''CREATE TABLE IF NOT EXISTS defender_autoop ( + # id INTEGER PRIMARY KEY AUTOINCREMENT, + # datetime TEXT, + # nickname TEXT, + # channel TEXT + # ) + # ''' + + # self.Base.db_execute_query(table_autoop) + # self.Base.db_execute_query(table_config) + # self.Base.db_execute_query(table_trusted) + return None + + def __load_module_configuration(self) -> None: + """### Load Module Configuration + """ + try: + # Variable qui va contenir les options de configuration du module Defender + self.ModConfig = self.ModConfModel(active=True) + + # Sync the configuration with core configuration (Mandatory) + self.Base.db_sync_core_config(self.module_name, self.ModConfig) + + return None + + except TypeError as te: + self.Logs.critical(te) + + def __update_configuration(self, param_key: str, param_value: str): + + self.Base.db_update_core_config(self.module_name, self.ModConfig, param_key, param_value) + + def get_geo_information(self, city: str) -> tuple[str, str, float, float]: + """_summary_ + + Args: + city (str): The city you want to get + + Returns: + tuple[str, str, float, float]: Country Code, City Name Latitude, Longitude + """ + api_key = 'fd36a3f3715c93f6770a13f5a34ae1e3' + geo_api_url = "http://api.openweathermap.org/geo/1.0/direct" + + response = requests.get( + url=geo_api_url, + params={'q': city, 'limit': 1, 'appid': api_key} + ) + + geo_data = response.json() + if not geo_data: + return (None, None, 0, 0) + + country_code = geo_data[0]['country'] + city_name = geo_data[0]['name'] + latitude: float = geo_data[0]['lat'] + longitude: float = geo_data[0]['lon'] + + return (country_code, city_name, latitude, longitude) + + def get_meteo_information(self, city: str, to_nickname: str) -> None: + + api_key = 'fd36a3f3715c93f6770a13f5a34ae1e3' + meteo_api_url = "https://api.openweathermap.org/data/2.5/weather" #?lat={lat}&lon={lon}&appid={API key} + + country_code, city_name, latitude, longitude = self.get_geo_information(city=city) + + response_meteo = requests.get( + url=meteo_api_url, + params={'lat': latitude, 'lon': longitude, 'appid': api_key, 'units': "metric"} + ) + + meteo_data = response_meteo.json() + + if not meteo_data or city_name is None: + self.Protocol.send_notice(nick_from=self.Config.SERVICE_NICKNAME, nick_to=to_nickname, + msg=f"Impossible to find the meteo for [{city}]") + return None + + temp_cur = meteo_data['main']['temp'] + temp_min = meteo_data['main']['temp_min'] + temp_max = meteo_data['main']['temp_max'] + + if self.from_channel is None: + self.Protocol.send_notice(nick_from=self.Config.SERVICE_NICKNAME, nick_to=to_nickname, + msg=f"Weather of {city_name} located in {country_code} is:") + self.Protocol.send_notice(nick_from=self.Config.SERVICE_NICKNAME, nick_to=to_nickname, + msg=f"Current temperature: {temp_cur} °C") + self.Protocol.send_notice(nick_from=self.Config.SERVICE_NICKNAME, nick_to=to_nickname, + msg=f"Minimum temperature: {temp_min} °C") + self.Protocol.send_notice(nick_from=self.Config.SERVICE_NICKNAME, nick_to=to_nickname, + msg=f"Maximum temperature: {temp_max} °C") + else: + self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME, channel=self.from_channel, + msg=f"Weather of {city_name} located in {country_code} is:") + self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME, channel=self.from_channel, + msg=f"Current temperature: {temp_cur} °C") + self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME, channel=self.from_channel, + msg=f"Minimum temperature: {temp_min} °C") + self.Protocol.send_priv_msg(nick_from=self.Config.SERVICE_NICKNAME, channel=self.from_channel, + msg=f"Maximum temperature: {temp_max} °C") + + return None + + def unload(self, reloading: bool = False) -> None: + """Cette methode sera executée a chaque désactivation ou + rechargement de module + """ + if reloading: + pass + else: + self.Protocol.send_quit(self.NICKSERV_UID, f'Stopping {self.module_name} module') + + return None + + def cmd(self, data: list[str]) -> None: + try: + service_id = self.Config.SERVICE_ID # Defender serveur id + original_serv_response = list(data).copy() + + parsed_protocol = self.Protocol.parse_server_msg(data.copy()) + + match parsed_protocol: + + case 'UID': + try: + pass + + except IndexError as ie: + self.Logs.error(f'cmd reputation: index error: {ie}') + + case None: + self.Logs.debug(f"** TO BE HANDLE {original_serv_response} {__name__}") + + except KeyError as ke: + self.Logs.error(f"{ke} / {original_serv_response} / length {str(len(original_serv_response))}") + except IndexError as ie: + self.Logs.error(f"{ie} / {original_serv_response} / length {str(len(original_serv_response))}") + except Exception as err: + self.Logs.error(f"General Error: {err}") + + def hcmds(self, user: str, channel: any, cmd: list, fullcmd: list = []) -> None: + + command = str(cmd[0]).lower() + fromuser = user + fromchannel = channel if self.Channel.Is_Channel(channel) else None + self.from_channel = fromchannel + self.from_user = fromuser + dnickname = self.Config.SERVICE_NICKNAME # Defender nickname + dchanlog = self.Config.SERVICE_CHANLOG # Defender chan log + dumodes = self.Config.SERVICE_UMODES # Les modes de Defender + service_id = self.Config.SERVICE_ID # Defender serveur id + + match command: + + case 'meteo': + try: + # meteo + if len(cmd) < 2: + self.Protocol.send_notice(nick_from=dnickname, nick_to=fromuser, msg=f"/msg {self.Config.SERVICE_NICKNAME} {command.upper()} METEO ") + return None + + city = str(' '.join(cmd[1:])) + to_nickname = fromuser + + self.Base.create_thread(self.get_meteo_information, (city, to_nickname)) + + except TypeError as te: + self.Logs.error(f"Type Error -> {te}") + except ValueError as ve: + self.Logs.error(f"Value Error -> {ve}") + diff --git a/version.json b/version.json index 4a95ad2..03d25cb 100644 --- a/version.json +++ b/version.json @@ -1,5 +1,5 @@ { - "version": "6.1.1", + "version": "6.1.5", "requests": "2.32.3", "psutil": "6.0.0",