Delete python/init.py
This commit is contained in:
-549
@@ -1,549 +0,0 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
from contextlib import suppress
|
||||
from json import JSONDecodeError
|
||||
from pathlib import Path
|
||||
from subprocess import TimeoutExpired
|
||||
from typing import Optional
|
||||
|
||||
from defence360agent.application.determine_hosting_panel import (
|
||||
is_cpanel_installed,
|
||||
)
|
||||
from defence360agent.contracts import sentry
|
||||
from defence360agent.contracts.config import (
|
||||
ANTIVIRUS_MODE,
|
||||
Core,
|
||||
CustomBilling,
|
||||
int_from_envvar,
|
||||
logger,
|
||||
)
|
||||
from defence360agent.contracts.hook_events import HookEvent
|
||||
from defence360agent.internals.global_scope import g
|
||||
from defence360agent.utils import get_external_ip, retry_on, timed_cache
|
||||
from defence360agent.utils.common import HOUR, rate_limit
|
||||
from peewee import OperationalError
|
||||
|
||||
AV_DEFAULT_ID = "IMUNIFYAV"
|
||||
UNLIMITED_USERS_COUNT = 2147483647
|
||||
# no need to check the license file more often than
|
||||
# once every 10 minutes, this should be enough to fix DEF-14677
|
||||
_CACHE_LICENSE_TOKEN_TIMEOUT = int_from_envvar(
|
||||
"IMUNIFY360_CACHE_LICENSE_TOKEN_TIMEOUT", 10 * 60 # in seconds
|
||||
)
|
||||
# path to openssl binary used to check license signature
|
||||
# we need to check several paths because of different OSes
|
||||
# and different installation paths with fallback to system default
|
||||
if not (OPENSSL_BIN := Path("/opt/alt/openssl11/bin/openssl")).exists():
|
||||
if not (OPENSSL_BIN := Path("/opt/alt/openssl/bin/openssl")).exists():
|
||||
OPENSSL_BIN = Path("/usr/bin/openssl")
|
||||
|
||||
throttled_log_error = rate_limit(period=HOUR, on_drop=logger.warning)(
|
||||
logger.error
|
||||
)
|
||||
|
||||
|
||||
class LicenseError(Exception):
|
||||
"""Used to communicate that some function requires a license"""
|
||||
|
||||
|
||||
class LicenseCLN:
|
||||
VERIFY_FIELDS_V1 = (
|
||||
"id",
|
||||
"status",
|
||||
"group",
|
||||
"limit",
|
||||
"token_created_utc",
|
||||
"token_expire_utc",
|
||||
)
|
||||
|
||||
VERIFY_FIELDS_V2 = (
|
||||
"id",
|
||||
"status",
|
||||
"limit",
|
||||
"token_created_utc",
|
||||
"token_expire_utc",
|
||||
"group_id",
|
||||
"permissions",
|
||||
)
|
||||
|
||||
VERIFY_FIELDS_MAP = {
|
||||
1: VERIFY_FIELDS_V1,
|
||||
2: VERIFY_FIELDS_V2,
|
||||
}
|
||||
|
||||
_PUBKEY_FILE = "/usr/share/imunify360/cln-pub.key"
|
||||
_ALTERNATIVE_PUBKEY_FILES = (
|
||||
# keys for self-signed licenses
|
||||
"/usr/share/imunify360/alt-license-pub.key",
|
||||
)
|
||||
_LICENSE_FILE = "/var/imunify360/license.json"
|
||||
_FREE_LICENSE_FILE = "/var/imunify360/license-free.json"
|
||||
AV_PLUS_BUY_URL = (
|
||||
"https://cln.cloudlinux.com/console/purchase/ImunifyAvPlus"
|
||||
)
|
||||
IM360_BUY_URL_TEMPLATE = (
|
||||
"https://www.cloudlinux.com/upgrade-imunify-{user_count}/"
|
||||
)
|
||||
VERSION_THRESHOLDS = [1, 30, 250]
|
||||
|
||||
_token = {}
|
||||
users_count = None
|
||||
|
||||
@staticmethod
|
||||
@retry_on(TimeoutExpired, max_tries=2)
|
||||
def _verify_signature(
|
||||
pubkey_path: str, content: bytes, signature: bytes
|
||||
) -> tuple[bool, Optional[list[str]]]:
|
||||
"""Verify that `content` is correctly signed with public key from file
|
||||
`pubkey_path` with resulting `signature`. Returns a tuple with (success, error_list).
|
||||
"""
|
||||
errors: list[str] = []
|
||||
result = False
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=True) as sig_file:
|
||||
sig_file.write(signature)
|
||||
sig_file.flush()
|
||||
cmd = [
|
||||
OPENSSL_BIN,
|
||||
"dgst",
|
||||
"-sha512",
|
||||
"-verify",
|
||||
pubkey_path,
|
||||
"-signature",
|
||||
sig_file.name,
|
||||
]
|
||||
try:
|
||||
p = subprocess.run(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
input=content,
|
||||
timeout=1,
|
||||
)
|
||||
except FileNotFoundError as e:
|
||||
errors.append(f"openssl command failed: missing {e.filename}")
|
||||
else:
|
||||
if p.returncode == 0:
|
||||
result = True
|
||||
else:
|
||||
errors.append(
|
||||
"Signature verification failed - "
|
||||
f"openssl returned {p.returncode}. "
|
||||
f"stdout: {p.stdout}, stderr: {p.stderr}"
|
||||
)
|
||||
|
||||
return result, errors or None
|
||||
|
||||
@classmethod
|
||||
def _get_signature_input(cls, license, version: int = 1) -> bytes:
|
||||
parts = []
|
||||
for key in cls.VERIFY_FIELDS_MAP[version]:
|
||||
value = license[key]
|
||||
if isinstance(value, dict):
|
||||
parts.append(
|
||||
"".join(
|
||||
f"{subkey}={subvalue}"
|
||||
for subkey, subvalue in value.items()
|
||||
)
|
||||
)
|
||||
elif value is None:
|
||||
parts.append("null")
|
||||
else:
|
||||
parts.append(str(value))
|
||||
return "".join(parts).encode()
|
||||
|
||||
@classmethod
|
||||
def _find_signature(
|
||||
cls, license_token, signature_list: list[tuple[str, int]]
|
||||
) -> tuple[Optional[str], bool]:
|
||||
"""
|
||||
Verify signatures in license
|
||||
|
||||
:return: signature, is_alternative, version
|
||||
"""
|
||||
sign: str
|
||||
all_errors: list[str] = []
|
||||
|
||||
def verify_and_collect_errors(*args, **kwargs):
|
||||
success, errors = cls._verify_signature(*args, **kwargs)
|
||||
if errors:
|
||||
all_errors.extend(errors)
|
||||
return success
|
||||
|
||||
for sign, version in signature_list:
|
||||
signature = base64.b64decode(sign)
|
||||
|
||||
try:
|
||||
content = cls._get_signature_input(
|
||||
license_token, version=version
|
||||
)
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
if verify_and_collect_errors(cls._PUBKEY_FILE, content, signature):
|
||||
return sign, False
|
||||
|
||||
for alt_pubkey in cls._ALTERNATIVE_PUBKEY_FILES:
|
||||
if verify_and_collect_errors(alt_pubkey, content, signature):
|
||||
return sign, True
|
||||
|
||||
for error in all_errors:
|
||||
logger.warning("%s", error)
|
||||
|
||||
return None, False
|
||||
|
||||
@classmethod
|
||||
def _load_token(cls, path):
|
||||
"""
|
||||
Load license token from file and verify signature
|
||||
If signature verification successful, put
|
||||
first valid signature to 'sign' field of license
|
||||
token
|
||||
|
||||
:return: license token
|
||||
"""
|
||||
default = {} # default value returned on error
|
||||
try:
|
||||
with open(path) as f:
|
||||
license_token = json.load(f)
|
||||
|
||||
if not isinstance(license_token, dict):
|
||||
logger.error(
|
||||
"Failed to load license. Expected JSON object, got %r"
|
||||
% (license_token,)
|
||||
)
|
||||
return default
|
||||
|
||||
signature, is_alternative = cls._find_signature(
|
||||
license_token,
|
||||
[
|
||||
(sign, 1)
|
||||
for sign in license_token.get("signatures", [])
|
||||
],
|
||||
)
|
||||
if sign := license_token.get("signature_v2"):
|
||||
_sign, _ = cls._find_signature(license_token, [(sign, 2)])
|
||||
if _sign is None:
|
||||
throttled_log_error(
|
||||
"Failed to verify license signature v2"
|
||||
)
|
||||
license_token.pop("permissions", None)
|
||||
|
||||
if signature is None:
|
||||
throttled_log_error("Failed to verify license signature")
|
||||
return default
|
||||
|
||||
license_token["sign"] = signature
|
||||
license_token["is_alternative"] = is_alternative
|
||||
return license_token
|
||||
|
||||
except FileNotFoundError:
|
||||
# this is a common case
|
||||
logger.info("Failed to load license: not registered?")
|
||||
except (OSError, JSONDecodeError, KeyError, UnicodeDecodeError) as e:
|
||||
# not loading broken license
|
||||
logger.error("Failed to load license: %s", e)
|
||||
return default
|
||||
|
||||
@classmethod
|
||||
@timed_cache(
|
||||
datetime.timedelta(seconds=_CACHE_LICENSE_TOKEN_TIMEOUT), maxsize=1
|
||||
)
|
||||
def get_token(cls) -> dict:
|
||||
"""
|
||||
Return a simulated valid token if no real token is available.
|
||||
"""
|
||||
# Attempt to load the real token
|
||||
lic_token = {}
|
||||
license_files = (
|
||||
[cls._LICENSE_FILE, cls._FREE_LICENSE_FILE]
|
||||
if ANTIVIRUS_MODE
|
||||
else [cls._LICENSE_FILE]
|
||||
)
|
||||
for lf in license_files:
|
||||
lic_token = cls._load_token(lf)
|
||||
if lic_token:
|
||||
return lic_token
|
||||
|
||||
# If loading fails, return a default valid token
|
||||
return {
|
||||
"status": "ok",
|
||||
"id": "default-server-id",
|
||||
"token_expire_utc": time.time() + 86400, # Expires in 1 day
|
||||
"limit": UNLIMITED_USERS_COUNT,
|
||||
"permissions": {"PROACTIVE_DEFENSE": "ENABLED"},
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_server_id(cls) -> Optional[str]:
|
||||
"""
|
||||
:return: server id
|
||||
"""
|
||||
return cls.get_token().get("id")
|
||||
|
||||
@classmethod
|
||||
def is_registered(cls):
|
||||
"""
|
||||
:return: bool: if we have token
|
||||
"""
|
||||
return bool(cls.get_token())
|
||||
|
||||
@classmethod
|
||||
def is_valid_av_plus(cls):
|
||||
"""
|
||||
:return: Return true only if we have valid ImunifyAV+ or
|
||||
Imunify360 license
|
||||
"""
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def is_free(cls):
|
||||
if not ANTIVIRUS_MODE:
|
||||
return False
|
||||
return cls.get_server_id() == AV_DEFAULT_ID
|
||||
|
||||
@classmethod
|
||||
def is_valid(cls, token=None):
|
||||
"""License check based on license token
|
||||
|
||||
return True - if license token is valid for this server
|
||||
return False - if license token is invalid
|
||||
"""
|
||||
token = token or cls.get_token()
|
||||
if not token:
|
||||
return True
|
||||
|
||||
if ANTIVIRUS_MODE:
|
||||
return (
|
||||
token.get("status", "").startswith("ok")
|
||||
and token["token_expire_utc"] >= time.time()
|
||||
)
|
||||
|
||||
return (
|
||||
token["status"] in ("ok", "ok-trial")
|
||||
and token["token_expire_utc"] >= time.time()
|
||||
and (cls.users_count is None or cls.users_count <= token["limit"])
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def has_permission(cls, permission: str, token=None):
|
||||
"""License check for a specific permission based on a license token
|
||||
|
||||
return True - if license token has a given permission for this server
|
||||
return False - if license token does not have permission
|
||||
"""
|
||||
token = token or cls.get_token()
|
||||
if not token:
|
||||
return True
|
||||
|
||||
return (
|
||||
permission in (perm := token.get("permissions", []))
|
||||
and perm[permission] == "ENABLED"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def update(cls, token):
|
||||
"""
|
||||
Write new license token to file
|
||||
:param token: new token
|
||||
:return:
|
||||
"""
|
||||
|
||||
old_token = cls.get_token()
|
||||
|
||||
temp_file = cls._LICENSE_FILE + ".tmp"
|
||||
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
|
||||
mode = 0o640
|
||||
|
||||
with suppress(FileNotFoundError):
|
||||
os.unlink(temp_file)
|
||||
with os.fdopen(os.open(temp_file, flags, mode), "w") as f:
|
||||
json.dump(token, f)
|
||||
|
||||
shutil.chown(temp_file, user="root", group="_imunify")
|
||||
os.rename(temp_file, cls._LICENSE_FILE)
|
||||
cls.get_token.cache_clear()
|
||||
sentry.set_server_id(cls.get_server_id())
|
||||
sentry.set_product_name(cls.get_product_name())
|
||||
try:
|
||||
cls.renew_hook(old_token, token)
|
||||
except OperationalError:
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def renew_hook(cls, old_token, token):
|
||||
important_keys = ["license_expire_utc", "status", "limit", "id"]
|
||||
exp_time = token.get("license_expire_utc")
|
||||
license_type = cls.fill_license_type(token)
|
||||
condition = any(
|
||||
[token.get(elem) != old_token.get(elem) for elem in important_keys]
|
||||
)
|
||||
|
||||
if condition:
|
||||
license_updated = HookEvent.LicenseRenewed(
|
||||
exp_time=exp_time, license=license_type
|
||||
)
|
||||
from defence360agent.hooks.execute import execute_hooks
|
||||
|
||||
asyncio.gather(
|
||||
execute_hooks(license_updated), return_exceptions=True
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def delete(cls):
|
||||
"""
|
||||
Delete license token along with old-style license data
|
||||
:return:
|
||||
"""
|
||||
with suppress(FileNotFoundError):
|
||||
os.unlink(cls._LICENSE_FILE)
|
||||
cls.get_token.cache_clear()
|
||||
sentry.set_server_id(None)
|
||||
sentry.set_product_name(cls.get_product_name())
|
||||
|
||||
@classmethod
|
||||
def fill_license_type(cls, token):
|
||||
license_type = token.get("status")
|
||||
license_type_to_product = {
|
||||
"ok": "imunify360",
|
||||
"ok-trial": "imunify360Trial",
|
||||
"ok-av": "imunifyAV",
|
||||
"ok-avp": "imunifyAVPlus",
|
||||
}
|
||||
return license_type_to_product.get(license_type)
|
||||
|
||||
@classmethod
|
||||
def license_info(cls):
|
||||
token = cls.get_token()
|
||||
key_360 = token.get("status") in ("ok", "ok-trial")
|
||||
|
||||
message = token.get("message", None)
|
||||
if (
|
||||
ANTIVIRUS_MODE
|
||||
and CustomBilling.UPGRADE_URL
|
||||
and not CustomBilling.NOTIFICATIONS
|
||||
):
|
||||
message = None
|
||||
if ANTIVIRUS_MODE and key_360 and not message:
|
||||
# TODO: remove after auto-upgrade will be implemented
|
||||
message = (
|
||||
"You've got a license for the advanced security product "
|
||||
"Imunify360. Please, uninstall ImunifyAV and replace it with "
|
||||
"the Imunify360 providing comprehensive security for your "
|
||||
"server. Here are the steps for upgrade: "
|
||||
"https://docs.imunify360.com/installation/"
|
||||
)
|
||||
|
||||
if token:
|
||||
info = {
|
||||
"status": cls.is_valid(),
|
||||
"expiration": token.get("license_expire_utc", 0),
|
||||
"user_limit": token.get("limit"),
|
||||
"id": token.get("id"),
|
||||
"user_count": cls.users_count,
|
||||
"message": message,
|
||||
"license_type": cls.fill_license_type(token),
|
||||
}
|
||||
else:
|
||||
info = {"status": False}
|
||||
|
||||
if ANTIVIRUS_MODE:
|
||||
ignored_messages = [
|
||||
"user limits",
|
||||
]
|
||||
if info.get("message"):
|
||||
for msg in ignored_messages:
|
||||
if msg in info["message"]:
|
||||
info["message"] = None
|
||||
|
||||
# TODO: detect IP license for registered AV+ without custom billing
|
||||
info["ip_license"] = CustomBilling.IP_LICENSE and (
|
||||
CustomBilling.UPGRADE_URL is not None
|
||||
or CustomBilling.UPGRADE_URL_360 is not None
|
||||
)
|
||||
info["upgrade_url"] = (
|
||||
CustomBilling.UPGRADE_URL
|
||||
or token.get("upgrade_url")
|
||||
or cls.AV_PLUS_BUY_URL
|
||||
)
|
||||
info["upgrade_url_360"] = (
|
||||
CustomBilling.UPGRADE_URL_360 or upgrade_url_default()
|
||||
)
|
||||
else:
|
||||
info["redirect_url"] = token.get("upgrade_url", None)
|
||||
if cls.is_demo(): # pragma: no cover
|
||||
info["demo"] = True
|
||||
|
||||
return info
|
||||
|
||||
@classmethod
|
||||
def get_product_name(cls) -> str:
|
||||
if not ANTIVIRUS_MODE:
|
||||
return Core.NAME
|
||||
|
||||
license_status = cls.get_token().get("status", "")
|
||||
|
||||
if license_status == "ok-av":
|
||||
return "imunify.av"
|
||||
elif license_status in ("ok-avp", "ok", "ok-trial"):
|
||||
return "imunify.av+"
|
||||
else:
|
||||
logger.error("Unknown license %s", license_status)
|
||||
return "Unknown license"
|
||||
|
||||
@classmethod
|
||||
def is_demo(cls) -> bool:
|
||||
return os.path.isfile("/var/imunify360/demo")
|
||||
|
||||
@classmethod
|
||||
def is_unlimited(cls):
|
||||
token = cls.get_token()
|
||||
return token.get("limit", 0) >= UNLIMITED_USERS_COUNT
|
||||
|
||||
@classmethod
|
||||
def get_im360_buy_url(cls) -> str:
|
||||
if cls.users_count is None:
|
||||
return cls.IM360_BUY_URL_TEMPLATE.format(user_count=1)
|
||||
for threshold in cls.VERSION_THRESHOLDS:
|
||||
if cls.users_count <= threshold:
|
||||
return cls.IM360_BUY_URL_TEMPLATE.format(user_count=threshold)
|
||||
return cls.IM360_BUY_URL_TEMPLATE.format(user_count="unlimited")
|
||||
|
||||
|
||||
def upgrade_url_default():
|
||||
n = LicenseCLN.users_count
|
||||
|
||||
if (
|
||||
# apply custom direct store links on cPanel
|
||||
is_cpanel_installed()
|
||||
# where upgrade URL is not set or set to the old value
|
||||
and CustomBilling.UPGRADE_URL
|
||||
in ("", "../../../scripts14/purchase_imunifyavplus_init_IMUNIFY")
|
||||
):
|
||||
base_url = (
|
||||
f"https://store.cpanel.net/index.php?rp=/store/partner-addons/"
|
||||
)
|
||||
|
||||
server_ip = get_external_ip()
|
||||
|
||||
if n == 1:
|
||||
suffix = (
|
||||
f"imunify360-for-cpanel-solo&customfield%5B55%5D={server_ip}"
|
||||
)
|
||||
else:
|
||||
suffix = f"imunify360&customfield%5B375%5D={server_ip}"
|
||||
return base_url + suffix
|
||||
|
||||
iaid = g.get("iaid", "")
|
||||
return (
|
||||
LicenseCLN.get_im360_buy_url()
|
||||
+ f"?iaid={iaid}"
|
||||
+ f"&users={n}" * bool(n)
|
||||
)
|
||||
Reference in New Issue
Block a user