var/opt/nydus/ops/customer_local_ops/operating_system/linux.py000064400000272156147205632210021121 0ustar00# pylint: disable-msg=C0302 # To be addressed in https://jira.godaddy.com/browse/HPLAT-3186 import base64 import functools import glob import grp import json import logging import os import pwd import re import shlex import shutil import subprocess from datetime import datetime from enum import Enum from pathlib import Path from typing import Optional, Tuple, Union, List, Dict, Any from collections import OrderedDict from shutil import rmtree from cryptography.x509 import load_pem_x509_certificate from importlib_resources import read_text from customer_local_ops import Ops, OpType, NydusResult from customer_local_ops.exceptions import DecryptError from customer_local_ops.operating_system import SHELL_SCRIPT_PATH from customer_local_ops.operating_system.package_manager import Apt, AptEOL, PackageManager, Yum from customer_local_ops.util.execute import runCommand, RunCommandResult, run_command_pipe, run_shell_script_file from customer_local_ops.util.helpers import append_line, edit_file_lines, replace_line, create_file from customer_local_ops.util.retry import retry, Retry, RETRY # `unused` params are an artifact of Archon workflows requiring an I/O chain for sequencing. LOG = logging.getLogger(__name__) NYDUS_EXECUTOR_CRT_PATH = os.path.join(os.path.sep, 'opt', 'nydus', 'ssl', 'executor.crt') SUDOERS_PREFIX = os.path.join(os.path.sep, 'etc', 'sudoers.d') SYSTEM_USERS = 'nydus', '48-wp-toolkit' EXEMPT_SUDOERS_PREFIX = ('icinga', ) HOSTS_PATH = os.path.join(os.path.sep, 'etc', 'hosts') CLOUD_CONFIG_PATH = os.path.join(os.path.sep, 'etc', 'cloud', 'cloud.cfg') CLOUD_CONFIG_PRESERVE_HOSTNAME_PATH = os.path.join(os.path.sep, 'etc', 'cloud', 'cloud.cfg.d', '50_preserve_hostname.cfg') CLOUD_CONFIG_UPDATE_HOSTS_INIT_MODULE = 'update_etc_hosts' CENTOS6_YUM_REPO = '/etc/yum.repos.d/CentOS-Base.repo' HFS_COMMON_YUM_REPO = '/etc/yum.repos.d/hfs-common.repo' MAX_FILE_SIZE = 65536 SEMVER_RGX = re.compile( r'^(?P0|[1-9]\d*)\.(?P0|[1-9]\d*)\.(?P0|[1-9]\d*)\-(?P0|[1-9]\d*)$') MAJOR_ONLY_RGX = re.compile(r'^(?P0|[1-9]\d*)$') LINUX_OS_INFO_FILE = "/etc/os-release" VPS4_ALLOW_DELETE_PREFIXES = ['Xdjw6fGvTto3MktG'] # Type aliases RetryInstallCommandResult = Union[Retry, RunCommandResult] class NydusUpdateType(str, Enum): UPGRADE = 'upgrade' DOWNGRADE = 'downgrade' REINSTALL = 'reinstall' class UnsupportedUnit(Exception): def __init__(self, fields, supported_units): super().__init__( 'Unsupported unit in fields %s. Supported units: %s' % ( fields, supported_units)) def add_sudoer(username): path = os.path.join(SUDOERS_PREFIX, username) try: create_file(path, "%s ALL=(ALL) NOPASSWD: ALL\n" % username) os.chmod(path, 0o0440) except (OSError, IOError): message = "failed to write sudoers file for %s (%s)" % (username, path) LOG.exception(message) raise return path def remove_sudoer(username: str) -> str: """ Remove sudoer permissions from a user. :param username: The username :raises: OSError, IOError if the path can't be deleted :return: The path to the sudoer file """ if username in SYSTEM_USERS: raise ValueError('Cannot remove sudo for system user "%s"' % username) path = os.path.join(SUDOERS_PREFIX, username) try: if os.path.exists(path): os.unlink(path) except (OSError, IOError): message = 'failed to remove sudoers file for %s (%s)' % (username, path) LOG.exception(message) raise return path def remove_sudoers(): filelist = [f for f in os.listdir(SUDOERS_PREFIX) if f not in SYSTEM_USERS and not f.startswith(EXEMPT_SUDOERS_PREFIX)] for f in filelist: remove_sudoer(f) def shutdown(): try: LOG.info('shutdown VM') os.system('shutdown -h now') except (OSError, IOError) as ex: LOG.exception("failed to shutdown vm. error: '%s'", str(ex)) raise def set_safe_env(): env = dict(os.environ) lp_key = 'LD_LIBRARY_PATH' lp_orig = env.get(lp_key + '_ORIG') if lp_orig is not None: env[lp_key] = lp_orig else: env.pop(lp_key, None) return env class Linux(Ops): DISK_UTILIZATION_PATH = '/' MEMORY_UTILIZATION_SUPPORTED_UNITS = frozenset(['kB']) RETRYABLE_INSTALL_ERRORS = ['Could not resolve host', 'Could not get lock', 'has no installation candidate', 'cannot sync correctly', 'Error performing handshake'] PANOPTA_MANIFEST_FILE = '/etc/fm-agent-manifest' PANOPTA_CONFIG_FILE = '/etc/fm-agent/fm_agent.cfg' PANOPTA_AGENT_NAME = 'fm-agent' PANOPTA_YUM_REPO = '/etc/yum.repos.d/fortimonitor.repo' PANOPTA_YUM_REPO_TEMPLATE = 'fortimonitor.repo' SYSTEMCTL = '/usr/bin/systemctl' QEMU_AGENT_CONFIG_LOC = "/etc/sysconfig/qemu-ga" op_type = OpType.OPERATING_SYSTEM package_manager = None # type: PackageManager def __init__(self, package_manager: Optional[PackageManager] = None) -> None: super().__init__() self.package_manager = package_manager def _get_vm_tag(self): """Return this VM's tag.""" tag = None with open(NYDUS_EXECUTOR_CRT_PATH, "rb") as cert_file: cert_contents = cert_file.read() cert = load_pem_x509_certificate(cert_contents) subject = cert.subject.rfc4514_string() subject_list = subject.split(",") for item in subject_list: if item.startswith('CN='): tag = item.split("=")[1].strip() break if tag is not None: return 0, tag, '' return 1, '', 'Unable to retrieve vm tag' def _install(self, *packages: str, **kwargs) -> RetryInstallCommandResult: """Install packages with operating system's package manager. :param *packages: specifications of one or more packages to install :returns: - a Retry object if a retryable error occurred, or - result of last command ran; execution stops on first non-zero exit code :raises ValueError: when no packages are specified """ if not packages: raise ValueError('At least one package is required') commands = OrderedDict() skip_update_indices = kwargs.get('skip_update_indices', False) if not skip_update_indices: commands['update_indices'] = self.package_manager.update_indices commands['install'] = lambda: self.package_manager.install(*packages) for method, command in commands.items(): exit_code, outs, errs = command() if any((e in errs) for e in self.RETRYABLE_INSTALL_ERRORS): LOG.warning('Temporary error, will retry: %s', errs) return RETRY if exit_code != 0: if method != 'update_indices': break return exit_code, outs, errs def add_user(self, payload, unused=None): op_name = 'add_user' username = payload['username'] fail_if_exists = payload.get('fail_if_exists', True) # Default for backwards compatibility exit_code, outs, errs = runCommand( ['getent', 'passwd', username], 'does_user_exist', errorOK=True) if exit_code == 0: if fail_if_exists: LOG.error('User %s already exists', username) return False, self.build_result_dict(outs, errs, op_name) else: exit_code, outs, errs = self._run_add_user_command(username) if exit_code != 0: LOG.error('failed to add user: %s (%s)', outs, errs) return False, self.build_result_dict(outs, errs, op_name) return self._change_password(payload, op_name) def _run_add_user_command(self, username: str) -> Tuple[int, str, str]: """ Run the add user command (differs per linux flavor) :param username: The username :return: exit code, output, errors """ return runCommand(['useradd', '-m', username], 'add_user') def remove_user(self, username, unused=None) -> NydusResult: """ Remove the user from the server :param username: The username :param unused: Parameter used for workflow chaining :return: Nydus Result Dict """ op_name = 'remove_user' # running `pkill` is only a precaution from `userdel` reporting error when user is logged in runCommand("pkill -u %s" % username, 'remove_user', useShell=True) exit_code, outs, errs = runCommand( "userdel -fr %s" % username, 'remove_user', useShell=True) if exit_code != 0: LOG.error("failed to remove user: %s (%s)", outs, errs) return False, self.build_result_dict(outs, errs, op_name) try: remove_sudoer(username) except (OSError, IOError) as ex: LOG.error("failed to remove sudoer: %s (%s)", '', str(ex)) return False, self.build_result_dict('', str(ex), op_name) return self.build_result_dict(outs, errs, op_name) def _change_password(self, payload, op_name): username = payload['username'] encrypted_password = payload['encrypted_password'] try: password = self.decrypt(encrypted_password) except DecryptError as ex: return False, self.build_result_dict(ex.outs, ex.errs, op_name) exit_code, outs, errs = self._run_change_password_cmd(username, password, op_name) if exit_code != 0: LOG.error("failed to change password: %s (%s)", outs, errs) return False, self.build_result_dict(outs, errs, op_name) return self.build_result_dict(outs, errs, op_name) def _run_change_password_cmd(self, username: str, password: str, op_name: str) -> Tuple[int, str, str]: """ Run the change password command :param username: The Username :param password: The user password :param op_name: The name of the calling op :return: The change password command """ chpasswd_arg = shlex.quote('%s:%s' % (username, password)) cmd = "echo %s | chpasswd" % chpasswd_arg return runCommand(cmd, op_name, useShell=True, omitString=chpasswd_arg) def change_password(self, payload, unused=None): return self._change_password(payload, 'change_password') def enable_admin(self, username, unused=None): errs = outs = '' op_name = 'enable_admin' try: outs = add_sudoer(username) except (OSError, IOError) as ex: LOG.exception('%s: failed to enable admin for user %s: %s', op_name, username, str(ex)) errs = str(ex) return False, self.build_result_dict(outs, errs, op_name) LOG.info('%s: Enabled admin for user %s', op_name, username) return self.build_result_dict(outs, errs, op_name) def disable_all_admins(self, unused=None): op_name = 'disable_all_admins' LOG.info('Removing user sudo permissions for * at %s', SUDOERS_PREFIX) remove_sudoers() admin_account_name = 'temphfsadmin' try: exit_code, stdout, stderr = self.remove_account(admin_account_name, op_name) result_dict = self.build_result_dict(stdout, stderr, op_name) if exit_code != 0: return False, result_dict except Exception: # pylint: disable=broad-except LOG.exception("%s: error removing '%s' account", op_name, admin_account_name) raise return result_dict def account_exists(self, account_name: str) -> bool: """\ Check to see if the specified account exists. :param account_name: The name of the account for which to check :returns: True if the account exists; otherwise, false """ try: pwd.getpwnam(account_name) except KeyError: return False return True def remove_account(self, account_name: str, op_name: Optional[str] = None) -> Tuple[int, str, str]: """\ Remove the specified account, if it exists :param account_name: The name of the account to remove :param op_name: Optional string indicating the name of the parent op. If not specified, it will default to `"remove_account"` :returns: A 3-tuple containing an integer indicating the success or failure of the overall operation and two strings containing stdout and stderr from the most recently executed subprocess """ if op_name is None: op_name = 'remove_account' if not self.account_exists(account_name): return 0, "", "" return runCommand(['/usr/sbin/userdel', '--force', '--remove', account_name], tag=op_name) def remove_non_default_users(self): allowedUsers = ['_apt', 'abrt', 'adm', 'apache', 'avahi', 'avahi-autoipd', 'backup', 'bin', 'bind', 'cpanel', 'cpanelcabcache', 'cpanelconnecttrack', 'cpaneleximfilter', 'cpaneleximscanner', 'cpanellogin', 'cpanelphpmyadmin', 'cpanelphppgadmin', 'cpanelroundcube', 'cpanelrrdtool', 'cpses', 'daemon', 'dbus', 'Debian-exim', 'dovecot', 'dovenull', 'ftp', 'games', 'gnats', 'gopher', 'haldaemon', 'halt', 'horde_sysuser', 'irc', 'list', 'lp', 'mail', 'mailman', 'mailnull', 'man', 'messagebus', 'mhandlers-user', 'mysql', 'named', 'news', 'nginx', 'nobody', 'nydus', 'nscd', 'ntp', 'operator', 'polkitd', 'popuser', 'postfix', 'proxy', 'psaadm', 'psaftp', 'roundcube_sysuser', 'root', 'rpc', 'saslauth', 'shutdown', 'smmsp', 'smmta', 'sshd', 'statd', 'sw-cp-server', 'sync', 'sys', 'syslog', 'systemd-bus-proxy', 'systemd-network', 'systemd-resolve', 'systemd-timesync', 'tcpdump', 'tss', 'uucp', 'uuidd', 'vcsa', 'webalizer', 'www-data'] allowedUsers.extend(SYSTEM_USERS) exit_code, outs, errs = runCommand( ['cut', '-d:', '-f1', '/etc/passwd'], 'get current users') user_list = outs.split('\n') LOG.debug("exit_code: %s currentUserList: %s errs: %s", exit_code, user_list, errs) for user in user_list: if user not in allowedUsers: LOG.info("Removing user: %s", user) self.remove_user(user) def disable_admin(self, username, unused=None): LOG.info('Removing user sudo permissions for %s at %s', username, SUDOERS_PREFIX) remove_sudoer(username) def shutdown_clean(self, unused=None): shutdown() RETRY_CONFIGURE_MTA_TIMEOUT = 1200 # seconds # pylint: disable=invalid-name RETRY_CONFIGURE_MTA_RETRY_INTERVAL = 30 # seconds # pylint: disable=invalid-name @retry(interval=RETRY_CONFIGURE_MTA_RETRY_INTERVAL, timeout=RETRY_CONFIGURE_MTA_TIMEOUT) def configure_mta(self, payload, unused=None, intermediate_result=None): # Function is split in order that control panel ops can call underlying os function directly, # without incurring op overhead such as formatted retry results op_name = 'configure_mta' return self.do_configure_mta(payload, op_name, intermediate_result=intermediate_result) def do_configure_mta(self, payload, op_name, intermediate_result: Dict[str, Any] = None): relay = payload.get('relay_address') LOG.debug("%s %s start relayAddress: %s", self.get_op_type().value, op_name, relay) install_result = self._install('sendmail') if isinstance(install_result, Retry): return install_result exit_code, outs, errs = install_result if exit_code != 0: LOG.error('Failed installing sendmail!\n%s', outs + '\n' + errs) self.list_processes() return False, self.build_result_dict(outs, errs, op_name) try: self.configure_sendmail(relay) except Exception as ex: # pylint: disable=broad-except LOG.error('os_op configure_mta result(fail):' + str(ex) + '\n' + outs + '\n' + errs) return False, self.build_result_dict(outs, errs, op_name) return self.build_result_dict(outs, errs, op_name) def configure_sendmail(self, relay: str = None) -> None: """Configure Sendmail. :param relay: mail relay to set """ if relay is None: return set_tgt = functools.partial(replace_line, match='DS', replace='DS[%s]\n' % relay, firstword=False) edit_file_lines('/etc/mail/sendmail.cf', set_tgt) edit_file_lines('/etc/mail/submit.cf', set_tgt) def list_processes(self): LOG.error( 'Process list:\n%s', runCommand('ps -ef'.split(), 'list processes')) def change_hostname(self, payload, unused=None): op_name = 'change_hostname' exit_code, outs, errs = runCommand( "hostnamectl set-hostname %s" % payload['hostname'], 'changeHostname', useShell=True) if exit_code != 0: return False, self.build_result_dict(outs, errs, op_name) exit_code, outs, errs = self.update_etc_hosts_hostname( payload['hostname'], payload['ip_address'], op_name) return exit_code == 0, self.build_result_dict(outs, errs, op_name) def get_ip_regex(self, ip_addr: str) -> str: """Converts an ipv4 ip address into a regex compatible form for match/replace functions :param ip_addr: The ipv4 IP address """ ip_addr_parts = ip_addr.split('.') ip_regex = r'\.'.join(ip_addr_parts) return ip_regex def get_hostname_prefix(self, hostname: str) -> str: """Returns the first part of a fully-qualified hostname (before the first '.') or the entire hostname if there are no '.' chars :param hostname: The Server hostname """ hostname_parts = hostname.split('.') hostname_prefix = hostname_parts[0] return hostname_prefix def regex_replace(self, path: str, regex: str, tag: str = None) -> Tuple[int, str, str]: """Updates file entries using the regex replacement string :param path: The path to the file :param regex: The regex replacement string for sed in format {regex_match}/{regex_replace} :param tag: Optional tag to describe operation """ if tag is None: tag = 'regex_replace' command = "sed -i -r 's/{regex}/' {path}".format( regex=regex, path=path) return runCommand(command, tag, useShell=True) def update_etc_hosts_hostname(self, hostname: str, ip_addr: str, op_name: str) -> Tuple[int, str, str]: """Updates the /etc/hosts file with the new hostname for the server. Need to use Child class methods. :param hostname: The new server hostname :param ip_addr: The IP address for the server :param op_name: The op that is running this function """ exit_code, outs, errs = self.disable_cloud_init_hosts_update() if exit_code != 0: return exit_code, outs, errs # ipv4 Fixed IP / Bound IP hostname_prefix = self.get_hostname_prefix(hostname) ip_regex = self.get_ip_regex(ip_addr) regex = r'^({ip_regex}).*$/\1 {hostname} {hostname_prefix}'.format( ip_regex=ip_regex, hostname=hostname, hostname_prefix=hostname_prefix) exit_code, outs, errs = self.regex_replace(HOSTS_PATH, regex, op_name) if exit_code != 0: return exit_code, outs, errs # ipv6 Fixed IP / Bound IP ip_regex = r"^([a-f0-9]{4}:[a-f0-9]{4}:[a-f0-9]{4}:[a-f0-9]{4}::).*$/\1" regex = r'{ip_regex} {hostname} {hostname_prefix}'.format( ip_regex=ip_regex, hostname=hostname, hostname_prefix=hostname_prefix) exit_code, outs, errs = self.regex_replace(HOSTS_PATH, regex, op_name) if exit_code != 0: return exit_code, outs, errs # ipv6 localhost local_ip = '::1' localhost = 'localhost' localhost_hosts = '(localhost.localdomain localhost6 localhost6.localdomain6|ip6-localhost ip6-loopback)' regex = r'^{local_ip}((\s+{localhost})*\s+{localhost_hosts}).*$/{local_ip}\1 {hostname}'.format( local_ip=local_ip, localhost=localhost, localhost_hosts=localhost_hosts, hostname=hostname) return self.regex_replace(HOSTS_PATH, regex, op_name) def disable_cloud_init_hosts_update(self) -> Tuple[int, str, str]: """If the server/vm is managed by cloud-init, this function comments out 'update_etc_hosts' from the list of modules that run in the 'init' stage of boot. If this isn't done, the /etc/hosts changes may not survive a reboot. """ if os.path.exists(CLOUD_CONFIG_PATH): regex = '^.*- {init_module}$/#- {init_module}'.format( init_module=CLOUD_CONFIG_UPDATE_HOSTS_INIT_MODULE) return self.regex_replace(CLOUD_CONFIG_PATH, regex, 'disable update_etc_hosts') return 0, 'cloud-init not installed', '' def enable_cloud_init_hosts_update(self) -> Tuple[int, str, str]: """If the server/vm is managed by cloud-init, this function removes the comment from 'update_etc_hosts' in the list of modules that run in the 'init' stage of boot. This enables the cloud init service to configure /etc/hosts on first boot """ if os.path.exists(CLOUD_CONFIG_PATH): regex = '^.*- {init_module}$/ - {init_module}'.format( init_module=CLOUD_CONFIG_UPDATE_HOSTS_INIT_MODULE) return self.regex_replace(CLOUD_CONFIG_PATH, regex, 'enable update_etc_hosts') return 0, 'cloud-init not installed', '' def get_os_info(self, *args: Any) -> Any: """Returns the contents of /etc/os-release file for the Operating System information""" op_name = "get_os_info" os_info = "" if os.path.exists(LINUX_OS_INFO_FILE): with open(LINUX_OS_INFO_FILE, 'r', encoding='utf-8') as os_info_file: contents = os_info_file.read() for line in contents.split('\n'): if line.startswith('ID='): os_info += "NAME={name}\n".format(name=line.split('=')[1].strip()) if line.startswith('VERSION_ID='): os_info += "VERSION={version}\n".format(version=line.split('=')[1].strip()) if os_info is not None: return os_info return False, self.build_result_dict('', 'Unable to retrieve OS information', op_name) def snapshot_clean(self, payload, unused=None): # pylint: disable=W0221 if not payload['clean']: return # TODO put all these commands into a single script # cannot use runCommand for this as it utilizes | command_line = "grep -qc set_hostname /etc/cloud/cloud.cfg || sudo sed -i 's/resizefs/&\\n - set_hostname\\n " \ "- update_hostname/' /etc/cloud/cloud.cfg" dummy_exitcode, o, e = run_command_pipe(command_line, useShell=True) LOG.info("re-enable cloud-init hostname updates. stdOut: %s stdErr: %s", o, e) # Newer style cloud-init hostname preservation # TODO: passing an array of args to runCommand does not escape # parameters following convention of subprocess module methods, # but it should. runCommand( "sed -i 's/^preserve_hostname:.*$/preserve_hostname: false/' /etc/cloud/cloud.cfg", 'Set cloud-init preserve_hostname = false', useShell=True, errorOK=True) try: os.unlink(CLOUD_CONFIG_PRESERVE_HOSTNAME_PATH) except FileNotFoundError: pass # Re-enable updating of /etc/hosts by cloud-init exit_code, outs, errs = self.enable_cloud_init_hosts_update() if exit_code != 0: return False, self.build_result_dict(outs, errs, 'snapshot_clean') # remove any non-default users self.remove_non_default_users() # Remove host keys used by the OpenSSH server so that unique ones will be # generated at VM startup time LOG.debug("Removing SSH host keys") runCommand("rm -f /etc/ssh/ssh_host_*", 'remove ssh host keys', useShell=True) # Remove DNS resolvers config file so on VM create the proper resolvers # associated with env will be set LOG.debug("Removing DNS resolvers") runCommand("> /etc/resolv.conf", 'remove dns config', useShell=True) # Delete all logs exit_code, outs, errs = runCommand( "rm -f /opt/thespian/director/thespian_system.log*", 'snapshotClean', useShell=True) return exit_code == 0, self.build_result_dict(outs, errs, 'snapshot_clean') def snapshot_prep(self, payload): return def _get_memory_utilization(self): """Get system memory utilization. We aim for our used calculation to match the "used" column of `free -m` as closely as possible. See used calculation for "Red Hat Enterprise Linux 7.1 or later" at https://access.redhat.com/solutions/406773. See also http://man7.org/linux/man-pages/man1/free.1.html#DESCRIPTION. """ command = 'cat /proc/meminfo |egrep "^(MemTotal|MemFree|Buffers|Cached|Slab):"' LOG.info('Collecting memory utilization: %s', command) result = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines=True, timeout=3, check=True) # 'MemTotal: 4045332 kB\nMemFree: 2402732 kB\n...' fields = result.stdout.split() # ['MemTotal:', '4045864', 'kB', 'MemFree:', '2812676', 'kB', ...] if any([unit not in self.MEMORY_UTILIZATION_SUPPORTED_UNITS # pylint: disable=use-a-generator for unit in fields[2::3]]): raise UnsupportedUnit( fields, self.MEMORY_UTILIZATION_SUPPORTED_UNITS) fields = dict(zip( [f[:-1] for f in fields[::3]], # field name, strip colon [int(f) for f in fields[1::3]])) # value without unit # {'MemTotal': '4045864', 'MemFree': '2822160', ...} total = fields['MemTotal'] free = sum([v for f, v in fields.items() if f != 'MemTotal'] # pylint: disable=consider-using-generator ) return { 'memoryTotal': total >> 10, # KiB > MiB 'memoryUsed': (total - free) >> 10} def _get_cpu_utilization(self): command = 'sar -u 1 1'.split() LOG.info('Collecting cpu utilization: %s', command) result = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=3, check=True) # Linux 3.10.0-862.14.4.el7.x86_64 (loclhfs01) 12/03/2018 _x86_64_ (2 CPU) # # 03:38:39 PM CPU %user %nice %system %iowait %steal %idle # 03:38:40 PM all 0.00 0.00 0.00 0.00 0.00 100.00 # Average: all 0.00 0.00 0.00 0.00 0.00 100.00 lines = result.stdout.split('\n') head = next(filter(lambda line: '%idle' in line, lines)) field_names = head.split() value_line = next(filter( lambda line: line.startswith('Average:'), lines)) values = value_line.split() values = dict(zip(reversed(field_names), reversed(values))) # Reversed because field 0 time is split into 2 values # {'%iowait': '0.00', '%nice': '0.00', '%idle': '100.00', 'PM': # 'Average:', '%steal': '0.00', 'CPU': 'all', '%user': '0.00', # '%system': '0.00'} return {'cpuUsed': 100.0 - float(values['%idle'])} def _yum_update(self): """Performs a yum update on the server""" return self._run_yum_command(['yum', 'update', '-y'], 'update system') def _yum_configure_repo(self) -> None: """Configures the yum repo. Is a noop for all linux distros except CentOS6, which is EOL""" return def _run_yum_command(self, *args, use_run_command_pipe: bool = False, **kwargs) -> Tuple[int, str, str]: """ A thin wrapper to runCommand or run_command_pipe for yum commands. Configures the yum repo before running the command :param use_run_command_pipe: Whether to use run_command_pipe instead of runCommand :return The runCommand/run_command_pipe result, as a tuple """ self._yum_configure_repo() if 'use_run_command_pipe' in kwargs: del kwargs['use_run_command_pipe'] if use_run_command_pipe: return run_command_pipe(*args, **kwargs) return runCommand(*args, **kwargs) def install_panopta(self, payload, *args, **kwargs): """Create a manifest file for the Panopta/Fortimonitor agent, add a repo file, and install panopta-agent/fm-agent using yum :param payload: payload containing customer_key (the key value to set in the manifest file) and template_ids (a csv string of the template ids to assign this server to) :return result of yum install operation""" self._create_panopta_manifest_file(payload) self._create_panopta_repo_file() exit_code, outs, errs = self._run_yum_command( ['yum', 'install', '-y', self.PANOPTA_AGENT_NAME], 'install {agent_name}'.format(agent_name=self.PANOPTA_AGENT_NAME)) check_msg = "Installation of {agent_name} had an error".format( agent_name=self.PANOPTA_AGENT_NAME) silent_yum_install_error = check_msg in outs success = exit_code == 0 and (not silent_yum_install_error) return success, self.build_result_dict(outs, errs, 'install_panopta') def upgrade_panopta(self, *args, customer_key: Optional[str] = None, **kwargs): # pylint: disable=too-many-locals """Deletes panopta apt keys and upgrades panopta to fortimonitor""" # cleaning panopta-agent keys exit_code, outs, errs = runCommand( ['apt-key', 'del', '61EE28720129F5F3'], 'upgrade_panopta') sources_file_panopta = '/etc/apt/sources.list' with open(sources_file_panopta, 'r', encoding='utf-8') as source_file: lines = source_file.read() with open(sources_file_panopta, 'w', encoding='utf-8') as source_file: for line in lines.split('\n'): if 'Addition to add panopta-agent' in line: pass if 'deb http://packages.panopta.com/deb stable main' in line: pass else: source_file.write(line + '\n') if self._check_panopta_installed(): if customer_key is None: try: customer_key = self._get_panopta_customer_key() except ValueError as ex: message = "customer_key not found" LOG.exception(message) return False, self.build_result_dict('customer_key not found', str(ex), 'upgrade_panopta') cmd = "echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections" runCommand(cmd, 'upgrade_panopta deb conf', useShell=True) cmd = 'curl -s https://repo.fortimonitor.com/install/linux/fm_agent_install.sh | ' \ 'bash /dev/stdin -c {customer_key} -x -y'.format(customer_key=customer_key) exit_code, outs, errs = runCommand(cmd.encode('utf-8'), 'curl upgrade_panopta', useShell=True) check_msg = "Installation of fm-agent had an error" silent_yum_install_error = check_msg in outs success = exit_code == 0 and (not silent_yum_install_error) return success, self.build_result_dict('fm_agent installed', errs, 'upgrade_panopta') LOG.info("Panopta-agent not Installed") return exit_code == 0, self.build_result_dict("Panopta-agent not Installed", "", 'upgrade_panopta') def _get_panopta_customer_key(self, *args, **kwargs): """ get the Panopta customer key from the agent manifest file and return it """ manifest_file = '/etc/panopta-agent-manifest' if not os.path.exists(manifest_file): raise ValueError('Panopta manifest file(/etc/panopta-agent-manifest) not found') cmd = r"""grep customer_key {manifest_file} | sed s/'customer_key\s*=\s*'//""".format( manifest_file=manifest_file) _, outs, _ = runCommand(cmd, 'get_customer_key', useShell=True) if outs is None: raise ValueError('Customer key not present in Panopta manifest file') return outs def _check_panopta_installed(self): _, outs, _ = self._run_yum_command( ['yum', 'list', 'installed', '|', 'grep panopta-agent'], 'check panopta installed', use_run_command_pipe=True) return outs def _create_panopta_repo_file(self): repo_file_contents = read_text( 'customer_local_ops.operating_system.resources', self.PANOPTA_YUM_REPO_TEMPLATE) create_file(self.PANOPTA_YUM_REPO, repo_file_contents) def delete_panopta(self, *args, **kwargs): """ Deletes panopta-agent/fm-agent from the server and removes the panopta-agent/fm-agent manifest file :return result of yum uninstall operation""" try: _, outs, _ = self._run_yum_command( ['yum', 'list', 'installed', '|', 'grep {agent_name}'.format( agent_name=self.PANOPTA_AGENT_NAME)], 'check {agent_name} installed'.format(agent_name=self.PANOPTA_AGENT_NAME), use_run_command_pipe=True) if outs: exit_code, outs, errs = self._run_yum_command( ['yum', 'remove', '-y', self.PANOPTA_AGENT_NAME], 'delete {agent_name}'.format(agent_name=self.PANOPTA_AGENT_NAME)) if exit_code != 0: LOG.error('failed to remove %s agent', self.PANOPTA_AGENT_NAME) return False, self.build_result_dict(outs, errs, 'delete_panopta') else: exit_code, outs, errs = 0, '{agent_name} agent is not installed'\ .format(agent_name=self.PANOPTA_AGENT_NAME), '' if os.path.exists(self.PANOPTA_MANIFEST_FILE): os.unlink(self.PANOPTA_MANIFEST_FILE) exit_code = 0 outs = outs + '\n{agent_name} agent manifest file removed'.format(agent_name=self.PANOPTA_AGENT_NAME) # trying to delete the panopta-agent if its installed, applicable for old boxes _, outs2, _ = self._run_yum_command( ['yum', 'list', 'installed', '|', 'grep panopta-agent'], 'check panopta-agent installed', use_run_command_pipe=True) if outs2: exit_code, outs, errs = self._run_yum_command( ['yum', 'remove', '-y', 'panopta-agent'], 'delete {agent_name}'.format(agent_name='panopta-agent')) if exit_code != 0: LOG.error('failed to remove panopta-agent') return False, self.build_result_dict(outs, errs, 'delete_panopta') else: exit_code, outs, errs = 0, 'panopta-agent is not installed', '' if os.path.exists('/etc/panopta-agent-manifest'): os.unlink('/etc/panopta-agent-manifest') exit_code = 0 outs = outs + '\npanopta-agent manifest file removed' except (OSError, IOError) as ex: message = "Failed to unlink Panopta/Fortimonitor manifest file" LOG.exception(message) return False, self.build_result_dict('', str(ex), 'delete_panopta') return exit_code == 0, self.build_result_dict(outs, errs, 'delete_panopta') def get_panopta_server_key(self, *args, **kwargs): """ get the Panopta/Fortimonitor server key from the agent configuration file and return it :return a dictionary of the outs and errors """ agent_conf = self.PANOPTA_CONFIG_FILE if not os.path.exists(agent_conf): raise ValueError('Panopta/Fortimonitor config file: {agent_conf} not found'.format(agent_conf=agent_conf)) cmd = r"""grep server_key {agent_conf} | sed s/'server_key\s*=\s*'//""".format( agent_conf=agent_conf) exit_code, outs, _ = runCommand(cmd, 'get_server_key', useShell=True) return exit_code == 0, {'outs': outs, 'append_info': False} def update_invalid_resolvers(self, valid_resolvers: List[str], invalid_resolvers: List[str], *args, **kwargs) -> NydusResult: """ If the server has any of the listed invalid resolvers, replace them with the valid resolvers. Otherwise, leave the resolvers as-is. Currently the script takes only two valid and two invalid resolvers for updating. :param valid_resolvers: A list of valid dns nameservers to be added to the server :param invalid_resolvers: A list of invalid dns nameservers to be removed from the server :return: a dictionary of the outs and errors """ resolvers_arg = invalid_resolvers resolvers_arg.extend(valid_resolvers) exit_code, outs, errs = run_shell_script_file( script_file=SHELL_SCRIPT_PATH / "update_dns_linux.sh", tag="update_dns_resolvers", script_file_args=resolvers_arg) if exit_code != 0: errs = errs + '; ' + 'exit_code:' + str(exit_code) return exit_code == 0, self.build_result_dict(outs, errs, 'update_invalid_resolvers') def __cleanup_on_write_error(self, file_path: Path) -> None: """ Helper method that performs cleanup for the 'write_out_file' op in-case of any errors :param file_path: The file path to the file/directory that was being written out by the op """ if file_path.exists(): if file_path.is_file(): try: file_path.unlink() except FileNotFoundError: pass else: file_path.rmdir() def __validate_write_file(self, op_name: str, name: str, location: str, user_name: str, group_name: str, contents: str, is_file: bool, exist_ok: bool) -> NydusResult: """ Helper method that performs validation for the 'write_out_file' op, before the file/directory is written out. :param op_name: Name of the op, should typically be 'write_out_file' :param name: Name of the file/directory to be created :param location: The directory/folder in which the file is to be created :param user_name: The user who will own the file/directory :param group: The group who will own the file/directory :param contents: The contents to be written out to the file :param is_file: Boolean parameter indicating if its a file or directory that is to be created :param exist_ok: Boolean parameter indicating if existence of the file is ok """ folder_loc = Path(location) if not folder_loc.exists(): errs = "Folder '{}' doesn't exist".format(location) LOG.exception("%s: %s", op_name, errs) return False, self.build_result_dict("", errs, op_name) file_path = folder_loc / name if file_path.exists() and not exist_ok: errs = "File '{}' already exists".format(str(file_path)) LOG.exception("%s: %s", op_name, errs) return False, self.build_result_dict("", errs, op_name) try: pwd.getpwnam(user_name) except KeyError: errs = "User '{}' doesn't exist".format(user_name) LOG.exception("%s: %s", op_name, errs) return False, self.build_result_dict("", errs, op_name) try: grp.getgrnam(group_name) except KeyError: errs = "Group '{}' doesn't exist".format(group_name) LOG.exception("%s: %s", op_name, errs) return False, self.build_result_dict("", errs, op_name) if is_file and len(contents) > MAX_FILE_SIZE: errs = "File contents exceeds size limit of {}".format(MAX_FILE_SIZE) LOG.exception("%s: %s", op_name, errs) return False, self.build_result_dict("", errs, op_name) return True, {} def __write_file(self, op_name: str, file_path: Path, contents: str) -> NydusResult: """ Helper method that handles writing out to a file for the 'write_out_file' op. :param op_name: Name of the op, should typically be 'write_out_file' :param file_path: File path of the file to be written :param contents: The contents to be written out to the file """ try: if len(contents) == 0: file_path.touch(exist_ok=True) else: decoded_contents = base64.b64decode(contents) file_path.write_bytes(decoded_contents) except (OSError, IOError) as ex: LOG.exception('%s: failed to write file %s: %s', op_name, str(file_path), str(ex)) errs = str(ex) return False, self.build_result_dict("", errs, op_name) return True, {} def __make_directory(self, op_name, file_path): """ Helper method that handles creating a directory for the 'write_out_file' op. :param op_name: Name of the op, should typically be 'write_out_file' :param file_path: File path of the directory to be created """ try: file_path.mkdir(exist_ok=True) except (OSError, IOError) as ex: LOG.exception('%s: failed to create directory %s: %s', op_name, str(file_path), str(ex)) errs = str(ex) return False, self.build_result_dict("", errs, op_name) return True, {} def __change_file_perms(self, op_name: str, file_path: Path, perms: str) -> NydusResult: """ Helper method that handles setting the file permissions for the 'write_out_file' op. :param op_name: Name of the op, should typically be 'write_out_file' :param file_path: File path of the directory to be created :param perms: Permissions for the new file """ try: file_path.chmod(int(perms, base=8)) except PermissionError as ex: self.__cleanup_on_write_error(file_path) LOG.exception('%s: failed to change file permissions %s: %s', op_name, str(file_path), str(ex)) errs = str(ex) return False, self.build_result_dict("", errs, op_name) return True, {} def __change_file_ownership(self, op_name: str, file_path: Path, user_name: str, group_name: str) -> NydusResult: """ Helper method that handles setting the file ownership for the 'write_out_file' op. :param op_name: Name of the op, should typically be 'write_out_file' :param file_path: File path of the directory to be created :param user_name: Owning user of the file :param group_name: Owning group of the file """ try: shutil.chown(str(file_path), user_name, group_name) except PermissionError as ex: self.__cleanup_on_write_error(file_path) LOG.exception('%s: failed to change file ownership %s: %s', op_name, str(file_path), str(ex)) errs = str(ex) return False, self.build_result_dict("", errs, op_name) return True, {} def write_out_file(self, name: str, location: str, perms: str, user_name: str, group_name: str, contents: str = b'', is_file: bool = True, exist_ok: bool = False) -> NydusResult: """ Op for writing out a regular file (with content) or creating a directory. Allows sets the file permissions and user/group ownership for the created file. :param name: Name of the file/directory to be created :param location: The directory/folder in which the file is to be created :param perms: The file permissons of the created file :param user_name: The user who will own the file/directory :param group_name: The group who will own the file/directory :param contents: The contents to be written out to the file :param is_file: Boolean parameter indicating if its a file or directory that is to be created :param exist_ok: Boolean parameter indicating if existence of the file is ok """ op_name = 'write_out_file' ok, result = self.__validate_write_file( op_name, name, location, user_name, group_name, contents, is_file, exist_ok) if not ok: return ok, result file_path = Path(location) / name ok, result = self.__write_file(op_name, file_path, contents) \ if is_file \ else self.__make_directory(op_name, file_path) if not ok: return ok, result ok, result = self.__change_file_perms(op_name, file_path, perms) if not ok: return ok, result ok, result = self.__change_file_ownership(op_name, file_path, user_name, group_name) if not ok: return ok, result return True, self.build_result_dict("", "", op_name) def delete_file(self, name: str, location: str, is_file: bool = True) -> NydusResult: """ Op for deleting a file or a directory. :param name: Name of the file/directory to be deleted :param location: The directory/folder in which the file is to be deleted :param is_file: Boolean parameter indicating if its a file or directory that is to be deleted """ op_name = 'delete_file' file_path = Path(location) / name if not any(name.startswith(prefix) for prefix in VPS4_ALLOW_DELETE_PREFIXES): raise RuntimeError("Invalid file/directory prefix") try: if not is_file: rmtree(file_path) else: os.remove(file_path) except (OSError, IOError, PermissionError, FileNotFoundError) as ex: raise RuntimeError( '{}: failed to delete file/directory {}: {}'.format(op_name, str(file_path), str(ex)) ) from ex return True, self.build_result_dict("", "", op_name) def restart_service(self, service_name: str, is_systemd: bool = True, should_block: bool = True) -> NydusResult: """ Op that restarts a service. Allows for restarting of a SysV service or a SystemD based service. :param service_name: Name of the service to be restarted :param is_systemd: Flag indicating if the service to be restarted is systemd unit or not :param should_block: Flag specific to systemd indicating if the we should wait for the completion of the command execution """ rc_tag = 'restart_service' if is_systemd: if should_block: exit_code, outs, errs = runCommand( [self.SYSTEMCTL, 'restart', service_name], rc_tag) else: exit_code, outs, errs = runCommand( [self.SYSTEMCTL, 'restart', '--no-block', service_name], rc_tag) else: exit_code, outs, errs = runCommand( ['/sbin/service', service_name, 'restart'], rc_tag) if exit_code != 0: LOG.error("failed to restart service: %s (%s)", outs, errs) return False, self.build_result_dict(outs, errs, rc_tag) return True, self.build_result_dict("", "", rc_tag) def _get_file_hash(self, path: str, hash_type: str) -> str: """ Get the hash for a given filepath and hash type, e.g. md5, sha1, sha256 :param path: The path of the file to be hashed :param hash_type: The type of hash to perform :return: The hashed file as a string :raises: RunTimeException if hashing fails """ command = hash_type + 'sum' exit_code, outs, errs = runCommand( [command, path], "{hash_type} file".format(hash_type=hash_type)) if exit_code != 0: LOG.error("Failed to %s hash file %s: %s (%s)", hash_type, path, outs, errs) raise RuntimeError("Failed to %s hash file %s: %s (%s)" % (hash_type, path, outs, errs)) # outputs 'hash filename' outs_list = outs.split(" ") return outs_list[0].strip() def _get_file_timestamps(self, path: str) -> Dict[str, Any]: """ Get the access, modify and change date and time of given file path :param path: The path of the file :return: Dict containing access, modify and change timestamps in format YYYY-MM-DD HH:MM:SS.xxxxxxxx TZ_OFFSET """ try: access_time = datetime.fromtimestamp(os.path.getatime(path)) modify_time = datetime.fromtimestamp(os.path.getmtime(path)) change_time = datetime.fromtimestamp(os.path.getctime(path)) file_timestamps = {"access": str(access_time), "modify": str(modify_time), "change": str(change_time)} return file_timestamps except OSError as ex: LOG.error("Failed to file timestamps %s: %s", path, str(ex)) raise def get_file_info(self, paths: str) -> Dict[str, Any]: """ Op for retrieving file info (if file exists) including the md5, sha-1 and sha-256 hashes, and access, change and modify dates :param paths: Comma-separated list of paths of files :return: Dict with filenames and their associated timestamps and hashes (if file present) """ out_list = [] paths_list = paths.split(",") for path in paths_list: path = path.strip() out_dict = {"fileName": path} file_loc = Path(path) if not file_loc.exists(): out_dict["exists"] = False out_dict["md5"] = None out_dict["sha1"] = None out_dict["sha256"] = None out_dict["access"] = None out_dict["modify"] = None out_dict["change"] = None else: out_dict["exists"] = True for hash_type in ['md5', 'sha1', 'sha256']: try: out_dict[hash_type] = self._get_file_hash(path, hash_type) except RuntimeError: # Set value to None if unable to get the hash out_dict[hash_type] = None try: file_timestamps = self._get_file_timestamps(path) out_dict["access"] = file_timestamps.get("access") out_dict["modify"] = file_timestamps.get("modify") out_dict["change"] = file_timestamps.get("change") except OSError: # Set value to None if unable to get the timestamps date out_dict["access"] = None out_dict["modify"] = None out_dict["change"] = None out_list.append(out_dict) ret = {"files": out_list} return ret def get_rpm_info(self) -> List[str]: """ Op for returning rpm info on the vm :return: List of installed rpms with additional info """ query_format_list = [ r'''"${HOSTNAME}|${VM_CONTAINER}|%{NAME}-%{VERSION}-%{RELEASE}.%{ARCH}|''', r'''%{INSTALLTIME}|%{BUILDHOST}|%{DSAHEADER:pgpsig}|%{RSAHEADER:pgpsig}|''', r'''%{SIGGPG:pgpsig}|%{SIGPGP:pgpsig}\n"'''] query_format = ''.join(query_format_list) cmd = "rpm -qa --queryformat " + query_format exit_code, outs, errs = run_command_pipe(cmd, useShell=True) if isinstance(outs, bytes): outs = outs.decode('utf-8') if isinstance(errs, bytes): errs = errs.decode('utf-8') if exit_code != 0: LOG.error("Failed to get rpm info: %s (%s)", outs, errs) return False, self.build_result_dict(outs, errs, 'get_rpm_info') ret = outs.split("\n") return ret RETRY_INSTALL_QEMU_AGENT_TIMEOUT = 1200 # pylint: disable=invalid-name RETRY_INSTALL_QEMU_AGENT_INTERVAL = 30 # pylint: disable=invalid-name @retry(interval=RETRY_INSTALL_QEMU_AGENT_INTERVAL, timeout=RETRY_INSTALL_QEMU_AGENT_TIMEOUT) def install_qemu_agent(self, pypi_url: str, *args, **kwargs) -> NydusResult: """ Op for installing the qemu agent on the vm :param pypi_url: The url for the pypi server where the package is located """ package_url = pypi_url + "/-/" + self.DISTRO + "/" + self.OS_VERSION + "/" + self.QEMU_PACKAGE_NAME result = self._install(package_url) if isinstance(result, Retry): return result exit_code, outs, errs = result if exit_code != 0: if 'Nothing to do' not in errs: return False, self.build_result_dict(outs, errs, 'install_qemu_agent') exit_code, outs, errs = self._configure_qemu_agent() if exit_code != 0: return False, self.build_result_dict(outs, errs, 'configure_qemu_agent') exit_code, outs, errs = self._enable_qemu_agent() if exit_code != 0: return False, self.build_result_dict(outs, errs, 'enable_qemu_agent') return def _configure_qemu_agent(self) -> RunCommandResult: """ Update the qemu agent configuration """ cmd = r"""sed -i 's/^BLACKLIST_RPC=.*$/BLACKLIST_RPC=""/g' """ + self.QEMU_AGENT_CONFIG_LOC return run_command_pipe(cmd, 'configure_qemu_agent') def _enable_qemu_agent(self) -> RunCommandResult: """ Enable the qemu guest agent to start at boot. """ return runCommand([self.SYSTEMCTL, 'enable', 'qemu-guest-agent'], 'enable_qemu_agent') class CentOS(Linux): DISTRO = 'Centos' def __init__(self): super().__init__(package_manager=Yum()) def _install(self, *packages: str, **kwargs) -> RetryInstallCommandResult: self._yum_configure_repo() return super()._install(*packages) def configure_ip_files(self, vm_address, addresses, gateway, unused=None): # NOQA pylint: disable=R0914,R0912 # NOQA added to avoid: # C901 'CentOS.configure_ip_files' is too complex (20). # backlog story created to refactor this to read data from mnt, instead of config. # additional story created to refactor this file to one file per OS/distribution. LOG.info("configure_ips: %s, %s, %s", vm_address, addresses, gateway) addresses.insert(0, vm_address) cfgdir = '/etc/sysconfig/network-scripts' for oldfile in glob.glob(os.path.join(cfgdir, 'ifcfg-eth0:*')): LOG.info('Removing old config: %s', oldfile) os.unlink(oldfile) for oldfile in glob.glob(os.path.join(cfgdir, 'route-eth0:*')): LOG.info('Removing old routing: %s', oldfile) os.unlink(oldfile) for oldfile in glob.glob(os.path.join(cfgdir, 'ifcfg-lo:*')): LOG.info('Removing old config: %s', oldfile) os.unlink(oldfile) for oldfile in glob.glob(os.path.join(cfgdir, 'route-lo:*')): LOG.info('Removing old routing: %s', oldfile) os.unlink(oldfile) nwconf = '/etc/sysconfig/network' if not os.path.exists(nwconf): create_file(nwconf, 'GATEWAYDEV=eth0\n') else: with open(nwconf, 'r', encoding='utf-8') as nwconf_file: gwdev = 'GATEWAYDEV' in nwconf_file.read() if gwdev: edit_file_lines(nwconf, functools.partial(replace_line, match='GATEWAYDEV', replace='GATEWAYDEV=eth0\n', firstword=False)) else: append_line(nwconf, 'GATEWAYDEV=eth0\n') with open(os.path.join(cfgdir, 'ifcfg-eth0'), 'r', encoding='utf-8') as base_cfg_file: base_cfg = base_cfg_file.read() if hasattr(base_cfg, 'decode'): base_cfg = base_cfg.decode('ascii') gw = gateway if not gw: for line in base_cfg.split('\n'): if line.startswith('GATEWAY='): gw = line[len('GATEWAY='):].split()[0] LOG.info('Endpoint Count: %s', str(len(addresses))) if len(addresses) < 2: def_route_line = 'DEFROUTE=yes\n' else: def_route_line = 'DEFROUTE=no\n' default_route_created = False for ifnum, ip_address in enumerate(addresses): # Disable Eth0 default route with open(os.path.join(cfgdir, 'ifcfg-eth0'), 'w', encoding='utf-8') as base_cfg_file: for line in base_cfg.split('\n'): if line.startswith('DEFROUTE'): base_cfg_file.write(def_route_line) else: base_cfg_file.write(line + '\n') if str(ip_address) in base_cfg: continue with open(os.path.join(cfgdir, 'ifcfg-eth0:%s' % ifnum), 'w', encoding='utf-8') as ethcfg: ethcfg.write('\n'.join(['DEVICE=eth0:%d' % ifnum, 'ONBOOT=yes', 'BOOTPROTO=static', 'IPADDR=' + str(ip_address), 'NETMASK=255.255.255.255']) + '\n') # remove any routes for this ip from route-eth0 (added by openstack) if os.path.exists(os.path.join(cfgdir, 'route-eth0')): with open(os.path.join(cfgdir, 'route-eth0'), 'r', encoding='utf-8') as base_route_file: base_route = base_route_file.read() with open(os.path.join(cfgdir, 'route-eth0'), 'w', encoding='utf-8') as base_route_file: for line in base_route.split('\n'): if str(ip_address) in line: pass else: base_route_file.write(line + '\n') # we only want to add one default route if default_route_created is not True: create_file(os.path.join(cfgdir, 'route-eth0:%s' % ifnum), 'default via %s dev eth0 proto static src %s metric 100\n' % (gw, str(ip_address))) LOG.info('Default route created for: %s', str(ip_address)) default_route_created = True LOG.warning('Configured %s to %s, gw %s', 'ifcg-eth0:%s' % ifnum, ip_address, gw) def update_etc_hosts_hostname(self, hostname: str, ip_addr: str, op_name: str) -> Tuple[int, str, str]: """Updates the /etc/hosts file with the new hostname for the server :param hostname: The new server hostname :param ip_addr: The IP address for the server :param op_name: The op that is running this function """ exit_code, outs, errs = super().update_etc_hosts_hostname(hostname, ip_addr, op_name) if exit_code != 0: return exit_code, outs, errs # ipv4 localhost comment = '#cloud-controlled; do not change' local_ip = '127.0.0.1' ip_regex = self.get_ip_regex(local_ip) regex = r'^{ip_regex}.*{comment}$/{local_ip} {hostname} {hostname} {comment}'.format( ip_regex=ip_regex, comment=comment, local_ip=local_ip, hostname=hostname) return self.regex_replace(HOSTS_PATH, regex, op_name) # pylint: disable=too-many-return-statements def update_nydus(self, pypi_url: str, update_type: Optional[str] = NydusUpdateType.REINSTALL, nydus_version: Optional[str] = None) -> NydusResult: """ Op for updating pyinstaller based nydus package :param pypi_url: The url for the pypi server where the package is located :param update_type: The type of update to perform, one of "reinstall", "upgrade" or "downgrade" :param nydus_version: The version of nydus to use for the update """ env = set_safe_env() LOG.info('update_type: %s , nydus_version: %s', update_type, nydus_version) if update_type not in (NydusUpdateType.REINSTALL, NydusUpdateType.DOWNGRADE, NydusUpdateType.UPGRADE): return False, self.build_result_dict("", "Invalid operation", 'update_nydus') exit_code, outs, errs = self._run_yum_command( ['yum', 'list', 'installed', 'nydus-executor'], 'Check for pyinstaller based package', env=env) if exit_code != 0: return False, self.build_result_dict( "", "Pyinstaller based nydus package is currently not installed", 'update_nydus') if update_type == NydusUpdateType.REINSTALL: exit_code, outs, errs = runCommand( ['rpm', '-q', '--queryformat', '%{VERSION}-%{RELEASE}', 'nydus-executor'], 'get nydus version', env=env) if exit_code != 0: return False, self.build_result_dict( "", "Error determining existing nydus version", 'update_nydus') # Force nydus version to be the same as the existing nydus version for reinstall nydus_version = outs if nydus_version is None: return False, self.build_result_dict( "", "Nydus version is a required parameter but wasn't provided", 'update_nydus') nydus_pkg_name = None if SEMVER_RGX.match(nydus_version): # eg: nydus-executor-6.21.1-97.x86_64.rpm nydus_pkg_name = 'nydus-executor-{}.x86_64.rpm'.format(nydus_version) elif MAJOR_ONLY_RGX.match(nydus_version): # eg: nydus-executor-6.rpm nydus_pkg_name = 'nydus-executor-{}.rpm'.format(nydus_version) else: return False, self.build_result_dict("", "Invalid nydus version", 'update_nydus') package_url = pypi_url + "/-/" + self.DISTRO + "/" + self.OS_VERSION + "/" + nydus_pkg_name exit_code = 0 if update_type == NydusUpdateType.UPGRADE: exit_code, outs, errs = self._run_yum_command( ['yum', 'update', '-y', package_url], 'Update nydus to version {}'.format(nydus_pkg_name), env=env) elif update_type == NydusUpdateType.REINSTALL: exit_code, outs, errs = self._run_yum_command( ['yum', 'reinstall', '-y', package_url], 'Update nydus to version {}'.format(nydus_pkg_name), env=env) elif update_type == NydusUpdateType.DOWNGRADE: exit_code, outs, errs = self._run_yum_command( ['yum', 'downgrade', '-y', package_url], 'Update nydus to version {}'.format(nydus_pkg_name), env=env) else: return False, self.build_result_dict( "", "Unsupported nydus update action", 'update_nydus') if exit_code != 0 or ("The same or higher version of nydus-executor is already installed" in errs): return False, self.build_result_dict("", "Nydus update failed {}".format(errs), 'update_nydus') exit_code, outs, errs = runCommand( ['systemctl', 'daemon-reload'], 'reload systemd unit files', env=env) if exit_code != 0: return False, self.build_result_dict( "", "Error reloading systemd unit files", 'get_nydus_version') return True, self.build_result_dict(outs, errs, 'update_nydus') def upgrade_panopta(self, *args, customer_key: Optional[str] = None, **kwargs) -> NydusResult: """Deletes panopta repo config and upgrades panopta to fortimonitor""" if not self._check_package_installed('panopta-agent'): return True, self.build_result_dict("", "Panopta is not installed, nothing to do", 'upgrade_panopta') if not self.install_python3(): return False, self.build_result_dict("", "Failed to install python3", 'upgrade_panopta') try: self._remove_panopta_repo() except (IOError, OSError) as ex: LOG.info("Error while deleting panopta repo file") return False, self.build_result_dict("", "Failed to delete panopta repo file: {}".format(str(ex)), 'upgrade_panopta') try: customer_key = self._get_panopta_customer_key() except ValueError as ex: message = "customer_key not found" LOG.exception(message) return False, self.build_result_dict('customer_key not found', str(ex), 'upgrade_panopta') cmd = r"""curl -s https://repo.fortimonitor.com/install/linux/fm_agent_install.sh | \ sudo bash /dev/stdin -c {customer_key} -x -y""".format(customer_key=customer_key) exit_code, outs, errs = runCommand(cmd, 'upgrade_panopta', useShell=True) if exit_code != 0: return False, self.build_result_dict(outs, errs, 'upgrade_panopta') err_check_msgs = ['Installation of fm-agent had an error', 'No suitable python version found', 'Agent installation must be run as root'] silent_yum_install_error = any(msg in outs for msg in err_check_msgs) success = exit_code == 0 and (not silent_yum_install_error) return success, self.build_result_dict('fm_agent installed', errs, 'upgrade_panopta') def _check_package_installed(self, package: str) -> bool: """Check if package is installed""" _, outs, _ = run_command_pipe('sudo yum list installed', 'Check whether package is installed') if package in outs.decode('utf-8'): return True return False def _remove_panopta_repo(self) -> None: """Remove panopta repo""" try: os.remove('/etc/yum.repos.d/panopta.repo') except FileNotFoundError: LOG.info("Panopta repo file not found") def install_python3(self) -> bool: """Install python3 if not already installed""" exit_code, outs, _ = runCommand(['python3', '-V'], 'Check Python version', useShell=True) if exit_code == 0 and 'Python 3.' in outs: return True try: self._add_hfs_common_repo() except (IOError, OSError) as ex: LOG.info("Failed to add hfs_common repo: %s", str(ex)) return False exit_code_yum, outs_yum, errs = self._run_yum_command( ['yum', 'install', '-y', 'python3'], 'Install python3') if exit_code_yum != 0: LOG.error("failed to install python3: %s (%s)", outs_yum, errs) return False try: self._remove_hfs_common_repo() except (IOError, OSError) as ex: LOG.info("Failed to remove hfs_common repo: %s", str(ex)) return False return True def _add_hfs_common_repo(self) -> None: """Add gpg key and hfs common repo config""" repo_file = HFS_COMMON_YUM_REPO repo_file_contents = read_text( 'customer_local_ops.operating_system.resources', 'hfs_common.repo') create_file(repo_file, repo_file_contents) def _remove_hfs_common_repo(self) -> None: """Remove hfs_common repo and gpg key""" os.remove(HFS_COMMON_YUM_REPO) class CentOS6(CentOS): OS_VERSION = '6' QEMU_PACKAGE_NAME = 'qemu-guest-agent-0.rpm' def update_nydus(self, *args, **kwargs) -> NydusResult: return False, self.build_result_dict("", "Not supported", 'update_nydus') def configure_ips(self, *args, **kwargs): super().configure_ip_files(*args, **kwargs) exit_code, outs, errs = runCommand( ['/sbin/service', 'network', 'restart'], 'network restart') if exit_code != 0: LOG.error("failed to configure ips: %s (%s)", outs, errs) return False, [outs, errs] return def change_hostname(self, payload, unused=None): # The hostname command is not persistent for CentOS-6 op_name = 'change_hostname' ip_addr = payload['ip_address'] hostname = payload['hostname'] exit_code, outs, errs = runCommand("hostname " + hostname, 'changeHostname', useShell=True) if exit_code != 0: return False, self.build_result_dict(outs, errs, op_name) # This is the persistent location for the hostname setting try: edit_file_lines( '/etc/sysconfig/network', functools.partial(replace_line, match='HOSTNAME', replace='HOSTNAME=%s\n' % hostname, firstword=False)) except Exception as ex: # pylint: disable=broad-except LOG.error("Failed to update /etc/sysconfig/network file: %s", str(ex)) return False, self.build_result_dict(outs, str(ex), op_name) # Ensure the hostname self-routes exit_code, outs, errs = self.update_etc_hosts_hostname(hostname, ip_addr, op_name) if exit_code != 0: return False, self.build_result_dict(outs, errs, op_name) return self.build_result_dict(outs, errs, op_name) def _yum_configure_repo(self) -> None: """Configures the yum repo. Is a noop for all linux distros except CentOS6, which is EOL""" yum_repo_file = CENTOS6_YUM_REPO repo_file_contents = read_text( 'customer_local_ops.operating_system.resources', 'centos6_base.repo') create_file(yum_repo_file, repo_file_contents) def restart_service(self, service_name: str, is_systemd: bool = False, should_block: bool = True) -> NydusResult: """ CentOS6 implementation of restart_service op that defaults to SysV service :param service_name: Name of the service to be restarted :param is_systemd: Flag indicating if the service to be restarted is systemd unit or not :param should_block: Flag specific to systemd indicating if the we should wait for the completion of the command execution """ return super().restart_service(service_name, is_systemd, should_block) def _enable_qemu_agent(self) -> RunCommandResult: """ Enable the qemu guest agent to start at boot. """ return runCommand(['/sbin/chkconfig', 'qemu-ga', 'on'], 'enable_qemu_agent') class CentOS7(CentOS): OS_VERSION = '7' QEMU_PACKAGE_NAME = 'qemu-guest-agent-2.rpm' def configure_ips(self, *args, **kwargs): super().configure_ip_files(*args, **kwargs) exit_code, outs, errs = runCommand( ['/usr/bin/systemctl', 'restart', 'network'], 'network restart') if exit_code != 0: LOG.error("failed to configure ips: %s (%s)", outs, errs) return False, [outs, errs] return def _yum_configure_repo(self) -> None: """Configures the yum repo. Is a noop for all linux distros except CentOS7, which is EOL""" yum_repo_file = CENTOS6_YUM_REPO # Centos6 and Centos7 has the same file name and path repo_file_contents = read_text( 'customer_local_ops.operating_system.resources', 'centos7_base.repo') create_file(yum_repo_file, repo_file_contents) def configure_mta(self, payload, unused=None, intermediate_result=None): self._yum_configure_repo() return super().configure_mta(payload, unused, intermediate_result) class CentOS8(CentOS): OS_VERSION = '8' QEMU_PACKAGE_NAME = 'qemu-guest-agent-6.rpm' class AlmaLinux8(CentOS8): DISTRO = 'Almalinux' OS_VERSION = '8' QEMU_PACKAGE_NAME = 'qemu-guest-agent-6.rpm' class AlmaLinux9(AlmaLinux8): OS_VERSION = '9' QEMU_PACKAGE_NAME = 'qemu-guest-agent-7.rpm' class Debian(Linux): SYSTEMCTL = '/bin/systemctl' QEMU_PACKAGE_NAME = 'qemu-guest-agent' PANOPTA = 'fm-agent' PANOPTA_CONFIG_FILE = '/etc/fm-agent/fm_agent.cfg' PANOPTA_AGENT_NAME = PANOPTA PANOPTA_STABLE_RELEASE = 'https://repo.fortimonitor.com/deb-stable' PANOPTA_PUBLISHER_URL = 'https://repo.fortimonitor.com/fortimonitor.pub' PANOPTA_MANIFEST_FILE = '/etc/fm-agent-manifest' def __init__(self, package_manager=Apt()): super().__init__(package_manager=package_manager) def remove_file(self, full_file_name): if os.path.exists(full_file_name): LOG.info('Removing: %s', full_file_name) os.unlink(full_file_name) def mount_network_settings(self): data = "" mountpath = "/mnt/cdrom" networkconfig = mountpath + "/openstack/latest/network_data.json" if not os.path.exists(mountpath): os.mkdir(mountpath) command_line = "sudo /bin/mount /dev/sr0 " + mountpath p = subprocess.Popen( # pylint: disable=consider-using-with command_line, shell=True, stdout=subprocess.PIPE) o, e = p.communicate() LOG.info("mount network data drive. stdOut: %s stdErr: %s", o, e) if os.path.isfile(networkconfig): with open(networkconfig, encoding='utf-8') as nc: data = json.load(nc) return data def get_gateway(self, base_cfg): gw = "" for line in base_cfg.split('\n'): if line.strip().startswith('gateway'): gw = line.split()[1] LOG.debug('Gateway: %s', str(gw)) # this is needed due to an update to nocfox ubuntu code. # It should be used as the basis for pulling network configs going forward if str(gw) == "": data = self.mount_network_settings() try: gw = data['networks'][0]['routes'][0]['gateway'] LOG.debug('Updated Gateway: %s', str(gw)) except TypeError: LOG.critical('Could not auto-detect gateway') return str(gw) def get_nameservers(self, base_cfg): ns = "" for line in base_cfg.split('\n'): if line.strip().startswith('dns-nameservers'): ns = line.split()[1:] ns = ' '.join(ns) LOG.debug('nameservers: %s', str(ns)) if str(ns) == "": data = self.mount_network_settings() try: ns = ' '.join([s['address'] for s in data['services'] if s['type'] == 'dns']) LOG.debug('Updated nameservers: %s', str(ns)) except TypeError: LOG.critical('Could not auto-detect nameservers') return ns def configure_ip_files(self, vm_address, addresses, gateway, unused=None): # pylint: disable=R0914 LOG.info("configure_ips: %s, %s, %s", vm_address, addresses, gateway) cfgdir = '/etc/network/interfaces.d' routefile = '/etc/network/if-up.d/update-routes' for oldfile in glob.glob(os.path.join(cfgdir, 'hfs-*.cfg')): self.remove_file(oldfile) # new public config file added on base build self.remove_file(os.path.join(cfgdir, '51-public-ips.cfg')) update_interface_flag = False if os.path.exists(os.path.join(cfgdir, '50-cloud-init.cfg')): with open(os.path.join(cfgdir, '50-cloud-init.cfg'), 'r', encoding='utf-8') as base_cfg_file: base_cfg = base_cfg_file.read() elif os.path.exists('/etc/network/interfaces'): with open('/etc/network/interfaces', 'r', encoding='utf-8') as base_cfg_file: base_cfg = base_cfg_file.read() if str('source /etc/network/interfaces.d/*') not in base_cfg: update_interface_flag = True else: LOG.critical('Default Networking config not found!') base_cfg = '' if update_interface_flag: runCommand(['chattr', '-i', '/etc/network/interfaces'], 'allow write to /etc/network/interfaces', errorOK=True) with open('/etc/network/interfaces', 'a', encoding='utf-8') as base_cfg_file_w: base_cfg_file_w.write('\n'.join(['source /etc/network/interfaces.d/*'])) runCommand(['chattr', '+i', '/etc/network/interfaces'], 'remove write to /etc/network/interfaces', errorOK=True) gw = gateway if not gw: gw = self.get_gateway(base_cfg) LOG.debug('Gateway: %s', str(gw)) ns = self.get_nameservers(base_cfg) self.remove_file(routefile) default_route_created = False for ifnum, ip_address in enumerate(addresses): # we don't need to re-add the default ip as it is not removed if str(ip_address) not in base_cfg: with open(os.path.join(cfgdir, 'hfs-%s.cfg' % str(ip_address)), 'w', encoding='utf-8') as ethcfg: ethcfg.write( '\n'.join( ['auto eth0:%d' % ifnum, 'iface eth0:%d inet static' % ifnum, ' dns-nameservers ' + str(ns), ' address ' + str(ip_address), ' netmask 255.255.255.255', ' up route add -net ' + str(ip_address) + ' netmask 255.255.255.255 dev eth0']) + '\n') LOG.warning('Configured %s to eth0', str(ip_address)) if default_route_created is not True: with open(routefile, 'w', encoding='utf-8') as routecfg: LOG.debug('Writing: %s', routefile) routecfg.write('\n'.join([ '#!/bin/bash', 'ip route del default', 'ip route replace default via ' + str(gw) + ' dev eth0 src ' + str( ip_address) + ' proto static metric 1024', 'exit 0']) + '\n') # chmod 0755 /etc/network/if-up.d/update-routes os.chmod(routefile, 0o755) default_route_created = True UPDATE_ROUTES_PATH = '/etc/network/if-up.d/update-routes' def snapshot_clean(self, payload, unused=None): if not payload['clean']: return # remove public rout file for snapshot. otherwise default route set up by # openstack does not work if os.path.exists(self.UPDATE_ROUTES_PATH): os.unlink(self.UPDATE_ROUTES_PATH) return super().snapshot_clean(payload) def configure_ips(self, *args, **kwargs): self.configure_ip_files(*args, **kwargs) runCommand(['ip', 'addr', 'flush', 'dev', 'eth0'], 'flush old interface data', errorOK=True) exit_code, outs, errs = runCommand( ['/bin/systemctl', 'restart', 'networking'], 'network restart') if exit_code != 0: LOG.error("failed to configure ips: %s (%s)", outs, errs) return False, [outs, errs] return RETRY_INSTALL_PANOPTA_TIMEOUT = 1200 # pylint: disable=invalid-name RETRY_INSTALL_PANOPTA_INTERVAL = 30 # pylint: disable=invalid-name @retry(interval=RETRY_INSTALL_PANOPTA_INTERVAL, timeout=RETRY_INSTALL_PANOPTA_TIMEOUT) def install_panopta(self, payload, *args, **kwargs): """ Create a manifest file for the fm-agent, update the apt-key with fm-agent, update apt-get, and install fm-agent using apt-get :param payload: payload containing customer_key (the key value to set in the manifest file) and template_ids (a csv string of the template ids to assign this server to) :return result of apt-get install operation""" # cleaning panopta-agent keys exit_code, outs, errs = runCommand( ['apt-key', 'del', '61EE28720129F5F3'], 'install {agent}'.format(agent=self.PANOPTA_AGENT_NAME)) sources_file_panopta = '/etc/apt/sources.list' with open(sources_file_panopta, 'r', encoding='utf-8') as source_file: lines = source_file.read() with open(sources_file_panopta, 'w', encoding='utf-8') as source_file: for line in lines.split('\n'): if 'deb http://packages.panopta.com/deb stable main' in line: pass else: source_file.write(line + '\n') sources_file_path = '/etc/apt/sources.list.d' if not os.path.exists(sources_file_path): os.mkdir(sources_file_path) with open(os.path.join(sources_file_path, 'fm_agent.list'), 'w', encoding='utf-8') as source_file: sources_file_addition = '\n'.join(['## Addition to add fm-agent', 'deb [signed-by=/usr/share/keyrings/fortimonitor.pub] ' '{agent_repo} stable main' .format(agent_repo=self.PANOPTA_STABLE_RELEASE)]) source_file.write(sources_file_addition) exit_code, outs, errs = runCommand( "wget -O - {publish_url} | tee /usr/share/keyrings/fortimonitor.pub > /dev/null" .format(publish_url=self.PANOPTA_PUBLISHER_URL), "install {agent}".format(agent=self.PANOPTA_AGENT_NAME), useShell=True) if exit_code != 0: return False, [outs, errs] self._create_panopta_manifest_file(payload) exit_code, outs, errs = runCommand( ['apt-get', 'update', '--fix-missing'], 'install {agent}'.format(agent=self.PANOPTA_AGENT_NAME)) if exit_code != 0: return False, [outs, errs] # debconf is giving some unnecessary errors in the logs, hence setting it as noninteractive cmd = "echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections" runCommand(cmd, 'install {agent}'.format(agent=self.PANOPTA_AGENT_NAME), useShell=True) exit_code, outs, errs = runCommand( ['apt-get', 'install', '-y', self.PANOPTA_AGENT_NAME], 'install {agent}'.format(agent=self.PANOPTA_AGENT_NAME)) silent_install_error = "Installation of fm_agent had an error" in outs return exit_code == 0 and (not silent_install_error), \ self.build_result_dict(outs, errs, 'install_panopta') def delete_panopta(self, *args, **kwargs): """ delete panopta/fortimonitor from the server and removes the panopta/fm_agent manifest file :return result of apt uninstall operation""" try: _, outs, _ = run_command_pipe( 'dpkg --list | grep panopta-agent', 'check panopta installed') _, outs2, _ = run_command_pipe( 'dpkg --list | grep fm-agent', 'check fm-agent installed') if outs: exit_code, outs, errs = runCommand( ['apt-get', 'purge', '-y', 'panopta-agent'], 'delete panopta') if exit_code != 0: LOG.error('failed to remove panopta agent') return False, self.build_result_dict(outs, errs, 'delete panopta') if outs2: exit_code, outs, errs = runCommand( ['apt-get', 'purge', '-y', 'fm-agent'], 'delete fm-agent') if exit_code != 0: LOG.error('failed to remove fm-agent') return False, self.build_result_dict(outs, errs, 'delete fm-agent') else: exit_code, outs, errs = 0, 'panopta/fm agent is not installed', '' if os.path.exists('/etc/panopta-agent-manifest'): os.unlink('/etc/panopta-agent-manifest') exit_code = 0 outs = outs + '\npanopta agent manifest file removed' if os.path.exists('/etc/fm-agent-manifest'): os.unlink('/etc/fm-agent-manifest') exit_code = 0 outs = outs + '\nfm-agent manifest file removed' except (OSError, IOError) as ex: message = "Failed to unlink panopta/fm agent manifest file" LOG.exception(message) return False, self.build_result_dict('', str(ex), 'delete_panopta') return exit_code == 0, self.build_result_dict(outs, errs, 'delete_panopta') def _check_panopta_installed(self): _, outs, _ = run_command_pipe( 'dpkg --list | grep panopta-agent', 'check panopta installed') return outs def update_etc_hosts_hostname(self, hostname: str, ip_addr: str, op_name: str) -> Tuple[int, str, str]: """Updates the /etc/hosts file with the new hostname for the server :param hostname: The new server hostname :param ip_addr: The IP address for the server :param op_name: The op that is running this function """ exit_code, outs, errs = super().update_etc_hosts_hostname(hostname, ip_addr, op_name) if exit_code != 0: return exit_code, outs, errs hostname_prefix = self.get_hostname_prefix(hostname) # ipv4 localhost local_ip = '127.0.1.1' ip_regex = self.get_ip_regex(local_ip) regex = r'^{ip_regex}.*$/{local_ip} {hostname} {hostname_prefix}'.format( ip_regex=ip_regex, local_ip=local_ip, hostname=hostname, hostname_prefix=hostname_prefix, ) return self.regex_replace(HOSTS_PATH, regex, op_name) def _run_change_password_cmd(self, username: str, password: str, op_name: str) -> Tuple[int, str, str]: """ Run the change password command :param username: The Username :param password: The user password :param op_name: The name of the calling op :return: The change password command """ chpasswd_arg = shlex.quote('%s:%s' % (username, password)) cmd = "echo %s | chpasswd" % chpasswd_arg return runCommand(cmd.encode('utf-8'), op_name, useShell=True, omitString=chpasswd_arg.encode('utf-8')) def _install_python_is_python3(self) -> Tuple[int, str, str]: """ Make sure that the 'python' executable runs python3. No install needed by default. :returns: A runCommand result tuple """ return 0, '', '' def _run_add_user_command(self, username) -> Tuple[int, str, str]: """ Run the add user command (differs per linux flavor) :param username: The username :return: exit code, output, errors """ return runCommand(['useradd', '-m', username, '-s', '/bin/bash'], 'add_user') RETRY_INSTALL_QEMU_AGENT_TIMEOUT = 1200 # pylint: disable=invalid-name RETRY_INSTALL_QEMU_AGENT_INTERVAL = 30 # pylint: disable=invalid-name @retry(interval=RETRY_INSTALL_QEMU_AGENT_INTERVAL, timeout=RETRY_INSTALL_QEMU_AGENT_TIMEOUT) def install_qemu_agent(self, pypi_url: str, *args, **kwargs) -> NydusResult: """ Install qemu guest agent on the vm :param pypi_url: The url for the pypi server where the package is located """ result = self._install(self.QEMU_PACKAGE_NAME) if isinstance(result, Retry): return result exit_code, outs, errs = result if exit_code != 0: if 'Nothing to do' not in errs: return False, self.build_result_dict(outs, errs, 'install_qemu_agent') exit_code, outs, errs = self._enable_qemu_agent() if exit_code != 0: return False, self.build_result_dict(outs, errs, 'enable_qemu_agent') return class Debian8(Debian): SYSTEMCTL = '/bin/systemctl' QEMU_PACKAGE_NAME = 'qemu-guest-agent_2.deb' OS_VERSION = '8' DISTRO = 'Debian' def __init__(self, package_manager=AptEOL('Debian8')): super().__init__(package_manager) def install_qemu_agent(self, pypi_url: str, *args, **kwargs) -> NydusResult: """ Install qemu guest agent on the vm :param pypi_url: The url for the pypi server where the package is located """ package_url = pypi_url + '/-/' + self.DISTRO + '/' + self.OS_VERSION + '/' + self.QEMU_PACKAGE_NAME exit_code, outs, errs = runCommand(['wget', package_url], 'Download qemu_guest_agent') if exit_code != 0: return False, self.build_result_dict(outs, errs, 'download_qemu_agent') exit_code, outs, errs = runCommand( ['dpkg', '-i', self.QEMU_PACKAGE_NAME], 'install_qemu_agent') if exit_code != 0: return False, self.build_result_dict(outs, errs, 'install_qemu_agent') exit_code, outs, errs = self._enable_qemu_agent() if exit_code != 0: return False, self.build_result_dict(outs, errs, 'enable_qemu_agent') return class Debian10(Debian): pass class Debian11(Debian10): def _install_python_is_python3(self) -> Tuple[int, str, str]: """ Make sure that the 'python' executable runs python3 :returns: A runCommand result tuple """ return self._install('python-is-python3') class Debian12(Debian11): pass class Ubuntu1604(Debian): SYSTEMCTL = '/bin/systemctl' RETRY_INSTALL_QEMU_AGENT_TIMEOUT = 1200 # pylint: disable=invalid-name RETRY_INSTALL_QEMU_AGENT_INTERVAL = 30 # pylint: disable=invalid-name def __init__(self, package_manager=AptEOL('Ubuntu1604')): super().__init__(package_manager) @retry(interval=RETRY_INSTALL_QEMU_AGENT_INTERVAL, timeout=RETRY_INSTALL_QEMU_AGENT_TIMEOUT) def install_qemu_agent(self, pypi_url: str, *args, **kwargs) -> NydusResult: """ Install qemu guest agent on the vm :param pypi_url: The url for the pypi server where the package is located """ result = self._install(self.QEMU_PACKAGE_NAME) if isinstance(result, Retry): return result exit_code, outs, errs = result if exit_code != 0: if 'Nothing to do' not in errs: return False, self.build_result_dict(outs, errs, 'install_qemu_agent') exit_code, outs, errs = self._enable_qemu_agent() if exit_code != 0: return False, self.build_result_dict(outs, errs, 'enable_qemu_agent') return class Ubuntu2004(Debian): def _install_python_is_python3(self) -> Tuple[int, str, str]: """ Make sure that the 'python' executable runs python3 :returns: A runCommand result tuple """ return self._install('python-is-python3') class Ubuntu2204(Ubuntu2004): pass class Ubuntu2404(Ubuntu2204): pass