Файловый менеджер - Редактировать - /var/opt/nydus/ops/customer_local_ops/operating_system/package_manager.py
Назад
from typing import Dict, List, Optional
import os
from shutil import rmtree
from pathlib import Path

from importlib_resources import read_text

from customer_local_ops.util.helpers import create_file
from customer_local_ops.util.execute import runCommand, RunCommandResult

# Key IDs fetched from the keyserver by Apt.update_indices() before
# running ``apt-get update``.
APT_PUBLIC_KEYS = [
    '61EE28720129F5F3'  # http://packages.panopta.com/deb
]


class PackageManager:
    """Base class for running an OS package manager through runCommand.

    Subclasses provide INSTALL_ARGS and may override _env() and
    update_indices().
    """

    # List of command-line arguments to install packages.
    # Packages to install are appended when command is run.
    # See install().
    # NOTE: left as None here; install() only works on subclasses that
    # set it to a list.
    INSTALL_ARGS = None  # type: List[str]

    def _env(self) -> Optional[Dict[str, str]]:
        """Environment variables to send to subprocess.

        This version returns None, which is passed straight to runCommand
        as ``env`` (presumably meaning "inherit the parent environment" -
        confirm against runCommand).
        """
        return None

    def install(self, *packages: str, tag: Optional[str] = None) -> RunCommandResult:
        """Install one or more packages.

        :param packages: one or more specifications of packages to install
        :param tag: optional identifier included in logs; defaults to
            ``'install <packages tuple>'``
        :returns: result of installation command as (exit_code, stdout, stderr)
        :raises ValueError: if no packages are given
        """
        if not packages:
            raise ValueError('At least one package is required')
        cmd = self.INSTALL_ARGS + list(packages)
        if tag is None:
            tag = 'install %s' % str(packages)
        return runCommand(cmd, tag, env=self._env())

    def update_indices(self) -> RunCommandResult:
        """Update local package indices from remote sources.

        Not all package managers need this, so this version does nothing and
        returns a fake success result.

        :returns: result of update command as (exit_code, stdout, stderr)
        """
        return 0, '', ''


class Apt(PackageManager):
    """Package manager backed by ``apt-get``."""

    INSTALL_ARGS = ['apt-get', 'install', '-y']

    def _env(self) -> Optional[Dict[str, str]]:
        """Environment variables to send to subprocess.

        We set DEBIAN_FRONTEND to ensure user is not prompted to answer any
        questions during installation.

        :returns: copy of the current process environment with the
            apt-specific variables added
        """
        env = os.environ.copy()
        env['DEBIAN_FRONTEND'] = 'noninteractive'
        env['APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE'] = '1'
        return env

    def update_indices(self) -> RunCommandResult:
        """Fetch the public keys in APT_PUBLIC_KEYS, then run ``apt-get update``.

        :returns: result of the ``apt-get update`` command as
            (exit_code, stdout, stderr)
        """
        for key in APT_PUBLIC_KEYS:
            cmd = ['apt-key', 'adv', '--keyserver', 'hkp://keyserver.ubuntu.com:80', '--recv-keys', key]
            # The result of the key fetch is not inspected; only the
            # ``apt-get update`` result below is returned to the caller.
            runCommand(cmd, 'apt-key get public keys', env=self._env())
        cmd = ['apt-get', 'update', '-y', '-o', 'Acquire::ForceIPv4=true']
        return runCommand(cmd, 'apt-get update', env=self._env())


