D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
clcagefslib
/
Filename :
domain.py
back
Copy
#!/opt/cloudlinux/venv/bin/python3 -sbb # -*- coding: utf-8 -*- # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # import logging import os import shutil import subprocess import tempfile from collections import defaultdict from pathlib import Path from clcommon.clcagefs import setup_mount_dir_cagefs, CAGEFSCTL_TOOL from clcommon.cpapi import cpusers from clcommon.cpapi import docroot as get_domain_docroot from clcommon.cpapi.cpapiexceptions import NoDomain from .fs import user_exists from .exceptions import UserNotFoundError from .webisolation import admin_config, config, jail_utils from .webisolation.config import DOCROOTS_ISOLATED_BASE from .webisolation.jail_config_builder import write_jail_mounts_config from .webisolation.php import reload_processes_with_docroots from .webisolation.service import start_monitoring_service, stop_monitoring_service from .webisolation.triggers import trigger_xray_ini_regeneration, trigger_ssa_ini_regeneration def is_website_isolation_allowed_server_wide(): return os.path.isfile(admin_config.WEBSITE_ISOLATION_MARKER) def is_website_isolation_feature_available(): return os.path.isfile(admin_config.WEBSITE_ISOLATION_AVAILABLE_MARKER) def get_isolation_user_mode() -> str | None: """Return the current user mode for website isolation. Returns: ``"allow_all"`` – all users allowed, denied dir lists exceptions. ``"deny_all"`` – no users allowed, allowed dir lists exceptions. ``None`` – not initialised yet. """ has_denied = os.path.isdir(admin_config.ISOLATION_DENIED_DIR) has_allowed = os.path.isdir(admin_config.ISOLATION_ALLOWED_DIR) if has_denied and has_allowed: # Error state – both dirs present. Treat as allow_all and clean up. logging.warning( "Both site-isolation.users.allowed and site-isolation.users.denied " "directories exist. Removing allowed directory, treating as allow_all mode." ) shutil.rmtree(admin_config.ISOLATION_ALLOWED_DIR, ignore_errors=True) return "allow_all" if has_denied: return "allow_all" if has_allowed: return "deny_all" return None def is_website_isolation_allowed_for_user(user: str) -> bool: """Check whether *user* is allowed to use website isolation. Combines the global marker with the two-mode user model: * **allow_all** – allowed unless the user is in the denied directory. * **deny_all** – denied unless the user is in the allowed directory. """ if not os.path.isfile(admin_config.WEBSITE_ISOLATION_MARKER): return False mode = get_isolation_user_mode() if mode == "allow_all": return not admin_config.user_in_dir(admin_config.ISOLATION_DENIED_DIR, user) if mode == "deny_all": return admin_config.user_in_dir(admin_config.ISOLATION_ALLOWED_DIR, user) return False # not initialised def _ensure_isolation_mount_and_marker(): """Set up mount directories and the global marker if not already done.""" if not os.path.isfile(admin_config.WEBSITE_ISOLATION_MARKER): setup_mount_dir_cagefs( str(DOCROOTS_ISOLATED_BASE), prefix="*", remount_cagefs=True, remount_in_background=False, ) marker_path = Path(admin_config.WEBSITE_ISOLATION_MARKER) marker_path.parent.mkdir(parents=True, exist_ok=True) marker_path.touch() # Remount kills redis processes, restart clwpos_monitoring service if it's running subprocess.run( ["/usr/bin/systemctl", "try-restart", "clwpos_monitoring.service"], capture_output=True, text=True ) PROXY_COMMANDS_PATH = "/etc/cagefs/proxy.commands" CAGEFSCTL_USER_PROXY_ENTRY = "CAGEFSCTL_USER:noproceed=root:/usr/sbin/cagefsctl-user" CAGEFSCTL_USER_BINARIES = [ "/usr/sbin/cagefsctl-user", ] def ensure_proxyexec_command(): """Register the ``cagefsctl-user`` proxyexec alias if not already present. Appends the ``CAGEFSCTL_USER`` entry to ``/etc/cagefs/proxy.commands`` and runs ``cagefsctl --update-list`` to pull the required binaries into the CageFS skeleton. This is a no-op when the entry already exists. """ try: with open(PROXY_COMMANDS_PATH, "r", encoding="utf-8") as f: content = f.read() except FileNotFoundError: content = "" if "CAGEFSCTL_USER" in content: return logging.info("Registering cagefsctl-user in %s", PROXY_COMMANDS_PATH) new_content = content if new_content and not new_content.endswith("\n"): new_content += "\n" new_content += CAGEFSCTL_USER_PROXY_ENTRY + "\n" proxy_dir = os.path.dirname(PROXY_COMMANDS_PATH) os.makedirs(proxy_dir, exist_ok=True) fd, tmp_path = tempfile.mkstemp(dir=proxy_dir, prefix=".proxy.commands.") try: with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(new_content) os.replace(tmp_path, PROXY_COMMANDS_PATH) except BaseException: os.unlink(tmp_path) raise update_list = ("\n".join(CAGEFSCTL_USER_BINARIES) + "\n").encode() subprocess.run( [CAGEFSCTL_TOOL, "--wait-lock", "--update-list"], input=update_list, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False, ) def toggle_isolation_user_mode() -> str: """Flip the isolation user mode without modifying any per-user state. Unlike :func:`allow_website_isolation_server_wide` and :func:`deny_website_isolation_server_wide`, this function only flips the mode indicator directories. It does **not** clean up existing user isolation or alter the per-user exception lists. * ``allow_all`` → ``deny_all`` * ``deny_all`` → ``allow_all`` * not initialised → ``allow_all`` Returns: The new mode after toggling (``"allow_all"`` or ``"deny_all"``). """ _ensure_isolation_mount_and_marker() current = get_isolation_user_mode() if current == "allow_all": new_mode = "deny_all" os.makedirs(admin_config.ISOLATION_ALLOWED_DIR, mode=admin_config.DIR_MODE, exist_ok=True) shutil.rmtree(admin_config.ISOLATION_DENIED_DIR, ignore_errors=True) else: # deny_all or not initialised → allow_all new_mode = "allow_all" os.makedirs(admin_config.ISOLATION_DENIED_DIR, mode=admin_config.DIR_MODE, exist_ok=True) shutil.rmtree(admin_config.ISOLATION_ALLOWED_DIR, ignore_errors=True) return new_mode def allow_website_isolation_server_wide(): """Switch to *allow_all* mode – all users are allowed by default.""" _ensure_isolation_mount_and_marker() ensure_proxyexec_command() # Create empty denied-users directory → allow_all mode indicator os.makedirs(admin_config.ISOLATION_DENIED_DIR, mode=admin_config.DIR_MODE, exist_ok=True) # Remove allowed-users directory (belongs to deny_all mode) shutil.rmtree(admin_config.ISOLATION_ALLOWED_DIR, ignore_errors=True) def deny_website_isolation_server_wide(): """Switch to *deny_all* mode – no users are allowed by default. Disables domain isolation for every user and switches the mode. """ _cleanup_all_users_isolation() _ensure_isolation_mount_and_marker() # Create empty allowed-users directory → deny_all mode indicator os.makedirs(admin_config.ISOLATION_ALLOWED_DIR, mode=admin_config.DIR_MODE, exist_ok=True) # Remove denied-users directory (belongs to allow_all mode) shutil.rmtree(admin_config.ISOLATION_DENIED_DIR, ignore_errors=True) def allow_website_isolation_for_user(username: str): """Allow website isolation for *username* (mode-aware). * **allow_all** – removes *username* from the denied directory. * **deny_all** – adds *username* to the allowed directory. * **not initialised** – sets up infrastructure in *deny_all* mode with *username* as the first allowed user. """ _ensure_isolation_mount_and_marker() ensure_proxyexec_command() mode = get_isolation_user_mode() if mode == "allow_all": admin_config.remove_user_from_dir(admin_config.ISOLATION_DENIED_DIR, username) elif mode == "deny_all": admin_config.add_user_to_dir(admin_config.ISOLATION_ALLOWED_DIR, username) else: # Not initialised → start in deny_all mode with this user allowed os.makedirs(admin_config.ISOLATION_ALLOWED_DIR, mode=admin_config.DIR_MODE, exist_ok=True) admin_config.add_user_to_dir(admin_config.ISOLATION_ALLOWED_DIR, username) def deny_website_isolation_for_user(username: str): """Deny website isolation for *username* (mode-aware). * **allow_all** – adds *username* to the denied directory. * **deny_all** – removes *username* from the allowed directory. Also disables all domain isolation for the user. """ mode = get_isolation_user_mode() if mode == "allow_all": admin_config.add_user_to_dir(admin_config.ISOLATION_DENIED_DIR, username) elif mode == "deny_all": admin_config.remove_user_from_dir(admin_config.ISOLATION_ALLOWED_DIR, username) # Clean up existing domain isolation for the user _cleanup_user_isolation(username) def _cleanup_user_isolation(username: str): """Remove all domain isolation state for a single user.""" if not user_exists(username): return user_cfg = config.load_user_config(username) if not user_cfg.enabled_websites: return domain_docroot_map = { d: _get_docroot_or_none(d) for d in user_cfg.enabled_websites } config.save_user_config(username, config=None) write_jail_mounts_config(username, user_config=None) reload_processes_with_docroots( username, filter_by_docroots=list(domain_docroot_map.values()) ) for d, docroot in domain_docroot_map.items(): if docroot is None: logging.error( "Unable to detect document root for domain %s, " "configuration cleanup failed. Contact CloudLinux support " "if the error repeats.", d, ) continue jail_utils.remove_website_token_directory(username, docroot) def _cleanup_all_users_isolation(): """Remove domain isolation state for every user that has it.""" for username in list(users_with_enabled_domain_isolation()): try: _cleanup_user_isolation(username) except Exception: logging.exception( "Unable to disable website isolation for user %s, skipping.", username, ) users_left = users_with_enabled_domain_isolation() if not users_left: stop_monitoring_service() def _get_docroot_or_none(domain: str): try: return get_domain_docroot(domain)[0] except (NoDomain, IndexError): return None def is_isolation_enabled(user): if not is_website_isolation_allowed_server_wide(): return False try: domains_config_path = jail_utils.get_jail_config_path(user) except UserNotFoundError: return False return os.path.exists(domains_config_path) def users_with_enabled_domain_isolation() -> dict: users = [u for u in cpusers() if user_exists(u) and is_isolation_enabled(u)] user_domain_pairs = {} for user in users: domains_with_isolation = get_websites_with_enabled_isolation(user) if domains_with_isolation: user_domain_pairs[user] = domains_with_isolation return user_domain_pairs def get_websites_with_enabled_isolation(user: str): if not user_exists(user): logging.warning( "User %s not found, cannot get websites with enabled isolation", user) return [] return config.load_user_config(user).enabled_websites def get_docroots_of_isolated_websites() -> dict: """ Returns pairs user: set(docroots) for all users with website isolation enabled Used by monitoring service to watch docroots changes to load actual list of docroot paths instead of stale storage """ users_with_isolation = users_with_enabled_domain_isolation() pairs = defaultdict(set) for user, domains in users_with_isolation.items(): for domain in domains: try: dr = get_domain_docroot(domain)[0] except (NoDomain, IndexError): continue pairs[user].add(dr) return pairs def enable_website_isolation(user, domain): if not user_exists(user): logging.warning( "User %s not found, cannot enable website isolation", user) return user_config = config.load_user_config(user) if domain not in user_config.enabled_websites: user_config.enabled_websites.append(domain) # if it crashes just let the command fail with NoDomain # exception, it should be a very rare case because # we validate input domain name in cagefsctl.py document_root = get_domain_docroot(domain)[0] # Create website token directory and overlay storage jail_utils.create_website_token_directory(user, document_root) jail_utils.create_overlay_storage_directory(user, document_root) config.save_user_config(user, user_config) # regenerate alt-php ini configuration for selector for the specific domain subprocess.run(["cagefsctl", "--rebuild-alt-php-ini", "--domain", domain], check=True) write_jail_mounts_config(user, user_config) reload_processes_with_docroots(user, filter_by_docroots=[_get_docroot_or_none(domain)]) start_monitoring_service() # Trigger xray/ssa ini regeneration for per-domain PHP selector trigger_xray_ini_regeneration(user, domain) trigger_ssa_ini_regeneration(user) def regenerate_isolation_configuration(user): if not user_exists(user): logging.warning( "User %s not found, cannot regenerate website isolation configuration", user) return user_config = config.load_user_config(user) write_jail_mounts_config(user, user_config) document_roots = [] for domain in user_config.enabled_websites: document_root = _get_docroot_or_none(domain) if document_root is None: logging.warning( "Unable to find document root for domain %s, " "please contact CloudLinux support if the issue persists.", domain, ) continue document_roots.append(document_root) try: # recreate tokens and storage e.g. when username changes jail_utils.create_website_token_directory(user, document_root) jail_utils.create_overlay_storage_directory(user, document_root) except Exception as e: logging.error("Unable to recreate token/storage for domain=%s, Error=%s", domain, e) continue reload_processes_with_docroots(user, filter_by_docroots=document_roots) def disable_website_isolation(user: str, domain: str | None = None): if not user_exists(user): logging.warning( "User %s not found, cannot disable website isolation", user) return user_config = config.load_user_config(user) reload_docroots = None if domain is None: reload_docroots = [ _get_docroot_or_none(website) for website in user_config.enabled_websites ] user_config.enabled_websites = [] elif domain in user_config.enabled_websites: reload_docroots = [_get_docroot_or_none(domain)] user_config.enabled_websites.remove(domain) config.save_user_config(user, user_config) write_jail_mounts_config(user, user_config) if reload_docroots: reload_processes_with_docroots(user, filter_by_docroots=reload_docroots) for document_root in reload_docroots: if document_root is None: continue jail_utils.remove_website_token_directory(user, document_root) # get actual docroots for all users with website isolation enabled users_with_isolation = users_with_enabled_domain_isolation() if not users_with_isolation: stop_monitoring_service()