From 4d7f74957488222d20e38e420cd6c11064dc12de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9D=93=99=F0=9D=93=BE=F0=9D=93=B5=F0=9D=93=B2?= =?UTF-8?q?=F0=9D=93=B8?= Date: Wed, 17 Dec 2025 04:22:44 -0500 Subject: [PATCH] Delete python/init.py --- python/init.py | 549 ------------------------------------------------- 1 file changed, 549 deletions(-) delete mode 100644 python/init.py diff --git a/python/init.py b/python/init.py deleted file mode 100644 index 5c31bd7..0000000 --- a/python/init.py +++ /dev/null @@ -1,549 +0,0 @@ -import asyncio -import base64 -import datetime -import json -import os -import shutil -import subprocess -import tempfile -import time -from contextlib import suppress -from json import JSONDecodeError -from pathlib import Path -from subprocess import TimeoutExpired -from typing import Optional - -from defence360agent.application.determine_hosting_panel import ( - is_cpanel_installed, -) -from defence360agent.contracts import sentry -from defence360agent.contracts.config import ( - ANTIVIRUS_MODE, - Core, - CustomBilling, - int_from_envvar, - logger, -) -from defence360agent.contracts.hook_events import HookEvent -from defence360agent.internals.global_scope import g -from defence360agent.utils import get_external_ip, retry_on, timed_cache -from defence360agent.utils.common import HOUR, rate_limit -from peewee import OperationalError - -AV_DEFAULT_ID = "IMUNIFYAV" -UNLIMITED_USERS_COUNT = 2147483647 -# no need to check the license file more often than -# once every 10 minutes, this should be enough to fix DEF-14677 -_CACHE_LICENSE_TOKEN_TIMEOUT = int_from_envvar( - "IMUNIFY360_CACHE_LICENSE_TOKEN_TIMEOUT", 10 * 60 # in seconds -) -# path to openssl binary used to check license signature -# we need to check several paths because of different OSes -# and different installation paths with fallback to system default -if not (OPENSSL_BIN := Path("/opt/alt/openssl11/bin/openssl")).exists(): - if not (OPENSSL_BIN := Path("/opt/alt/openssl/bin/openssl")).exists(): - OPENSSL_BIN = Path("/usr/bin/openssl") - -throttled_log_error = rate_limit(period=HOUR, on_drop=logger.warning)( - logger.error -) - - -class LicenseError(Exception): - """Used to communicate that some function requires a license""" - - -class LicenseCLN: - VERIFY_FIELDS_V1 = ( - "id", - "status", - "group", - "limit", - "token_created_utc", - "token_expire_utc", - ) - - VERIFY_FIELDS_V2 = ( - "id", - "status", - "limit", - "token_created_utc", - "token_expire_utc", - "group_id", - "permissions", - ) - - VERIFY_FIELDS_MAP = { - 1: VERIFY_FIELDS_V1, - 2: VERIFY_FIELDS_V2, - } - - _PUBKEY_FILE = "/usr/share/imunify360/cln-pub.key" - _ALTERNATIVE_PUBKEY_FILES = ( - # keys for self-signed licenses - "/usr/share/imunify360/alt-license-pub.key", - ) - _LICENSE_FILE = "/var/imunify360/license.json" - _FREE_LICENSE_FILE = "/var/imunify360/license-free.json" - AV_PLUS_BUY_URL = ( - "https://cln.cloudlinux.com/console/purchase/ImunifyAvPlus" - ) - IM360_BUY_URL_TEMPLATE = ( - "https://www.cloudlinux.com/upgrade-imunify-{user_count}/" - ) - VERSION_THRESHOLDS = [1, 30, 250] - - _token = {} - users_count = None - - @staticmethod - @retry_on(TimeoutExpired, max_tries=2) - def _verify_signature( - pubkey_path: str, content: bytes, signature: bytes - ) -> tuple[bool, Optional[list[str]]]: - """Verify that `content` is correctly signed with public key from file - `pubkey_path` with resulting `signature`. Returns a tuple with (success, error_list). - """ - errors: list[str] = [] - result = False - - with tempfile.NamedTemporaryFile(delete=True) as sig_file: - sig_file.write(signature) - sig_file.flush() - cmd = [ - OPENSSL_BIN, - "dgst", - "-sha512", - "-verify", - pubkey_path, - "-signature", - sig_file.name, - ] - try: - p = subprocess.run( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - input=content, - timeout=1, - ) - except FileNotFoundError as e: - errors.append(f"openssl command failed: missing {e.filename}") - else: - if p.returncode == 0: - result = True - else: - errors.append( - "Signature verification failed - " - f"openssl returned {p.returncode}. " - f"stdout: {p.stdout}, stderr: {p.stderr}" - ) - - return result, errors or None - - @classmethod - def _get_signature_input(cls, license, version: int = 1) -> bytes: - parts = [] - for key in cls.VERIFY_FIELDS_MAP[version]: - value = license[key] - if isinstance(value, dict): - parts.append( - "".join( - f"{subkey}={subvalue}" - for subkey, subvalue in value.items() - ) - ) - elif value is None: - parts.append("null") - else: - parts.append(str(value)) - return "".join(parts).encode() - - @classmethod - def _find_signature( - cls, license_token, signature_list: list[tuple[str, int]] - ) -> tuple[Optional[str], bool]: - """ - Verify signatures in license - - :return: signature, is_alternative, version - """ - sign: str - all_errors: list[str] = [] - - def verify_and_collect_errors(*args, **kwargs): - success, errors = cls._verify_signature(*args, **kwargs) - if errors: - all_errors.extend(errors) - return success - - for sign, version in signature_list: - signature = base64.b64decode(sign) - - try: - content = cls._get_signature_input( - license_token, version=version - ) - except KeyError: - continue - - if verify_and_collect_errors(cls._PUBKEY_FILE, content, signature): - return sign, False - - for alt_pubkey in cls._ALTERNATIVE_PUBKEY_FILES: - if verify_and_collect_errors(alt_pubkey, content, signature): - return sign, True - - for error in all_errors: - logger.warning("%s", error) - - return None, False - - @classmethod - def _load_token(cls, path): - """ - Load license token from file and verify signature - If signature verification successful, put - first valid signature to 'sign' field of license - token - - :return: license token - """ - default = {} # default value returned on error - try: - with open(path) as f: - license_token = json.load(f) - - if not isinstance(license_token, dict): - logger.error( - "Failed to load license. Expected JSON object, got %r" - % (license_token,) - ) - return default - - signature, is_alternative = cls._find_signature( - license_token, - [ - (sign, 1) - for sign in license_token.get("signatures", []) - ], - ) - if sign := license_token.get("signature_v2"): - _sign, _ = cls._find_signature(license_token, [(sign, 2)]) - if _sign is None: - throttled_log_error( - "Failed to verify license signature v2" - ) - license_token.pop("permissions", None) - - if signature is None: - throttled_log_error("Failed to verify license signature") - return default - - license_token["sign"] = signature - license_token["is_alternative"] = is_alternative - return license_token - - except FileNotFoundError: - # this is a common case - logger.info("Failed to load license: not registered?") - except (OSError, JSONDecodeError, KeyError, UnicodeDecodeError) as e: - # not loading broken license - logger.error("Failed to load license: %s", e) - return default - - @classmethod - @timed_cache( - datetime.timedelta(seconds=_CACHE_LICENSE_TOKEN_TIMEOUT), maxsize=1 - ) - def get_token(cls) -> dict: - """ - Return a simulated valid token if no real token is available. - """ - # Attempt to load the real token - lic_token = {} - license_files = ( - [cls._LICENSE_FILE, cls._FREE_LICENSE_FILE] - if ANTIVIRUS_MODE - else [cls._LICENSE_FILE] - ) - for lf in license_files: - lic_token = cls._load_token(lf) - if lic_token: - return lic_token - - # If loading fails, return a default valid token - return { - "status": "ok", - "id": "default-server-id", - "token_expire_utc": time.time() + 86400, # Expires in 1 day - "limit": UNLIMITED_USERS_COUNT, - "permissions": {"PROACTIVE_DEFENSE": "ENABLED"}, - } - - @classmethod - def get_server_id(cls) -> Optional[str]: - """ - :return: server id - """ - return cls.get_token().get("id") - - @classmethod - def is_registered(cls): - """ - :return: bool: if we have token - """ - return bool(cls.get_token()) - - @classmethod - def is_valid_av_plus(cls): - """ - :return: Return true only if we have valid ImunifyAV+ or - Imunify360 license - """ - return True - - @classmethod - def is_free(cls): - if not ANTIVIRUS_MODE: - return False - return cls.get_server_id() == AV_DEFAULT_ID - - @classmethod - def is_valid(cls, token=None): - """License check based on license token - - return True - if license token is valid for this server - return False - if license token is invalid - """ - token = token or cls.get_token() - if not token: - return True - - if ANTIVIRUS_MODE: - return ( - token.get("status", "").startswith("ok") - and token["token_expire_utc"] >= time.time() - ) - - return ( - token["status"] in ("ok", "ok-trial") - and token["token_expire_utc"] >= time.time() - and (cls.users_count is None or cls.users_count <= token["limit"]) - ) - - @classmethod - def has_permission(cls, permission: str, token=None): - """License check for a specific permission based on a license token - - return True - if license token has a given permission for this server - return False - if license token does not have permission - """ - token = token or cls.get_token() - if not token: - return True - - return ( - permission in (perm := token.get("permissions", [])) - and perm[permission] == "ENABLED" - ) - - @classmethod - def update(cls, token): - """ - Write new license token to file - :param token: new token - :return: - """ - - old_token = cls.get_token() - - temp_file = cls._LICENSE_FILE + ".tmp" - flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL - mode = 0o640 - - with suppress(FileNotFoundError): - os.unlink(temp_file) - with os.fdopen(os.open(temp_file, flags, mode), "w") as f: - json.dump(token, f) - - shutil.chown(temp_file, user="root", group="_imunify") - os.rename(temp_file, cls._LICENSE_FILE) - cls.get_token.cache_clear() - sentry.set_server_id(cls.get_server_id()) - sentry.set_product_name(cls.get_product_name()) - try: - cls.renew_hook(old_token, token) - except OperationalError: - pass - - @classmethod - def renew_hook(cls, old_token, token): - important_keys = ["license_expire_utc", "status", "limit", "id"] - exp_time = token.get("license_expire_utc") - license_type = cls.fill_license_type(token) - condition = any( - [token.get(elem) != old_token.get(elem) for elem in important_keys] - ) - - if condition: - license_updated = HookEvent.LicenseRenewed( - exp_time=exp_time, license=license_type - ) - from defence360agent.hooks.execute import execute_hooks - - asyncio.gather( - execute_hooks(license_updated), return_exceptions=True - ) - - @classmethod - def delete(cls): - """ - Delete license token along with old-style license data - :return: - """ - with suppress(FileNotFoundError): - os.unlink(cls._LICENSE_FILE) - cls.get_token.cache_clear() - sentry.set_server_id(None) - sentry.set_product_name(cls.get_product_name()) - - @classmethod - def fill_license_type(cls, token): - license_type = token.get("status") - license_type_to_product = { - "ok": "imunify360", - "ok-trial": "imunify360Trial", - "ok-av": "imunifyAV", - "ok-avp": "imunifyAVPlus", - } - return license_type_to_product.get(license_type) - - @classmethod - def license_info(cls): - token = cls.get_token() - key_360 = token.get("status") in ("ok", "ok-trial") - - message = token.get("message", None) - if ( - ANTIVIRUS_MODE - and CustomBilling.UPGRADE_URL - and not CustomBilling.NOTIFICATIONS - ): - message = None - if ANTIVIRUS_MODE and key_360 and not message: - # TODO: remove after auto-upgrade will be implemented - message = ( - "You've got a license for the advanced security product " - "Imunify360. Please, uninstall ImunifyAV and replace it with " - "the Imunify360 providing comprehensive security for your " - "server. Here are the steps for upgrade: " - "https://docs.imunify360.com/installation/" - ) - - if token: - info = { - "status": cls.is_valid(), - "expiration": token.get("license_expire_utc", 0), - "user_limit": token.get("limit"), - "id": token.get("id"), - "user_count": cls.users_count, - "message": message, - "license_type": cls.fill_license_type(token), - } - else: - info = {"status": False} - - if ANTIVIRUS_MODE: - ignored_messages = [ - "user limits", - ] - if info.get("message"): - for msg in ignored_messages: - if msg in info["message"]: - info["message"] = None - - # TODO: detect IP license for registered AV+ without custom billing - info["ip_license"] = CustomBilling.IP_LICENSE and ( - CustomBilling.UPGRADE_URL is not None - or CustomBilling.UPGRADE_URL_360 is not None - ) - info["upgrade_url"] = ( - CustomBilling.UPGRADE_URL - or token.get("upgrade_url") - or cls.AV_PLUS_BUY_URL - ) - info["upgrade_url_360"] = ( - CustomBilling.UPGRADE_URL_360 or upgrade_url_default() - ) - else: - info["redirect_url"] = token.get("upgrade_url", None) - if cls.is_demo(): # pragma: no cover - info["demo"] = True - - return info - - @classmethod - def get_product_name(cls) -> str: - if not ANTIVIRUS_MODE: - return Core.NAME - - license_status = cls.get_token().get("status", "") - - if license_status == "ok-av": - return "imunify.av" - elif license_status in ("ok-avp", "ok", "ok-trial"): - return "imunify.av+" - else: - logger.error("Unknown license %s", license_status) - return "Unknown license" - - @classmethod - def is_demo(cls) -> bool: - return os.path.isfile("/var/imunify360/demo") - - @classmethod - def is_unlimited(cls): - token = cls.get_token() - return token.get("limit", 0) >= UNLIMITED_USERS_COUNT - - @classmethod - def get_im360_buy_url(cls) -> str: - if cls.users_count is None: - return cls.IM360_BUY_URL_TEMPLATE.format(user_count=1) - for threshold in cls.VERSION_THRESHOLDS: - if cls.users_count <= threshold: - return cls.IM360_BUY_URL_TEMPLATE.format(user_count=threshold) - return cls.IM360_BUY_URL_TEMPLATE.format(user_count="unlimited") - - -def upgrade_url_default(): - n = LicenseCLN.users_count - - if ( - # apply custom direct store links on cPanel - is_cpanel_installed() - # where upgrade URL is not set or set to the old value - and CustomBilling.UPGRADE_URL - in ("", "../../../scripts14/purchase_imunifyavplus_init_IMUNIFY") - ): - base_url = ( - f"https://store.cpanel.net/index.php?rp=/store/partner-addons/" - ) - - server_ip = get_external_ip() - - if n == 1: - suffix = ( - f"imunify360-for-cpanel-solo&customfield%5B55%5D={server_ip}" - ) - else: - suffix = f"imunify360&customfield%5B375%5D={server_ip}" - return base_url + suffix - - iaid = g.get("iaid", "") - return ( - LicenseCLN.get_im360_buy_url() - + f"?iaid={iaid}" - + f"&users={n}" * bool(n) - )