# -*- coding: utf-8 -*- from subprocess import list2cmdline from typing import Any, Dict, Tuple, Union, List import logging import glob import json import os from customer_local_ops import OpType, ResourceType, NydusResult from customer_local_ops.operating_system.windows import Windows, Windows2016, Windows2019, Windows2022 from customer_local_ops.control_panel import SCRIPT_BASEPATH from customer_local_ops.control_panel.plesk import OSPlesk from customer_local_ops.util.execute import (runCommand, run_powershell_file, run_uapi_command, run_multiple_uapi_commands, start_powershell_file) from customer_local_ops.util.retry import Retry LOG = logging.getLogger(__name__) PLESK_DIR_17 = 'C:\\Program Files (x86)\\Plesk\\' PLESK_DIR_12 = 'C:\\Program Files (x86)\\Parallels\\Plesk\\' # Number of seconds for Nydus to wait before retrying the previous workflow stop if a DLL conflict was detected DLL_CONFLICT_COMMAND_RETRY_INTERVAL = 10 PLESK_FIX_DLL_CONFLICT_SCRIPT_LOG_FILE = r'C:\Windows\TEMP\plesk-fix-dll-conflict.log' CmdsType = Union[str, List[str], List[Tuple[str, Union[str, List[str]]]]] def has_dll_version_conflict(text: str) -> bool: """ Check to see of the specified text contains an indication of the Python/PHP DLL conflict :param text: The text to be checked :return: True if the conflict is detected; otherwise, False """ # 'vcruntime140.dll' 14.0 is not compatible with this PHP build linked with 14.16 in Unknown on line 0 conflict_detected = "'vcruntime140.dll' 14.0 is not compatible" in text if conflict_detected: LOG.debug("DLL conflict detected") else: LOG.debug("No DLL conflict detected") return conflict_detected class WindowsPlesk(Windows, OSPlesk): """ Plesk Customer Local Ops for the Windows OS. All function names should contain 'plesk' so as not to override the OS ops """ op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM plesk_dir = None RETRYABLE_ERRS = ['The Plesk administrator password cannot be changed until the server cloning is finished', 'No connection could be made because the target machine actively refused it', 'Could not resolve host'] def get_plesk_dir(self) -> str: """Find the installation directory for plesk :raises RuntimeError: If a plesk path cannot be found :return: full path to plesk installation """ if self.plesk_dir is None: # environment variable for plesk dir. if os.getenv('plesk_dir'): self.plesk_dir = os.getenv('plesk_dir') # plesk 17 default elif os.path.exists(PLESK_DIR_17): self.plesk_dir = PLESK_DIR_17 # plesk 12 default elif os.path.exists(PLESK_DIR_12): self.plesk_dir = PLESK_DIR_12 # go find a plesk! else: pleskpath = glob.glob('C:\\Program Files (x86)\\**\\Plesk\\', recursive=True) if pleskpath and isinstance(pleskpath, list): self.plesk_dir = pleskpath[0] if self.plesk_dir is not None: LOG.info("get_plesk_dir: %s", self.plesk_dir) return self.plesk_dir raise RuntimeError("plesk path could not be found") def license_plesk(self, activation_key: str, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Run license utility on local vm with activation key :param activation_key: Key value passed back from license plesk op on hfs executor :param intermediate_result: Dict containing metadata for retries :return: tuple with success, error data """ LOG.info("install plesk license on '%s'", str(self)) op_name = 'license_plesk' command = 'license.exe' full_command = self.get_path_plesk(command) LOG.info("full command: %s", full_command) cmd = '"{full_command}" --install {activation_key}'.format(full_command=full_command, activation_key=activation_key) exit_code, outs, errs = run_uapi_command(cmd, 'set Plesk License', 'license_plesk', use_shell=True) return self._result_handler(exit_code, outs, errs, op_name, intermediate_result) # pylint: disable=too-many-locals def enable_plesk(self, vm_ip: str, vm_resource: str, plesk_user: str, plesk_pass: str, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Enable plesk on local server :param vm_ip: External IP address of the server :param vm_resource: The resource name for the third-party hosting provider :param plesk_user: User name to be used on Plesk instance :param plesk_pass: Password to be used on Plesk instance :param intermediate_result: Dict containing metadata for retries :raises DecryptError: if there is a problem with decrypting the password :return: tuple with success, error data """ LOG.info("enable plesk for VM IP %s", vm_ip) op_name = self.OP_ENABLE_PLESK password_arg = list2cmdline([self.decrypt(plesk_pass)]) ops_map = self.PLESK_OPS_RESOURCE_ATTRIBUTE_MAP[ResourceType(vm_resource)][op_name] init_conf_cmd = self.get_path_plesk('init_conf.exe') check_configured_cmd = '"{}" --check-configured'.format(init_conf_cmd) exit_code, outs, errs = run_uapi_command(check_configured_cmd, "Check Plesk configured", op_name, use_shell=True) res = self._result_handler(exit_code, outs, errs, op_name, intermediate_result) if isinstance(res, Retry): return res if not res[0]: # Plesk is not configured init_conf_setup_flag = '--init' else: # Plesk is configured init_conf_setup_flag = '--update' enable_cmds = [ ('set minimum password strength', '"{server_pref}" -u -min_password_strength medium'.format( server_pref=self.get_path_plesk('server_pref.exe'))), ('setup Plesk', ops_map[self.SETUP_CMD].format( init_conf_cmd='"{}"'.format(init_conf_cmd), init_conf_setup_flag=init_conf_setup_flag, plesk_user=plesk_user, password=password_arg)), ('set Poweruser', '"{poweruser_cmd}" --on'.format( poweruser_cmd=self.get_path_plesk('poweruser.exe'))), ('reconfigurator', '"{reconfigurator}" /check=Services /no-gui'.format( reconfigurator=self.get_path_plesk('reconfigurator.exe', ['admin', 'bin']))), ('Repair sslcerts', '"{}" repair web -sslcerts'.format(self.get_path_plesk('plesk.exe'))) ] exit_code, outs, errs = run_multiple_uapi_commands(enable_cmds, op_name, omit_string=password_arg) return self._result_handler(exit_code, outs, errs, op_name, intermediate_result) def site_list_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any: """ Retrieve a list of Plesk sites :param intermediate_result: Dict containing metadata for retries :return: list of sites or tuple with error data """ LOG.info("get site list") op_name = 'site_list_plesk' plesk_dir_arg = [str(self.get_plesk_dir())] exit_code, outs, errs = run_powershell_file(SCRIPT_BASEPATH / 'powershell' / 'plesk_site_list.ps1', op_name, False, plesk_dir_arg) if exit_code != 0: return self._result_handler(exit_code, outs, errs, op_name, intermediate_result) logging.info(outs) return json.loads(outs) if outs else '' def server_prep_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """ Install Plesk on a server :param intermediate_result: Dict containing metadata for retries :return: tuple with success, error data """ LOG.info("install plesk on '%s'", str(self)) op_name = 'server_prep_plesk' exit_code, outs, errs = run_powershell_file( SCRIPT_BASEPATH / 'powershell' / 'plesk_server_prep.ps1', 'Prep Plesk') return self._result_handler(exit_code, outs, errs, op_name, intermediate_result) def get_client_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any: """ Get Plesk SSO URL :param intermediate_result: Dict containing metadata for retries :return: SSO URL string or tuple with error data """ op_name = 'get_client_plesk' exit_code, outs, errs = runCommand(["plesk", "bin", "admin", "--get-login-link"], 'get sso link') if exit_code != 0: return self._result_handler(exit_code, outs, errs, op_name, intermediate_result) links = outs.split('\n') sso_url = str(links[0]) return self.encrypt(sso_url).decode('utf-8') def change_hostname_plesk(self, hostname: str, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """ Change hostname on Plesk :param hostname: New name to be assigned to the host :param intermediate_result: Dict containing metadata for retries :return: tuple with success, error data """ op_name = 'change_hostname_plesk' rcmd = '"{plesk_path}" --update -hostname {hostname}'.format(plesk_path=self.get_path_plesk('server_pref.exe'), hostname=hostname) exit_code, outs, errs = run_uapi_command(rcmd, 'set Plesk hostname', op_name, use_shell=True) return self._result_handler(exit_code, outs, errs, op_name, intermediate_result) def change_admin_password_plesk(self, plesk_admin_pass: str, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """ Change admin password on Plesk :param plesk_admin_pass: Encrypted password for Plesk admin user :param intermediate_result: Dict containing metadata for retries :return: tuple with success, error data """ op_name = 'change_admin_password_plesk' password = self.decrypt(plesk_admin_pass) # Perform operations on Control Panel command_kwargs = {'omit_string': password} plesk_bin = self.get_path_plesk(None, ['bin', 'admin']) command = [plesk_bin, '--set-admin-password', '-passwd', password] exit_code, outs, errs = run_uapi_command( command, 'set Plesk admin password', op_name, **command_kwargs) return self._result_handler(exit_code, outs, errs, op_name, intermediate_result) def _check_for_dll_conflict(self, exit_code: int, result: Dict[str, Any], intermediate_result: Dict[str, Any] = None) -> NydusResult: """ Check to see if the output of the previous command indicates that a DLL version conflict exists and, if so attempt remediation. :param exit_code: The exit code from the previous command :param result: The result dictionary from the previous command :param intermediate_result: Dict containing metadata for retries :return: A 2-tuple if no remediation is necessary, otherwise a 3-tuple where the last element is a retry interval in seconds """ if exit_code == 0: if intermediate_result is not None and intermediate_result.get('delete_log_file', False): try: os.unlink(PLESK_FIX_DLL_CONFLICT_SCRIPT_LOG_FILE) except Exception: # pylint: disable=broad-except pass # We don't care if we're trying to delete a file that doesn't exist return True, result if has_dll_version_conflict(result.get('errs', '')): LOG.error("DLL conflict detected; attempting to remediate") code, _, _ = start_powershell_file(SCRIPT_BASEPATH / 'powershell' / 'plesk_fix_dll_conflict.ps1') if code == 0: if intermediate_result is None: intermediate_result = {} intermediate_result['delete_log_file'] = True return Retry(intermediate_result) if code == 0 else (False, result) return False, result def _result_handler(self, exit_code: int, outs: str, errs: str, op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Take the result from a run command and check for retryable errors. If found, return a Retry. If not, return a Nydus result tuple. :param exit_code: The exit code from the executed command :param outs: The stdout output from the executed command :param errs: The stderr output from the executed command :param op_name: The name of the op :param intermediate_result: Dict containing metadata for retries :return: A Nydus result tuple, or Retry """ success, result = self.build_result_from_cmd_output(exit_code, outs, errs, op_name, op_name + " succeeded") if not success: dll_res = self._check_for_dll_conflict(exit_code, result, intermediate_result=intermediate_result) if isinstance(dll_res, Retry): return dll_res return super()._result_handler(exit_code, outs, errs, op_name, intermediate_result) return success, result class Windows2016Plesk(Windows2016, WindowsPlesk): pass class Windows2019Plesk(Windows2019, WindowsPlesk): pass class Windows2022Plesk(Windows2022, WindowsPlesk): pass