"""Result types and the base ``Ops`` class shared by operation implementations."""

from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union
from enum import Enum
import importlib
import logging
import shutil

from primordial.constants import CANONICAL_TIMESTRING_FORMAT
from primordial.encryptor import Encryptor

from customer_local_ops.exceptions import EncryptionKeyNotFound
from customer_local_ops.util.execute import runCommand
from customer_local_ops.util.helpers import create_file
from customer_local_ops.util.retry import Retry

LOG = logging.getLogger(__name__)


class OpType(Enum):
    """Category of an Ops class; the value is emitted as ``op_type`` in result dicts."""
    CONTROL_PANEL = "cp_op"
    OPERATING_SYSTEM = "os_op"
    CONTROL_PANEL_OPERATING_SYSTEM = "cp_os_op"
    UNKNOWN = "unknown"


# TODO: Move to central repo
class ResourceType(Enum):
    """Known resource type identifiers."""
    OVH = "ovh"
    OPENSTACK = "openstack"
    VIRTUOZZO_VM = "virtuozzo_vm"
    OPENSTACK_HOSTINGCLOUD = "openstack_hostingcloud"


RESOURCE_TYPES = list(ResourceType)

# What an op may hand back: a Retry instance, a (success, result) tuple, or a
# (success, result, retry_interval) tuple.
NydusResult = Union[Retry, Tuple[bool, Dict[str, Any]], Tuple[bool, Dict[str, Any], int]]


class OpResult:
    """
    Class whose objects encapsulate the complete result of an operation.
    """

    def __init__(self, success: bool = False, result: Optional[Dict[str, Any]] = None,
                 retry_interval: Optional[int] = None):
        """
        Initialize an OpResult instance

        :param success: Indicates whether or not the operation was successful
        :param result: A dictionary containing any messages or other data from the operation.
                       Non-dict values are tolerated; see :attr:`errs` and :attr:`outs`.
        :param retry_interval: An optional retry interval, in seconds. If this value is greater than or equal to
                               zero, the operation will be retried in that many seconds.
        """
        self.success = success
        self.result = result
        self.retry_interval = retry_interval

    @property
    def errs(self) -> str:
        """Convenience property for accessing the 'errs' key in the result dict.

        Returns an empty string when the result is not a dict or has no 'errs' key.
        """
        return self.result.get('errs', '') if isinstance(self.result, dict) else ''

    @property
    def outs(self) -> str:
        """Convenience property for accessing the 'outs' key in the result dict.

        Returns an empty string when the result is not a dict or has no 'outs' key.
        """
        return self.result.get('outs', '') if isinstance(self.result, dict) else ''

    def as_tuple(self) -> NydusResult:
        """
        Construct and return a correctly-sized tuple, based on whether or not a retry_interval value is present,
        for passing back to Nydus.

        :return: ``(success, result)`` when ``retry_interval`` is None, otherwise
                 ``(success, result, retry_interval)``
        """
        if self.retry_interval is None:
            return self.success, self.result
        return self.success, self.result, self.retry_interval


