var/opt/nydus/ops/customer_local_ops/control_panel/linux_plesk.py000064400000055752147205630250021565 0ustar00# -*- coding: utf-8 -*- from typing import Dict, Any import json import logging import os import re import shlex import sys from enum import IntEnum from fileinput import FileInput from customer_local_ops import OpType, ResourceType, NydusResult from customer_local_ops.operating_system.linux import (AlmaLinux8, AlmaLinux9, Linux, CentOS, CentOS6, CentOS7, Debian, Debian8, Debian10, Debian11, Debian12, Ubuntu1604, Ubuntu2004, Ubuntu2204, Ubuntu2404) from customer_local_ops.control_panel.plesk import OSPlesk from customer_local_ops.util.execute import runCommand, run_uapi_command, run_multiple_uapi_commands from customer_local_ops.util.retry import Retry LOG = logging.getLogger(__name__) PLESK_DIR_1 = "/opt/psa" PLESK_DIR_2 = "/usr/local/psa" class LinuxPlesk(Linux, OSPlesk): """ Plesk Customer Local Ops for the Linux OS. All function names should contain 'plesk' so as not to override the OS ops """ op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM PLESK_INSTALL_LOCATIONS = [ PLESK_DIR_1, PLESK_DIR_2, ] plesk_dir = None RETRYABLE_ERRS = ['The Plesk administrator password cannot be changed until the server cloning is finished', 'No connection could be made because the target machine actively refused it', 'Could not resolve host'] def get_plesk_dir(self) -> str: """Find the installation directory for plesk :return: full path to plesk installation """ # Look for plesk in any of several locations. On Ubuntu, it's currently installed in # /opt/psa and on CentOS, it can be found in /usr/local/psa. The list below could # easily be made into a configurable item so that new potential locations could be # added without having to change the code. if self.plesk_dir is not None: return self.plesk_dir for loc in self.PLESK_INSTALL_LOCATIONS: if os.path.isdir(loc): LOG.info(loc) self.plesk_dir = loc return self.plesk_dir raise RuntimeError("plesk path could not be found") def license_plesk(self, activation_key: str, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Run license utility on local vm with activation key :param activation_key: Key value passed back from license plesk op on hfs executor :param intermediate_result: Dict containing metadata for retries :return: tuple with success, error data """ LOG.info("install plesk license on '%s'", str(self)) op_name = 'license_plesk' command = 'license' full_command = self.get_path_plesk(command) LOG.info("full command: %s", full_command) exit_code, outs, errs = run_uapi_command([full_command, '--install', activation_key], 'set Plesk License', 'license_plesk') return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result) # pylint: disable=too-many-locals def enable_plesk(self, vm_ip: str, vm_resource: str, plesk_user: str, plesk_pass: str, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Enable plesk on local vm :param vm_ip: External IP address of the VM :param vm_resource: The resource name for the third-party hosting provider :param plesk_user: User name to be used on Plesk instance :param plesk_pass: Password to be used on Plesk instance :param intermediate_result: Dict containing metadata for retries :raises DecryptError: if there is a problem with decrypting the password :return: tuple with success, error data """ LOG.info("enable plesk") op_name = self.OP_ENABLE_PLESK password_arg = shlex.quote(self.decrypt(plesk_pass)) ops_map = self.PLESK_OPS_RESOURCE_ATTRIBUTE_MAP[ResourceType(vm_resource)][op_name] init_conf_cmd = self.get_path_plesk('init_conf') check_configured_cmd = '{} --check-configured'.format(init_conf_cmd) exit_code, outs, errs = run_uapi_command(check_configured_cmd, "Check Plesk configured", op_name, use_shell=True) if exit_code != 0: # Plesk is not configured init_conf_setup_flag = '--init' else: # Plesk is configured init_conf_setup_flag = '--update' prep_cmds = [ ('set minimum password strength', '{server_pref} -u -min_password_strength medium'.format( server_pref=self.get_path_plesk('server_pref'))), ('setup Plesk', ops_map[self.SETUP_CMD].format( init_conf_cmd=init_conf_cmd, # init_conf_setup_flag=ops_map[self.INIT_CONF_SETUP_FLAG], init_conf_setup_flag=init_conf_setup_flag, plesk_user=plesk_user, password=password_arg)), ('check config', check_configured_cmd), ('set Poweruser', '{set_poweruser_cmd} --on'.format(set_poweruser_cmd=self.get_path_plesk('poweruser'))), ('set auto updates', 'plesk db "INSERT INTO misc(param, val) ' + 'VALUES(\'automaticSystemPackageUpdates\', \'true\') ON DUPLICATE KEY UPDATE val = \'true\';"') ] if ops_map[self.RUN_HIDE_INTERNAL_IP]: prep_cmds += [ ('hide internal ip', '''sed -i'' 's/blacklist=".*"/blacklist="{vm_ip}"/' ''' '/usr/local/psa/admin/conf/panel.ini'.format(vm_ip=vm_ip)), ('reread ips', 'plesk bin ipmanage --reread') ] if ops_map[self.RUN_DISABLE_SESSION_IP_CHECK]: prep_cmds += [ ('disable session ip check', 'plesk db "' 'INSERT INTO misc(param,val) ' 'VALUES(\'disable_check_session_ip\', \'true\') ' 'ON DUPLICATE KEY UPDATE val = \'true\';"') ] exit_code, outs, errs = run_multiple_uapi_commands(prep_cmds, op_name, use_shell=True, omit_string=password_arg) return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result) def site_list_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any: """Retrieve a list of Plesk sites :param intermediate_result: Dict containing metadata for retries :return: list of sites or tuple with error data """ LOG.info("get site list") site_list = {} exit_code, outs, errs = runCommand(self.get_path_plesk('subscription') + " --list", "site_list_plesk: get_subscriptions", useShell=True) if exit_code != 0: return self._result_handler(exit_code, outs, errs, 'site_list_plesk: get_subscriptions', intermediate_result) site_list['subscriptions'] = outs.split('\n') site_cmd = self.get_path_plesk('site') exit_code, outs, errs = runCommand(site_cmd + " --list", "site_list_plesk: get_sites", useShell=True) if exit_code != 0: return self._result_handler(exit_code, outs, errs, 'site_list_plesk: get_sites', intermediate_result) site_list['sites'] = outs.split('\n') required_fields = {'FTP Login': 'ftp_login', 'IP address': 'ip_address', 'Disk space used by httpdocs': 'diskused', 'Hosting type': 'webspace'} for i, site in enumerate(site_list['sites']): site_data = {'name': site} exit_code, outs, errs = runCommand(site_cmd + " --info " + site, "site_list_plesk: get_site_info", useShell=True) if exit_code != 0: return self._result_handler(exit_code, outs, errs, 'site_list_plesk: get_site_info', intermediate_result) output = outs.split('\n') for line in output: data = [x.strip() for x in line.split(':')] logging.info("data: %s", data) if len(data) == 2: key = data[0] if key in required_fields: site_data[required_fields[key]] = data[1] site_list['sites'][i] = site_data logging.info(json.dumps(site_list)) return site_list def server_prep_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """ Install Plesk on a server :param intermediate_result: Dict containing metadata for retries :return: tuple with success, error data """ LOG.info("install plesk on '%s'", str(self)) op_name = 'server_prep_plesk' # download plesk-installer. keep: provides compatability for updates, troubleshooting, etc. prepCmds = [ ('Download Plesk', 'wget https://autoinstall.plesk.com/plesk-installer'), ('Modify Perms', 'chmod +x plesk-installer'), ('Install Plesk', 'sh plesk-installer --select-product-id=plesk --installation-type Typical ' + '--select-release-latest --notify-email admin@example.com'), ('Delete Docker Interface', 'ip link del docker0') ] for purpose, cmd in prepCmds: exit_code, outs, errs = run_uapi_command(cmd, purpose, op_name, use_shell=True) if exit_code != 0: return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result) return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result) def get_client_plesk(self, *args: Any, intermediate_result: Dict[str, Any] = None) -> Any: """ Get Plesk SSO URL :param intermediate_result: Dict containing metadata for retries :return: SSO URL string or tuple with error data """ exit_code, outs, errs = runCommand(self.get_path_plesk('admin') + " --get-login-link", "get sso link", useShell=True) if exit_code != 0: return self._result_handler(exit_code, outs, errs, 'get_client_plesk', intermediate_result=intermediate_result) links = outs.split('\n') sso_url = str(links[0]) return self.encrypt(sso_url).decode('utf-8') def change_hostname_plesk(self, hostname: str, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """ Change hostname on Plesk :param hostname: The new server hostname :param intermediate_result: Dict containing metadata for retries :return: tuple with success, error data """ op_name = 'change_hostname_plesk' rcmd = self.get_path_plesk('server_pref') + ' --update' + ' -hostname ' + hostname exit_code, outs, errs = run_uapi_command(rcmd, 'set Plesk hostname', op_name, use_shell=True) return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result) def change_admin_password_plesk(self, plesk_admin_pass: str, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """ Change admin password on Plesk :param plesk_admin_pass: Encrypted password for Plesk admin user :param intermediate_result: Dict containing metadata for retries :return: tuple with success, error data """ op_name = 'change_admin_password_plesk' password = self.decrypt(plesk_admin_pass) # Perform operations on Control Panel plesk_bin = self.get_path_plesk('init_conf') command = [plesk_bin, '--set-admin-password', '-passwd', password] exit_code, outs, errs = run_uapi_command( command, 'set Plesk admin password', op_name, omit_string=password) return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result) def configure_postfix(self, relay_address: str, op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Configure the postfix MTA. :param relay_address: IP address or host name of the mail relay to set :param op_name: name of the operation being executed :param intermediate_result: Dict containing metadata for retries """ if relay_address is None: return class SearchState(IntEnum): SEARCHING = 0 SKIPPING = 1 DONE = 2 # In the configuration file distributed with postfix, there are a either a number of commented-out relayhost # lines or a blank relay host line. Add our new line immediately after the last commented-out relayhost line # or replace the blank relayhost line try: relayhost_cmt = re.compile(r'^[ \t]*#[ \t]*relayhost[ \t]*=') relayhost_blnk = re.compile(r'^[ \t]*relayhost[ \t]*=') relayhost_line = "relayhost = [{}]\n".format(relay_address) state = SearchState.SEARCHING with FileInput('/etc/postfix/main.cf', inplace=True) as stream: for line in stream: if relayhost_blnk.match(line): sys.stdout.write(relayhost_line) state = SearchState.DONE continue if relayhost_cmt.match(line): if state == SearchState.SEARCHING: state = SearchState.SKIPPING else: if state == SearchState.SKIPPING: # We've found the first line after the other commented-out relayhost lines # Add the new relayhost line here and set the state to indicate that we've # finished. sys.stdout.write(relayhost_line) state = SearchState.DONE sys.stdout.write(line) # If we get here and the state isn't DONE, then just add the line to the end of the file if state != SearchState.DONE: with open('/etc/postfix/main.cf', 'a', encoding='utf-8') as f: f.write(relayhost_line) except Exception as ex: # pylint: disable=broad-except LOG.error("cp_os_op %s.configure_postfix result(fail): %s", op_name, ex) return self._result_handler(1, '', str(ex), op_name, intermediate_result=intermediate_result) LOG.info("restarting postfix") my_op_name = op_name + ': restart postfix' return self.run_command_and_handle_result(['systemctl', 'restart', 'postfix'], my_op_name, intermediate_result=intermediate_result) def set_outgoing_email_ip(self, address: str, *args: Any, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Set Plesk's outgoing e-mail IP address. This only works with Postfix mail server. See General #6 for more information: https://docs.plesk.com/en-US/obsidian/administrator-guide/mail/configuring-serverwide-mail-settings.59430/ :param address: IP address from which to send e-mail :param intermediate_result: Dict containing result data and meta data for retries :return: Nydus operation result """ return self.run_command_and_handle_result( [self.get_path_plesk(filename='mailserver'), '--set-outgoing-email-mode', 'explicit-ip', '-explicit-ipv4', address], 'set_outgoing_email_ip', intermediate_result=intermediate_result) class CentOSPlesk(CentOS, LinuxPlesk): PLESK_INSTALL_LOCATIONS = [ PLESK_DIR_2 ] def configure_mta(self, payload, unused=None, intermediate_result=None) -> NydusResult: """Configure mail transfer agent on a CentOS-based Plesk server :param payload: Dict containing op params :param intermediate_result: Dict containing result data and meta data for retries :return: tuple with success, retry or error data """ LOG.info("%s.configure_mta", self) return self.do_configure_mta(payload, 'CentOSPlesk.configure_mta', intermediate_result=intermediate_result) def do_configure_mta(self, payload: Dict[str, Any], op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Configure mail transfer agent on a CentOS-based Plesk server :param payload: Dict containing op params :param op_name: The name of the op, including the classname :param intermediate_result: Dict containing result data and meta data for retries :return: tuple with success, retry or error data """ LOG.info("%s.do_configure_mta", self) result = self.install_postfix(op_name, intermediate_result=intermediate_result) if isinstance(result, Retry) or not result[0]: return result relay_address = payload.get('relay_address') LOG.info("%s %s start relay_address: %s", self.get_op_type().value, op_name, relay_address) return self.configure_postfix(relay_address, op_name, intermediate_result=intermediate_result) def install_postfix(self, op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Install postfix on a CentOS-based Plesk server :param op_name: The name of the op, including the classname :param intermediate_result: Dict containing result data and meta data for retries :return: tuple with success, retry or error data """ LOG.info("%s.install_postfix", self) my_op_name = op_name + ': yum_clean_all' exit_code, outs, errs = self._run_yum_command(['yum', 'clean', 'all'], my_op_name) if exit_code != 0: return self._result_handler(exit_code, outs, errs, my_op_name, intermediate_result=intermediate_result) my_op_name = op_name + ': remove_sendmail' exit_code, outs, errs = self._run_yum_command(['yum', '-y', 'remove', 'sendmail'], my_op_name) if exit_code != 0: return self._result_handler(exit_code, outs, errs, my_op_name, intermediate_result=intermediate_result) my_op_name = op_name + ': install postfix' exit_code, outs, errs = self._run_yum_command(['yum', '-y', 'install', 'postfix'], my_op_name) return self._result_handler(exit_code, outs, errs, my_op_name, intermediate_result=intermediate_result) class CentOS6Plesk(CentOS6, CentOSPlesk): # pylint: disable=too-many-ancestors def __init__(self, *args, **kwargs): # pylint: disable=super-init-not-called raise NotImplementedError class CentOS7Plesk(CentOS7, CentOSPlesk): # pylint: disable=too-many-ancestors pass class AlmaLinux8Plesk(AlmaLinux8, CentOSPlesk): # pylint: disable=too-many-ancestors pass class AlmaLinux9Plesk(AlmaLinux9, CentOSPlesk): # pylint: disable=too-many-ancestors pass class DebianPlesk(Debian, LinuxPlesk): def configure_mta(self, payload, unused=None, intermediate_result=None) -> NydusResult: """Configure mail transfer agent on a Debian-based Plesk server :param payload: Dict containing op params :param intermediate_result: Dict containing result data and meta data for retries :return: tuple with success, retry or error data """ LOG.info("DebianPlesk.configure_mta") return self.do_configure_mta(payload, 'DebianPlesk.configure_mta', intermediate_result=intermediate_result) def do_configure_mta(self, payload: Dict[str, Any], op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Configure mail transfer agent on a Debian-based Plesk server :param payload: Dict containing op params :param op_name: The name of the op, including the classname :param intermediate_result: Dict containing result data and meta data for retries :return: tuple with success, retry or error data """ LOG.info("DebianPlesk.do_configure_mta") result = self.install_postfix(op_name, intermediate_result=intermediate_result) if isinstance(result, Retry) or not result[0]: return result relay_address = payload.get('relay_address') LOG.info("%s %s start relay_address: %s", self.get_op_type().value, op_name, relay_address) return self.configure_postfix(relay_address, op_name, intermediate_result=intermediate_result) def install_postfix(self, op_name: str, intermediate_result: Dict[str, Any] = None) -> NydusResult: """Install postfix on a CentOS-based Plesk server :param op_name: The name of the op, including the classname :param intermediate_result: Dict containing result data and meta data for retries :return: tuple with success, retry or error data """ LOG.info("DebianPlesk.install_postfix") my_op_name = op_name + ': remove_sendmail' exit_code, outs, errs = runCommand(['apt-get', 'remove', '-y', 'sendmail'], my_op_name) if exit_code != 0: return self._result_handler(exit_code, outs, errs, my_op_name, intermediate_result=intermediate_result) result = self._install('postfix') if isinstance(result, Retry): return result my_op_name = op_name + ': install postfix' exit_code, outs, errs = result return self._result_handler(exit_code, outs, errs, my_op_name, intermediate_result=intermediate_result) class Debian8Plesk(Debian8, DebianPlesk): # pylint: disable=too-many-ancestors def __init__(self, *args, **kwargs): # pylint: disable=super-init-not-called raise NotImplementedError class Debian10Plesk(Debian10, DebianPlesk): # pylint: disable=too-many-ancestors def __init__(self, *args, **kwargs): # pylint: disable=super-init-not-called raise NotImplementedError class Debian11Plesk(Debian11, DebianPlesk): # pylint: disable=too-many-ancestors def __init__(self, *args, **kwargs): # pylint: disable=super-init-not-called raise NotImplementedError class Debian12Plesk(Debian12, DebianPlesk): # pylint: disable=too-many-ancestors def __init__(self, *args, **kwargs): # pylint: disable=super-init-not-called raise NotImplementedError class Ubuntu1604Plesk(Ubuntu1604, DebianPlesk): # pylint: disable=too-many-ancestors PLESK_INSTALL_LOCATIONS = [ PLESK_DIR_1 ] class Ubuntu2004Plesk(Ubuntu2004, DebianPlesk): # pylint: disable=too-many-ancestors PLESK_INSTALL_LOCATIONS = [ PLESK_DIR_1 ] class Ubuntu2204Plesk(Ubuntu2204, DebianPlesk): # pylint: disable=too-many-ancestors PLESK_INSTALL_LOCATIONS = [ PLESK_DIR_1 ] class Ubuntu2404Plesk(Ubuntu2404, DebianPlesk): # pylint: disable=too-many-ancestors PLESK_INSTALL_LOCATIONS = [ PLESK_DIR_1 ]