class AptEOL(Apt):
    """Apt variant for the distributions listed in LINUX_EOL_DISTROS.

    update_indices() writes a temporary source list and an apt.conf from
    packaged resources before updating; install() removes them again after
    the installation command has run.
    """

    # TMP_DIRS[0] is the root of the temporary tree; the remaining entries
    # live underneath it, so removing the root removes everything.
    TMP_DIRS = [
        '/tmp/tmp_apt',
        '/tmp/tmp_apt/source.list.d',
        '/tmp/tmp_apt/tmp_apt_cache',
        '/tmp/tmp_apt/tmp_apt_lists'
    ]
    TMP_SOURCE_LIST = '/tmp/tmp_apt/source.list'
    LINUX_EOL_DISTROS = ['Debian8', 'Ubuntu1604']
    APT_CONF_PATH = '/etc/apt/apt.conf'
    RES_PACKAGE = 'customer_local_ops.operating_system.resources'

    def __init__(self, os_name: str):
        """
        :param os_name: distribution name; must be one of LINUX_EOL_DISTROS
        :raises ValueError: if os_name is not a supported distribution
        """
        if os_name not in self.LINUX_EOL_DISTROS:
            raise ValueError('Invalid Linux distributive name was provided - %s.' % os_name)
        self.os_name = os_name

    def _add_tmp_source_list_dirs(self) -> None:
        """Create the temporary APT folder structure and source list.

        The source list is filled from the ``<os_name lowercased>_mirrors.repo``
        resource in RES_PACKAGE.

        :returns: None
        """
        for dir_path in self.TMP_DIRS:
            Path(dir_path).mkdir(parents=True, exist_ok=True)
        mirrors_list = self.os_name.lower() + '_mirrors.repo'
        mirrors_list_content = read_text(self.RES_PACKAGE, mirrors_list)
        create_file(self.TMP_SOURCE_LIST, mirrors_list_content)

    def _add_tmp_apt_config(self) -> None:
        """Write the ``tmp_apt_config.repo`` resource to APT_CONF_PATH.

        NOTE(review): any pre-existing apt.conf is not backed up here, and
        install() later deletes the file - confirm this is acceptable.

        :returns: None
        """
        apt_config_content = read_text(self.RES_PACKAGE, 'tmp_apt_config.repo')
        create_file(self.APT_CONF_PATH, apt_config_content)

    def _clear_apt_conf(self) -> None:
        """Delete the apt.conf file at APT_CONF_PATH.

        :returns: None
        """
        os.remove(self.APT_CONF_PATH)

    def _clear_tmp_apt_folders(self) -> None:
        """Delete all tmp folders used for apt update.

        :returns: None
        """
        rmtree(self.TMP_DIRS[0])

    def update_indices(self) -> RunCommandResult:
        """Prepare the temporary APT configuration, then update indices.

        Runs both _add_tmp_source_list_dirs and _add_tmp_apt_config, then
        delegates to the super class.

        :returns: result of the super class update_indices method
        :raises RuntimeError: if preparing the temporary files fails
        """
        try:
            self._add_tmp_source_list_dirs()
            self._add_tmp_apt_config()
        except OSError as ex:
            # OSError already covers PermissionError and IOError.
            raise RuntimeError('An error occurred during file operation with tmp source list: ' + str(ex)) from ex
        return super().update_indices()

    def install(self, *packages: str, tag: Optional[str] = None) -> RunCommandResult:
        """Install packages, then remove the temporary APT configuration.

        :param packages: one or more specifications of packages to install
        :param tag: optional identifier included in logs
        :returns: result of installation command as (exit_code, stdout, stderr)
        :raises ValueError: if no packages are given
        :raises RuntimeError: if removing apt.conf or the tmp folders fails
        """
        result = super().install(*packages, tag=tag)

        # Attempt every cleanup step even if an earlier one fails, so a
        # problem removing apt.conf does not leave the tmp tree behind.
        # The first failure is the one reported.
        first_error = None  # type: Optional[OSError]
        for cleanup_step in (self._clear_apt_conf, self._clear_tmp_apt_folders):
            try:
                cleanup_step()
            except OSError as ex:
                if first_error is None:
                    first_error = ex
        if first_error is not None:
            raise RuntimeError(
                'An error occurred during clear apt.conf file: ' + str(first_error)) from first_error
        return result


class Yum(PackageManager):
    """Package manager backed by ``yum``."""

    INSTALL_ARGS = ['yum', 'install', '-y']
| ver. 1.4 |
Github
|
.
| PHP 8.0.30 | Генерация страницы: 0 |
proxy
|
phpinfo
|
Настройка