class Ops:
    """Base class for operations; most operations raise NotImplementedError here."""

    # Path passed to shutil.disk_usage() by _get_disk_utilization(); None means "not configured".
    DISK_UTILIZATION_PATH = None
    # Substrings that, when found in a failed command's stderr, make _result_handler() return a Retry.
    RETRYABLE_ERRS = []
    # Destination of the file written by _create_panopta_manifest_file().
    PANOPTA_MANIFEST_FILE = '/etc/panopta-agent-manifest'
    DISTRO = None  # override
    OS_VERSION = None  # override
    QEMU_PACKAGE_NAME = None  # override

    def __str__(self):
        """Return the name of the concrete class."""
        return self.__class__.__name__

    op_type = OpType.UNKNOWN

    def get_op_type(self):
        """Return the :class:`OpType` of this class."""
        return self.op_type

    def _get_vm_tag(self):
        """Return this VM's tag.

        Same return format as runCommand.
        """
        raise NotImplementedError

    def _encryption_key(self):
        """The encryption key.

        :return: the output of :meth:`_get_vm_tag`
        :raises EncryptionKeyNotFound: if :meth:`_get_vm_tag` reports a non-zero exit code or empty output
        """
        exit_code, outs, errs = self._get_vm_tag()
        if exit_code != 0 or not outs:
            LOG.error('Could not obtain encryption key')
            raise EncryptionKeyNotFound(exit_code, outs, errs)
        return outs

    def decrypt(self, encrypted):
        """Decrypt an encrypted value.

        Any exception is logged and then re-raised unchanged.

        :param encrypted: The value to be decrypted
        :return: The result of ``Encryptor.decrypt`` using this VM's encryption key
        """
        try:
            key = self._encryption_key()
            return Encryptor.decrypt(encrypted, key)
        except Exception:
            LOG.exception('Decrypt failed')
            raise

    def encrypt(self, target: str) -> str:
        """
        Encrypt a plain-text value.

        Any exception is logged and then re-raised unchanged.

        :param target: The value to be encrypted
        :return: An encrypted string
        """
        try:
            key = self._encryption_key()
            return Encryptor.encrypt(target, key)
        except Exception:
            LOG.exception('Encrypt failed')
            raise

    def write_file(self, path, content):
        """Write ``content`` to ``path`` as UTF-8 text, replacing any existing file.

        :param path: Destination file path
        :param content: Text to write
        """
        LOG.debug('Writing file: %s', path)
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)

    def get_os_op(self, os_op: str, control_panel_class_name: Optional[str] = None) -> "Ops":
        """
        Get an instance of an operating system Ops class or ControlPanel-OS Ops class

        :param os_op: The operating system class name e.g. linux.CentOS7
        :param control_panel_class_name: The control panel class name e.g. cpanel.CPanel
        :return: An instance of the operating system class e.g. CentOS7 or CP-OS class e.g. CentOS7CPanel
        """
        parts = os_op.split(".")
        # Last part of os_op should be the family name followed by the os class name, e.g. linux.CentOS7
        os_name = parts[-1]
        family_name = parts[-2]
        prefix = "customer_local_ops"

        if control_panel_class_name is None and self.get_op_type() == OpType.CONTROL_PANEL:
            # If we're requesting to get the os op class from a control panel class,
            # assume we want the operating system/control panel hybrid class
            control_panel_class_name = str(self)

        if control_panel_class_name is not None:
            # Return the operating system/control panel hybrid class
            os_name = os_name + control_panel_class_name
            cp_class_name = control_panel_class_name.lower()
            module_path = "{prefix}.control_panel.{family_name}_{cp_class_name}".format(
                prefix=prefix, family_name=family_name, cp_class_name=cp_class_name)
        else:
            module_path = "{prefix}.operating_system.{family_name}".format(prefix=prefix, family_name=family_name)

        mod = importlib.import_module(module_path)
        class_ = getattr(mod, os_name)
        # Return the instance directly instead of rebinding the ``os_op`` string parameter.
        return class_()

    def build_result_dict(self, outs: str, errs: str, op_name: str) -> Dict[str, Any]:
        """
        This function takes command output (outs, errs) and builds and returns a result dict with op metadata and
        errors if appropriate

        :param outs: The stdout from the run command
        :param errs: The stderr from the run command
        :param op_name: the name of the op calling this function
        :return: a result dict with keys ``outs``, ``errs``, ``cls_name``, ``op_type`` and ``op_name``
        """
        # NUL characters are stripped from both streams before they go into the result.
        unicode_null = "\u0000"
        outs = outs.replace(unicode_null, "")
        errs = errs.replace(unicode_null, "")
        return {'outs': outs,
                'errs': errs,
                'cls_name': str(self),
                'op_type': self.get_op_type().value,
                'op_name': op_name}

    def get_result_data(self, result: Any) -> OpResult:
        """
        This function takes a result tuple or result dict and parses it to a result tuple of a consistent format
        that can then be used by ops to inspect the result or build another result based on the first one

        :param result: a dict or tuple result from an op
        :return: A data structure containing the complete result
        :raises ValueError: if ``result`` is a tuple that does not have 1, 2 or 3 elements
        """
        if not isinstance(result, tuple):
            # A bare (non-tuple) result is treated as a successful one.
            return OpResult(True, result)

        result_len = len(result)
        if result_len == 1:
            return OpResult(True, result[0])
        if result_len == 2:
            return OpResult(result[0], result[1])
        if result_len == 3:
            return OpResult(result[0], result[1], result[2])
        # Reached for an empty tuple as well as for tuples longer than three elements,
        # so the message states the accepted range rather than claiming "too many".
        raise ValueError("result tuple must have 1 to 3 elements, got {}".format(result_len))

    def build_result_from_other_result(self, result: Any, op_name: str) -> NydusResult:
        """
        This function takes a result tuple/dict/Retry/str from a secondary op (usually an os op) and
        (if not a Retry) extracts the metadata, output and errors from it and places it inside a result
        with metadata from the primary op (usually a control panel op). In this way it is clear:
        a). what the primary op was and
        b). that the error/output is coming from the secondary op

        :param result: a dict or tuple result from an op
        :param op_name: the name of the op calling this function
        :return: a result dict, tuple or Retry instance
        """
        if isinstance(result, Retry):
            return result

        data = self.get_result_data(result)
        if isinstance(data.result, dict):
            # Prefer the secondary op's own metadata; fall back to this op's when a key is missing.
            op_type = data.result.get('op_type', self.get_op_type().value)
            op_name_other = data.result.get('op_name', op_name)
            cls_name = data.result.get('cls_name', str(self))
            prefix = "{op_type} {cls_name}.{op_name_other}".format(op_type=op_type,
                                                                   cls_name=cls_name,
                                                                   op_name_other=op_name_other)
            outs = "{prefix} outs: {outs}".format(prefix=prefix, outs=data.outs)
            errs = data.errs
            # Only prefix errs when there is something to report, so an empty errs stays empty.
            if errs:
                errs = "{prefix} errs: {errs}".format(prefix=prefix, errs=data.errs)
        else:
            # Non-dict payload: its string form is reported as output on success, as an error otherwise.
            if data.success:
                outs = str(data.result)
                errs = ''
            else:
                outs = ''
                errs = str(data.result)

        data.result = self.build_result_dict(outs, errs, op_name)
        return data.as_tuple()

    def build_result_from_cmd_output(self, exit_code: int, outs: str, errs: str, op_name: str,
                                     success_message: Optional[str] = None) -> NydusResult:
        """
        This function takes a result from a command output (exit_code, outs, errs) and builds and returns a
        result tuple with op metadata and errors if appropriate

        :param exit_code: The exit code returned from the run command
        :param outs: The stdout from the run command
        :param errs: The stderr from the run command
        :param op_name: the name of the op calling this function
        :param success_message: A message to use in place of 'outs' if the command ran successfully (optional)
        :return: a result tuple
        """
        success = exit_code == 0
        if success and success_message is not None:
            outs = success_message
        return success, self.build_result_dict(outs, errs, op_name)

    def run_command_and_handle_result(self, command: Union[List[str], str], op_name: str, *args: Any,
                                      intermediate_result: Optional[Dict[str, Any]] = None,
                                      **kw: Any) -> NydusResult:
        """Run command, return result suitable for a Nydus operation.

        :param command: command list or string as per runCommand
        :param op_name: name of the operation; used as runCommand tag and result op_name
        :param args: additional positional arguments for runCommand
        :param intermediate_result: Dict containing metadata for retries
        :param kw: additional keyword arguments for runCommand
        :return: command result as a Nydus operation result
        """
        exit_code, outs, errs = runCommand(command, op_name, *args, **kw)
        return self._result_handler(exit_code, outs, errs, op_name, intermediate_result=intermediate_result)

    # def configure_ips(self, network_node):
    #     raise NotImplementedError

    # def patch_system(self, payload):
    #     raise NotImplementedError

    # def patch_system_specific_update(self, payload):
    #     raise NotImplementedError

    # def list_vm_patches(self, payload):
    #     raise NotImplementedError

    def add_user(self, payload, unused=None):
        """Not implemented in the base class."""
        raise NotImplementedError

    # def remove_user(self, payload):
    #     raise NotImplementedError

    # def change_password(self, payload):
    #     raise NotImplementedError

    # def change_hostname(self, payload):
    #     raise NotImplementedError

    # def install_package(self, payload):
    #     raise NotImplementedError

    # def uninstall_package(self, payload):
    #     raise NotImplementedError

    def shutdown_clean(self, unused=None):
        """Not implemented in the base class."""
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        # chain for sequencing.
        raise NotImplementedError

    # def stop_web_server(self, none):
    #     raise NotImplementedError

    # def start_web_server(self, none):
    #     raise NotImplementedError

    # def prepare_web_server_ssl(self, none):
    #     raise NotImplementedError

    # def deploy_cert(self, payload):
    #     raise NotImplementedError

    # def install_cert(self, payload):
    #     raise NotImplementedError

    # def configure_mta(self, payload):
    #     raise NotImplementedError

    def enable_admin(self, username, unused=None):
        """Not implemented in the base class."""
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        # chain for sequencing.
        raise NotImplementedError

    def disable_admin(self, username, unused=None):
        """Not implemented in the base class."""
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        # chain for sequencing.
        raise NotImplementedError

    def disable_all_admins(self, unused=None):
        """Not implemented in the base class."""
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        # chain for sequencing.
        raise NotImplementedError

    # def snapshot_clean(self, payload):
    #     raise NotImplementedError

    # def snapshot_prep(self, payload):
    #     raise NotImplementedError

    def _get_cpu_utilization(self) -> Dict[str, float]:
        """Return current CPU utilization.

        :returns: dictionary with keys:

          - cpuUsed: percentage in range 0-100
        """
        raise NotImplementedError

    def _get_disk_utilization(self) -> Dict[str, int]:
        """Return current disk utilization.

        :returns: dictionary with keys:

          - diskTotal: mebibytes
          - diskUsed: mebibytes
        :raises NotImplementedError: if ``DISK_UTILIZATION_PATH`` has not been set
        """
        if self.DISK_UTILIZATION_PATH is None:
            # The base class leaves the path unset; fail the same way the sibling
            # CPU/memory hooks do instead of letting disk_usage(None) raise TypeError.
            raise NotImplementedError
        disk = shutil.disk_usage(self.DISK_UTILIZATION_PATH)
        return {'diskTotal': disk.total >> 20,  # B > MiB
                'diskUsed': disk.used >> 20}

    def _get_memory_utilization(self) -> Dict[str, int]:
        """Return current memory utilization.

        :returns: dictionary with keys:

          - memoryTotal: mebibytes
          - memoryUsed: mebibytes
        """
        raise NotImplementedError

    def get_utilization(self) -> Dict[str, Union[float, int, str]]:
        """Return current system utilization.

        :returns: dictionary with keys:

          - collected: current time in
            :data:`~primordial.constants.CANONICAL_TIMESTRING_FORMAT`
          - memoryTotal: mebibytes
          - memoryUsed: mebibytes
          - diskTotal: mebibytes
          - diskUsed: mebibytes
          - cpuUsed: percentage in range 0-100
        """
        result = {}
        for stats in (self._get_cpu_utilization(),
                      self._get_disk_utilization(),
                      self._get_memory_utilization()):
            result.update(stats)
        # NOTE(review): datetime.utcnow() is deprecated as of Python 3.12. Moving to
        # datetime.now(timezone.utc) yields the same string only if the format has no
        # %z/%Z directive -- confirm against CANONICAL_TIMESTRING_FORMAT before changing.
        result['collected'] = datetime.utcnow().strftime(
            CANONICAL_TIMESTRING_FORMAT)
        return result

    def _create_panopta_manifest_file(self, payload):
        """ Panopta agent uses a manifest file for configuration of the customer id and templates to use.
        This method creates that manifest file.

        :param payload: mapping with required keys ``customer_key`` and ``template_ids`` and optional keys
                        ``server_key``, ``fqdn``, ``serverName`` and ``disable_server_match``
        :raises KeyError: if ``customer_key`` or ``template_ids`` is missing from the payload
        """
        manifest_file = self.PANOPTA_MANIFEST_FILE
        customer_key = payload['customer_key']
        template_ids = payload['template_ids']
        cust_key_contents = """[agent]
customer_key = {customer_key}
templates = {template_ids}
enable_countermeasures = false""".format(customer_key=customer_key, template_ids=template_ids)

        # Optional entries are appended only when the payload supplies a truthy value.
        server_key = payload.get('server_key')
        if server_key:
            cust_key_contents += "\nserver_key = {server_key}".format(server_key=server_key)
        fqdn = payload.get('fqdn')
        if fqdn:
            cust_key_contents += "\nfqdn = {fqdn}".format(fqdn=fqdn)
        server_name = payload.get('serverName')
        if server_name:
            cust_key_contents += "\nserver_name = {serverName}".format(serverName=server_name)
        disable_server_match = payload.get('disable_server_match', False)
        if disable_server_match:
            cust_key_contents += "\ndisable_server_match = true"
        create_file(manifest_file, cust_key_contents)

    def install_panopta(self, payload, *args, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError

    def delete_panopta(self, *args, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError

    def get_panopta_server_key(self, *args, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError

    def _result_handler(self, exit_code: int, outs: str, errs: str, op_name: str,
                        intermediate_result: Optional[Dict[str, Any]] = None) -> NydusResult:
        """Take the result from a run command and check for retryable errors.
        If found, return a Retry. If not, return a Nydus result tuple.

        :param exit_code: The exit code from the executed command
        :param outs: The stdout output from the executed command
        :param errs: The stderr output from the executed command
        :param op_name: The name of the op
        :param intermediate_result: Dict containing metadata for retries. When a retryable error is found,
                                    this dict is updated in place with a ``retryable_error`` key.
        :return: A Nydus result tuple, or Retry
        """
        LOG.info(self.RETRYABLE_ERRS)
        success, result = self.build_result_from_cmd_output(exit_code, outs, errs, op_name, op_name + " succeeded")
        if not success:
            # The first configured substring found in stderr wins.
            for retryable_err in self.RETRYABLE_ERRS:
                if retryable_err in errs:
                    if intermediate_result is None:
                        intermediate_result = {}
                    intermediate_result['retryable_error'] = retryable_err
                    return Retry(intermediate_result)
        return success, result