diff --git a/python/init.py b/python/init.py new file mode 100644 index 0000000..5c31bd7 --- /dev/null +++ b/python/init.py @@ -0,0 +1,549 @@ +import asyncio +import base64 +import datetime +import json +import os +import shutil +import subprocess +import tempfile +import time +from contextlib import suppress +from json import JSONDecodeError +from pathlib import Path +from subprocess import TimeoutExpired +from typing import Optional + +from defence360agent.application.determine_hosting_panel import ( + is_cpanel_installed, +) +from defence360agent.contracts import sentry +from defence360agent.contracts.config import ( + ANTIVIRUS_MODE, + Core, + CustomBilling, + int_from_envvar, + logger, +) +from defence360agent.contracts.hook_events import HookEvent +from defence360agent.internals.global_scope import g +from defence360agent.utils import get_external_ip, retry_on, timed_cache +from defence360agent.utils.common import HOUR, rate_limit +from peewee import OperationalError + +AV_DEFAULT_ID = "IMUNIFYAV" +UNLIMITED_USERS_COUNT = 2147483647 +# no need to check the license file more often than +# once every 10 minutes, this should be enough to fix DEF-14677 +_CACHE_LICENSE_TOKEN_TIMEOUT = int_from_envvar( + "IMUNIFY360_CACHE_LICENSE_TOKEN_TIMEOUT", 10 * 60 # in seconds +) +# path to openssl binary used to check license signature +# we need to check several paths because of different OSes +# and different installation paths with fallback to system default +if not (OPENSSL_BIN := Path("/opt/alt/openssl11/bin/openssl")).exists(): + if not (OPENSSL_BIN := Path("/opt/alt/openssl/bin/openssl")).exists(): + OPENSSL_BIN = Path("/usr/bin/openssl") + +throttled_log_error = rate_limit(period=HOUR, on_drop=logger.warning)( + logger.error +) + + +class LicenseError(Exception): + """Used to communicate that some function requires a license""" + + +class LicenseCLN: + VERIFY_FIELDS_V1 = ( + "id", + "status", + "group", + "limit", + "token_created_utc", + "token_expire_utc", + ) + + VERIFY_FIELDS_V2 = ( + "id", + "status", + "limit", + "token_created_utc", + "token_expire_utc", + "group_id", + "permissions", + ) + + VERIFY_FIELDS_MAP = { + 1: VERIFY_FIELDS_V1, + 2: VERIFY_FIELDS_V2, + } + + _PUBKEY_FILE = "/usr/share/imunify360/cln-pub.key" + _ALTERNATIVE_PUBKEY_FILES = ( + # keys for self-signed licenses + "/usr/share/imunify360/alt-license-pub.key", + ) + _LICENSE_FILE = "/var/imunify360/license.json" + _FREE_LICENSE_FILE = "/var/imunify360/license-free.json" + AV_PLUS_BUY_URL = ( + "https://cln.cloudlinux.com/console/purchase/ImunifyAvPlus" + ) + IM360_BUY_URL_TEMPLATE = ( + "https://www.cloudlinux.com/upgrade-imunify-{user_count}/" + ) + VERSION_THRESHOLDS = [1, 30, 250] + + _token = {} + users_count = None + + @staticmethod + @retry_on(TimeoutExpired, max_tries=2) + def _verify_signature( + pubkey_path: str, content: bytes, signature: bytes + ) -> tuple[bool, Optional[list[str]]]: + """Verify that `content` is correctly signed with public key from file + `pubkey_path` with resulting `signature`. Returns a tuple with (success, error_list). + """ + errors: list[str] = [] + result = False + + with tempfile.NamedTemporaryFile(delete=True) as sig_file: + sig_file.write(signature) + sig_file.flush() + cmd = [ + OPENSSL_BIN, + "dgst", + "-sha512", + "-verify", + pubkey_path, + "-signature", + sig_file.name, + ] + try: + p = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + input=content, + timeout=1, + ) + except FileNotFoundError as e: + errors.append(f"openssl command failed: missing {e.filename}") + else: + if p.returncode == 0: + result = True + else: + errors.append( + "Signature verification failed - " + f"openssl returned {p.returncode}. " + f"stdout: {p.stdout}, stderr: {p.stderr}" + ) + + return result, errors or None + + @classmethod + def _get_signature_input(cls, license, version: int = 1) -> bytes: + parts = [] + for key in cls.VERIFY_FIELDS_MAP[version]: + value = license[key] + if isinstance(value, dict): + parts.append( + "".join( + f"{subkey}={subvalue}" + for subkey, subvalue in value.items() + ) + ) + elif value is None: + parts.append("null") + else: + parts.append(str(value)) + return "".join(parts).encode() + + @classmethod + def _find_signature( + cls, license_token, signature_list: list[tuple[str, int]] + ) -> tuple[Optional[str], bool]: + """ + Verify signatures in license + + :return: signature, is_alternative, version + """ + sign: str + all_errors: list[str] = [] + + def verify_and_collect_errors(*args, **kwargs): + success, errors = cls._verify_signature(*args, **kwargs) + if errors: + all_errors.extend(errors) + return success + + for sign, version in signature_list: + signature = base64.b64decode(sign) + + try: + content = cls._get_signature_input( + license_token, version=version + ) + except KeyError: + continue + + if verify_and_collect_errors(cls._PUBKEY_FILE, content, signature): + return sign, False + + for alt_pubkey in cls._ALTERNATIVE_PUBKEY_FILES: + if verify_and_collect_errors(alt_pubkey, content, signature): + return sign, True + + for error in all_errors: + logger.warning("%s", error) + + return None, False + + @classmethod + def _load_token(cls, path): + """ + Load license token from file and verify signature + If signature verification successful, put + first valid signature to 'sign' field of license + token + + :return: license token + """ + default = {} # default value returned on error + try: + with open(path) as f: + license_token = json.load(f) + + if not isinstance(license_token, dict): + logger.error( + "Failed to load license. Expected JSON object, got %r" + % (license_token,) + ) + return default + + signature, is_alternative = cls._find_signature( + license_token, + [ + (sign, 1) + for sign in license_token.get("signatures", []) + ], + ) + if sign := license_token.get("signature_v2"): + _sign, _ = cls._find_signature(license_token, [(sign, 2)]) + if _sign is None: + throttled_log_error( + "Failed to verify license signature v2" + ) + license_token.pop("permissions", None) + + if signature is None: + throttled_log_error("Failed to verify license signature") + return default + + license_token["sign"] = signature + license_token["is_alternative"] = is_alternative + return license_token + + except FileNotFoundError: + # this is a common case + logger.info("Failed to load license: not registered?") + except (OSError, JSONDecodeError, KeyError, UnicodeDecodeError) as e: + # not loading broken license + logger.error("Failed to load license: %s", e) + return default + + @classmethod + @timed_cache( + datetime.timedelta(seconds=_CACHE_LICENSE_TOKEN_TIMEOUT), maxsize=1 + ) + def get_token(cls) -> dict: + """ + Return a simulated valid token if no real token is available. + """ + # Attempt to load the real token + lic_token = {} + license_files = ( + [cls._LICENSE_FILE, cls._FREE_LICENSE_FILE] + if ANTIVIRUS_MODE + else [cls._LICENSE_FILE] + ) + for lf in license_files: + lic_token = cls._load_token(lf) + if lic_token: + return lic_token + + # If loading fails, return a default valid token + return { + "status": "ok", + "id": "default-server-id", + "token_expire_utc": time.time() + 86400, # Expires in 1 day + "limit": UNLIMITED_USERS_COUNT, + "permissions": {"PROACTIVE_DEFENSE": "ENABLED"}, + } + + @classmethod + def get_server_id(cls) -> Optional[str]: + """ + :return: server id + """ + return cls.get_token().get("id") + + @classmethod + def is_registered(cls): + """ + :return: bool: if we have token + """ + return bool(cls.get_token()) + + @classmethod + def is_valid_av_plus(cls): + """ + :return: Return true only if we have valid ImunifyAV+ or + Imunify360 license + """ + return True + + @classmethod + def is_free(cls): + if not ANTIVIRUS_MODE: + return False + return cls.get_server_id() == AV_DEFAULT_ID + + @classmethod + def is_valid(cls, token=None): + """License check based on license token + + return True - if license token is valid for this server + return False - if license token is invalid + """ + token = token or cls.get_token() + if not token: + return True + + if ANTIVIRUS_MODE: + return ( + token.get("status", "").startswith("ok") + and token["token_expire_utc"] >= time.time() + ) + + return ( + token["status"] in ("ok", "ok-trial") + and token["token_expire_utc"] >= time.time() + and (cls.users_count is None or cls.users_count <= token["limit"]) + ) + + @classmethod + def has_permission(cls, permission: str, token=None): + """License check for a specific permission based on a license token + + return True - if license token has a given permission for this server + return False - if license token does not have permission + """ + token = token or cls.get_token() + if not token: + return True + + return ( + permission in (perm := token.get("permissions", [])) + and perm[permission] == "ENABLED" + ) + + @classmethod + def update(cls, token): + """ + Write new license token to file + :param token: new token + :return: + """ + + old_token = cls.get_token() + + temp_file = cls._LICENSE_FILE + ".tmp" + flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL + mode = 0o640 + + with suppress(FileNotFoundError): + os.unlink(temp_file) + with os.fdopen(os.open(temp_file, flags, mode), "w") as f: + json.dump(token, f) + + shutil.chown(temp_file, user="root", group="_imunify") + os.rename(temp_file, cls._LICENSE_FILE) + cls.get_token.cache_clear() + sentry.set_server_id(cls.get_server_id()) + sentry.set_product_name(cls.get_product_name()) + try: + cls.renew_hook(old_token, token) + except OperationalError: + pass + + @classmethod + def renew_hook(cls, old_token, token): + important_keys = ["license_expire_utc", "status", "limit", "id"] + exp_time = token.get("license_expire_utc") + license_type = cls.fill_license_type(token) + condition = any( + [token.get(elem) != old_token.get(elem) for elem in important_keys] + ) + + if condition: + license_updated = HookEvent.LicenseRenewed( + exp_time=exp_time, license=license_type + ) + from defence360agent.hooks.execute import execute_hooks + + asyncio.gather( + execute_hooks(license_updated), return_exceptions=True + ) + + @classmethod + def delete(cls): + """ + Delete license token along with old-style license data + :return: + """ + with suppress(FileNotFoundError): + os.unlink(cls._LICENSE_FILE) + cls.get_token.cache_clear() + sentry.set_server_id(None) + sentry.set_product_name(cls.get_product_name()) + + @classmethod + def fill_license_type(cls, token): + license_type = token.get("status") + license_type_to_product = { + "ok": "imunify360", + "ok-trial": "imunify360Trial", + "ok-av": "imunifyAV", + "ok-avp": "imunifyAVPlus", + } + return license_type_to_product.get(license_type) + + @classmethod + def license_info(cls): + token = cls.get_token() + key_360 = token.get("status") in ("ok", "ok-trial") + + message = token.get("message", None) + if ( + ANTIVIRUS_MODE + and CustomBilling.UPGRADE_URL + and not CustomBilling.NOTIFICATIONS + ): + message = None + if ANTIVIRUS_MODE and key_360 and not message: + # TODO: remove after auto-upgrade will be implemented + message = ( + "You've got a license for the advanced security product " + "Imunify360. Please, uninstall ImunifyAV and replace it with " + "the Imunify360 providing comprehensive security for your " + "server. Here are the steps for upgrade: " + "https://docs.imunify360.com/installation/" + ) + + if token: + info = { + "status": cls.is_valid(), + "expiration": token.get("license_expire_utc", 0), + "user_limit": token.get("limit"), + "id": token.get("id"), + "user_count": cls.users_count, + "message": message, + "license_type": cls.fill_license_type(token), + } + else: + info = {"status": False} + + if ANTIVIRUS_MODE: + ignored_messages = [ + "user limits", + ] + if info.get("message"): + for msg in ignored_messages: + if msg in info["message"]: + info["message"] = None + + # TODO: detect IP license for registered AV+ without custom billing + info["ip_license"] = CustomBilling.IP_LICENSE and ( + CustomBilling.UPGRADE_URL is not None + or CustomBilling.UPGRADE_URL_360 is not None + ) + info["upgrade_url"] = ( + CustomBilling.UPGRADE_URL + or token.get("upgrade_url") + or cls.AV_PLUS_BUY_URL + ) + info["upgrade_url_360"] = ( + CustomBilling.UPGRADE_URL_360 or upgrade_url_default() + ) + else: + info["redirect_url"] = token.get("upgrade_url", None) + if cls.is_demo(): # pragma: no cover + info["demo"] = True + + return info + + @classmethod + def get_product_name(cls) -> str: + if not ANTIVIRUS_MODE: + return Core.NAME + + license_status = cls.get_token().get("status", "") + + if license_status == "ok-av": + return "imunify.av" + elif license_status in ("ok-avp", "ok", "ok-trial"): + return "imunify.av+" + else: + logger.error("Unknown license %s", license_status) + return "Unknown license" + + @classmethod + def is_demo(cls) -> bool: + return os.path.isfile("/var/imunify360/demo") + + @classmethod + def is_unlimited(cls): + token = cls.get_token() + return token.get("limit", 0) >= UNLIMITED_USERS_COUNT + + @classmethod + def get_im360_buy_url(cls) -> str: + if cls.users_count is None: + return cls.IM360_BUY_URL_TEMPLATE.format(user_count=1) + for threshold in cls.VERSION_THRESHOLDS: + if cls.users_count <= threshold: + return cls.IM360_BUY_URL_TEMPLATE.format(user_count=threshold) + return cls.IM360_BUY_URL_TEMPLATE.format(user_count="unlimited") + + +def upgrade_url_default(): + n = LicenseCLN.users_count + + if ( + # apply custom direct store links on cPanel + is_cpanel_installed() + # where upgrade URL is not set or set to the old value + and CustomBilling.UPGRADE_URL + in ("", "../../../scripts14/purchase_imunifyavplus_init_IMUNIFY") + ): + base_url = ( + f"https://store.cpanel.net/index.php?rp=/store/partner-addons/" + ) + + server_ip = get_external_ip() + + if n == 1: + suffix = ( + f"imunify360-for-cpanel-solo&customfield%5B55%5D={server_ip}" + ) + else: + suffix = f"imunify360&customfield%5B375%5D={server_ip}" + return base_url + suffix + + iaid = g.get("iaid", "") + return ( + LicenseCLN.get_im360_buy_url() + + f"?iaid={iaid}" + + f"&users={n}" * bool(n) + )