# download.py, supplies the 'download' command. # # Copyright (C) 2013-2015 Red Hat, Inc. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # the GNU General Public License v.2, or (at your option) any later version. # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY expressed or implied, including the implied warranties of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. You should have received a copy of the # GNU General Public License along with this program; if not, write to the # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the # source code or documentation are not subject to the GNU General Public # License and may only be used or replicated with the express permission of # Red Hat, Inc. # from __future__ import absolute_import from __future__ import unicode_literals from dnfpluginscore import _, logger from dnf.cli.option_parser import OptionParser import dnf import dnf.cli import dnf.exceptions import dnf.i18n import dnf.subject import dnf.util import hawkey import itertools import os import shutil @dnf.plugin.register_command class DownloadCommand(dnf.cli.Command): aliases = ['download'] summary = _('Download package to current directory') def __init__(self, cli): super(DownloadCommand, self).__init__(cli) self.opts = None self.parser = None @staticmethod def set_argparser(parser): parser.add_argument('packages', nargs='+', help=_('packages to download')) parser.add_argument("--source", action='store_true', help=_('download the src.rpm instead')) parser.add_argument("--debuginfo", action='store_true', help=_('download the -debuginfo package instead')) parser.add_argument("--debugsource", action='store_true', help=_('download the -debugsource package instead')) parser.add_argument("--arch", '--archlist', dest='arches', default=[], action=OptionParser._SplitCallback, metavar='[arch]', help=_("limit the query to packages of given architectures.")) parser.add_argument('--resolve', action='store_true', help=_('resolve and download needed dependencies')) parser.add_argument('--alldeps', action='store_true', help=_('when running with --resolve, download all dependencies ' '(do not exclude already installed ones)')) parser.add_argument('--url', '--urls', action='store_true', dest='url', help=_('print list of urls where the rpms ' 'can be downloaded instead of downloading')) parser.add_argument('--urlprotocols', action='append', choices=['http', 'https', 'rsync', 'ftp'], default=[], help=_('when running with --url, ' 'limit to specific protocols')) def configure(self): # setup sack and populate it with enabled repos demands = self.cli.demands demands.sack_activation = True demands.available_repos = True if self.opts.resolve and self.opts.alldeps: demands.load_system_repo = False if self.opts.source: self.base.repos.enable_source_repos() if self.opts.debuginfo or self.opts.debugsource: self.base.repos.enable_debug_repos() if self.opts.destdir: self.base.conf.destdir = self.opts.destdir else: self.base.conf.destdir = dnf.i18n.ucd(os.getcwd()) def run(self): """Execute the util action here.""" if (not self.opts.source and not self.opts.debuginfo and not self.opts.debugsource): pkgs = self._get_pkg_objs_rpms(self.opts.packages) else: pkgs = [] if self.opts.source: pkgs.extend(self._get_pkg_objs_source(self.opts.packages)) if self.opts.debuginfo: pkgs.extend(self._get_pkg_objs_debuginfo(self.opts.packages)) if self.opts.debugsource: pkgs.extend(self._get_pkg_objs_debugsource(self.opts.packages)) # If user asked for just urls then print them and we're done if self.opts.url: for pkg in pkgs: # command line repo packages do not have .remote_location if pkg.repoid != hawkey.CMDLINE_REPO_NAME: url = pkg.remote_location(schemes=self.opts.urlprotocols) if url: print(url) else: msg = _("Failed to get mirror for package: %s") % pkg.name if self.base.conf.strict: raise dnf.exceptions.Error(msg) logger.warning(msg) return else: self._do_downloads(pkgs) # download rpms def _do_downloads(self, pkgs): """ Perform the download for a list of packages """ pkg_dict = {} for pkg in pkgs: pkg_dict.setdefault(str(pkg), []).append(pkg) to_download = [] cmdline = [] for pkg_list in pkg_dict.values(): pkgs_cmdline = [pkg for pkg in pkg_list if pkg.repoid == hawkey.CMDLINE_REPO_NAME] if pkgs_cmdline: cmdline.append(pkgs_cmdline[0]) continue pkg_list.sort(key=lambda x: (x.repo.priority, x.repo.cost)) to_download.append(pkg_list[0]) if to_download: self.base.download_packages(to_download, self.base.output.progress) if cmdline: # command line repo packages are either local files or already downloaded urls # just copy them to the destination for pkg in cmdline: # python<3.4 shutil module does not raise SameFileError, check manually src = pkg.localPkg() dst = os.path.join(self.base.conf.destdir, os.path.basename(src)) if os.path.exists(dst) and os.path.samefile(src, dst): continue shutil.copy(src, self.base.conf.destdir) locations = sorted([pkg.localPkg() for pkg in to_download + cmdline]) return locations def _get_pkg_objs_rpms(self, pkg_specs): """ Return a list of dnf.Package objects that represent the rpms to download. """ if self.opts.resolve: pkgs = self._get_packages_with_deps(pkg_specs) else: pkgs = self._get_packages(pkg_specs) return pkgs def _get_pkg_objs_source(self, pkg_specs): """ Return a list of dnf.Package objects that represent the source rpms to download. """ pkgs = self._get_pkg_objs_rpms(pkg_specs) source_pkgs = self._get_source_packages(pkgs) pkgs = set(self._get_packages(source_pkgs, source=True)) return pkgs def _get_pkg_objs_debuginfo(self, pkg_specs): """ Return a list of dnf.Package objects that represent the debuginfo rpms to download. """ dbg_pkgs = set() q = self.base.sack.query().available() for pkg in self._get_packages(pkg_specs): for dbg_name in [pkg.debug_name, pkg.source_debug_name]: dbg_available = q.filter( name=dbg_name, epoch=int(pkg.epoch), version=pkg.version, release=pkg.release, arch=pkg.arch ) if not dbg_available: continue for p in dbg_available: dbg_pkgs.add(p) break return dbg_pkgs def _get_pkg_objs_debugsource(self, pkg_specs): """ Return a list of dnf.Package objects that represent the debugsource rpms to download. """ dbg_pkgs = set() q = self.base.sack.query().available() for pkg in self._get_packages(pkg_specs): dbg_available = q.filter( name=pkg.debugsource_name, epoch=int(pkg.epoch), version=pkg.version, release=pkg.release, arch=pkg.arch ) for p in dbg_available: dbg_pkgs.add(p) return dbg_pkgs def _get_packages(self, pkg_specs, source=False): """Get packages matching pkg_specs.""" func = self._get_query_source if source else self._get_query queries = [] for pkg_spec in pkg_specs: try: queries.append(func(pkg_spec)) except dnf.exceptions.PackageNotFoundError as e: logger.error(dnf.i18n.ucd(e)) if self.base.conf.strict: logger.error(_("Exiting due to strict setting.")) raise dnf.exceptions.Error(e) pkgs = list(itertools.chain(*queries)) return pkgs def _get_packages_with_deps(self, pkg_specs, source=False): """Get packages matching pkg_specs and the deps.""" pkgs = self._get_packages(pkg_specs) pkg_set = set(pkgs) for pkg in pkgs: goal = hawkey.Goal(self.base.sack) goal.install(pkg) rc = goal.run() if rc: pkg_set.update(goal.list_installs()) pkg_set.update(goal.list_upgrades()) else: msg = [_('Error in resolve of packages:')] logger.error("\n ".join(msg + [str(pkg) for pkg in pkgs])) logger.error(dnf.util._format_resolve_problems(goal.problem_rules())) raise dnf.exceptions.Error() return pkg_set @staticmethod def _get_source_packages(pkgs): """Get list of source rpm names for a list of packages.""" source_pkgs = set() for pkg in pkgs: if pkg.sourcerpm: source_pkgs.add(pkg.sourcerpm) logger.debug(' --> Package : %s Source : %s', str(pkg), pkg.sourcerpm) elif pkg.arch == 'src': source_pkgs.add("%s-%s.src.rpm" % (pkg.name, pkg.evr)) else: logger.info(_("No source rpm defined for %s"), str(pkg)) return list(source_pkgs) def _get_query(self, pkg_spec): """Return a query to match a pkg_spec.""" schemes = dnf.pycomp.urlparse.urlparse(pkg_spec)[0] is_url = schemes and schemes in ('http', 'ftp', 'file', 'https') if is_url or (pkg_spec.endswith('.rpm') and os.path.isfile(pkg_spec)): pkgs = self.base.add_remote_rpms([pkg_spec], progress=self.base.output.progress) return self.base.sack.query().filterm(pkg=pkgs) subj = dnf.subject.Subject(pkg_spec) q = subj.get_best_query(self.base.sack, with_src=self.opts.source) q = q.available() q = q.filterm(latest_per_arch_by_priority=True) if self.opts.arches: q = q.filter(arch=self.opts.arches) if len(q.run()) == 0: msg = _("No package %s available.") % (pkg_spec) raise dnf.exceptions.PackageNotFoundError(msg) return q def _get_query_source(self, pkg_spec): """Return a query to match a source rpm file name.""" pkg_spec = pkg_spec[:-4] # skip the .rpm subj = dnf.subject.Subject(pkg_spec) for nevra_obj in subj.get_nevra_possibilities(): tmp_query = nevra_obj.to_query(self.base.sack).available() if tmp_query: return tmp_query.latest() msg = _("No package %s available.") % (pkg_spec) raise dnf.exceptions.PackageNotFoundError(msg)