Source code for sos.policies

from __future__ import with_statement

import os
import re
import platform
import time
import fnmatch
import tempfile
from os import environ

from sos.utilities import (ImporterHelper,
                           import_module,
                           shell_out)
from sos.plugins import IndependentPlugin, ExperimentalPlugin
from sos import _sos as _
from textwrap import fill
from six import print_
from six.moves import input


def import_policy(name):
    """Import the policy module named ``name`` from ``sos.policies``.

    :param name: unqualified module name inside the ``sos.policies``
                 package.
    :returns: the result of ``import_module()`` called with the fully
              qualified module name and ``Policy``, or ``None`` when the
              module cannot be imported.
    """
    try:
        found = import_module("sos.policies.%s" % name, Policy)
    except ImportError:
        # An unimportable policy module is not an error for the caller.
        found = None
    return found
def load(cache={}, sysroot=None):
    """Return the policy object for this system, creating it on first use.

    Every module found by ``ImporterHelper(sos.policies)`` is imported via
    ``import_policy()``; each policy class whose ``check()`` returns true is
    instantiated with ``sysroot`` and stored in ``cache`` (a later match
    replaces an earlier one).  If nothing matches, a ``GenericPolicy`` is
    stored instead.

    :param cache: dictionary used to memoise the result under the key
                  ``'policy'``.  The mutable default is intentional: it is
                  what makes repeated calls return the same object.
    :param sysroot: passed to the selected policy's constructor.
    :returns: the cached policy instance.
    """
    if 'policy' in cache:
        return cache['policy']

    import sos.policies
    helper = ImporterHelper(sos.policies)
    for module in helper.get_modules():
        # import_policy() returns None when the module fails to import;
        # treat that as "no candidates" instead of iterating over None.
        for policy in import_policy(module) or ():
            if policy.check():
                cache['policy'] = policy(sysroot=sysroot)

    if 'policy' not in cache:
        cache['policy'] = GenericPolicy()

    return cache['policy']
class PackageManager(object):
    """Encapsulates a package manager. If you provide a query_command to the
    constructor it should print each package on the system in the following
    format::

        package name|package.version

    You may also subclass this class and provide a get_pkg_list method to
    build the list of packages and versions.
    """

    query_command = None
    verify_command = None
    verify_filter = None
    chroot = None

    def __init__(self, chroot=None, query_command=None,
                 verify_command=None, verify_filter=None):
        """Store the commands used to query and verify packages.

        :param chroot: optional root passed to ``shell_out()`` when the
                       query command is run.
        :param query_command: command printing ``name|version`` lines.
        :param verify_command: command prefix used by
                               ``build_verify_command()``.
        :param verify_filter: iterable of substrings; packages containing
                              any of them are excluded from verification.
        """
        self.packages = {}
        # Falsy values (e.g. "") are normalised to None.
        self.query_command = query_command if query_command else None
        self.verify_command = verify_command if verify_command else None
        self.verify_filter = verify_filter if verify_filter else None
        if chroot:
            self.chroot = chroot

    def all_pkgs_by_name(self, name):
        """
        Return a list of packages that match name (an fnmatch-style
        pattern).
        """
        return fnmatch.filter(self.all_pkgs().keys(), name)

    def all_pkgs_by_name_regex(self, regex_name, flags=0):
        """
        Return a list of packages whose names match the regular expression
        regex_name (anchored at the start, as with ``re.match``).
        """
        reg = re.compile(regex_name, flags)
        return [pkg for pkg in self.all_pkgs().keys() if reg.match(pkg)]

    def pkg_by_name(self, name):
        """
        Return a single package that matches name, or None if there is no
        match.  When several packages match, the last one is returned.
        """
        # Look the matches up once rather than once per use.
        pkgmatches = self.all_pkgs_by_name(name)
        if pkgmatches:
            return pkgmatches[-1]
        return None

    def get_pkg_list(self):
        """Returns a dictionary of packages in the following format::

            {'package_name': {'name': 'package_name',
                              'version': ['major', 'minor', 'version']}}

        The version is stored as the list of its dot-separated components.
        Lines of query output that contain no ``|`` are ignored.
        """
        if self.query_command:
            cmd = self.query_command
            pkg_list = shell_out(
                cmd, timeout=0, chroot=self.chroot
            ).splitlines()

            for pkg in pkg_list:
                if '|' not in pkg:
                    continue
                # Split on the first separator only so that a stray "|"
                # in the version field cannot break the unpacking.
                name, version = pkg.split("|", 1)
                self.packages[name] = {
                    'name': name,
                    'version': version.split(".")
                }

        return self.packages

    def all_pkgs(self):
        """
        Return the dictionary of all packages, querying the package
        manager the first time it is needed.
        """
        if not self.packages:
            self.packages = self.get_pkg_list()
        return self.packages

    def pkg_nvra(self, pkg):
        """Split a ``name-version-release-arch`` string into its parts.

        The last three dash-separated fields are taken as version, release
        and architecture; everything before them is the name.

        :returns: a ``(name, version, release, arch)`` tuple.
        :raises ValueError: if pkg has fewer than three dash-separated
                            fields.
        """
        fields = pkg.split("-")
        version, release, arch = fields[-3:]
        name = "-".join(fields[:-3])
        return (name, version, release, arch)

    def build_verify_command(self, packages):
        """build_verify_command(self, packages) -> str
            Generate a command to verify the list of packages given
            in ``packages`` using the native package manager's
            verification tool.

            The command to be executed is returned as a string that
            may be passed to a command execution routine (for e.g.
            ``sos_get_command_output()``).

            :param packages: a string, or a list of strings giving
                             package names (regular expressions) to be
                             verified.
            :returns: a string containing an executable command
                      that will perform verification of the given
                      packages, or ``None`` if no verify command is
                      configured or no package is left to verify.
            :returntype: str or ``NoneType``
        """
        if not self.verify_command:
            return None

        # A lone string is documented as valid input; wrap it so that it
        # is treated as one pattern instead of a sequence of characters.
        if isinstance(packages, str):
            packages = [packages]

        # verify_filter defaults to None, which means "exclude nothing".
        excluded = self.verify_filter or []

        verify_packages = []
        for pattern in packages:
            for package in self.all_pkgs_by_name_regex(pattern):
                if any(f in package for f in excluded):
                    continue
                verify_packages.append(package)

        # No packages after regex match and filtering?  Do not emit a
        # verify command with an empty package list.
        if not verify_packages:
            return None
        return self.verify_command + " " + " ".join(verify_packages)
class Policy(object):
    """Base class for policies.

    A policy supplies the distribution-specific pieces of a run: the
    preamble text, the executable search path, the archive name and type,
    the checksum algorithm and the set of plugin tagging classes that may
    run.  Subclasses override ``check()`` to report whether they apply to
    the running system.
    """

    msg = _("""\
This command will collect system configuration and diagnostic information \
from this %(distro)s system. An archive containing the collected information \
will be generated in %(tmpdir)s.

For more information on %(vendor)s visit:

  %(vendor_url)s

The generated archive may contain data considered sensitive and its content \
should be reviewed by the originating organization before being passed to \
any third party.

No changes will be made to system configuration.
%(vendor_text)s
""")

    distro = "Unknown"
    vendor = "Unknown"
    vendor_url = "http://www.example.com/"
    vendor_text = ""
    PATH = ""

    _in_container = False
    _host_sysroot = '/'

    def __init__(self, sysroot=None):
        """Subclasses that choose to override this initializer should call
        super() to ensure that they get the required platform bits attached.
        super(SubClass, self).__init__(). Policies that require runtime
        tests to construct PATH must call self.set_exec_path() after
        modifying PATH in their own initializer."""
        self._parse_uname()
        self.report_name = self.hostname
        self.case_id = None
        self.package_manager = PackageManager()
        self._valid_subclasses = []
        self.set_exec_path()
        self._host_sysroot = sysroot

    def get_valid_subclasses(self):
        """Return ``IndependentPlugin`` followed by the policy's own
        tagging classes."""
        return [IndependentPlugin] + self._valid_subclasses

    def set_valid_subclasses(self, subclasses):
        """Replace the policy's own list of tagging classes."""
        self._valid_subclasses = subclasses

    def del_valid_subclasses(self):
        """Delete the policy's own list of tagging classes."""
        del self._valid_subclasses

    valid_subclasses = property(get_valid_subclasses,
                                set_valid_subclasses,
                                del_valid_subclasses,
                                "list of subclasses that this policy can "
                                "process")

    def check(self):
        """
        This function is responsible for determining if the underlying
        system is supported by this policy.
        """
        return False

    def in_container(self):
        """ Returns True if sos is running inside a container environment.
        """
        return self._in_container

    def host_sysroot(self):
        """Return the sysroot value given to the constructor."""
        return self._host_sysroot

    def dist_version(self):
        """
        Return the OS version.  The base implementation returns None.
        """
        pass

    def get_preferred_archive(self):
        """
        Return the class object of the preferred archive format for this
        platform
        """
        from sos.archive import TarFileArchive
        return TarFileArchive

    def get_archive_name(self):
        """
        This function should return the filename of the archive without the
        extension.

        The name has the form ``sosreport-<report_name>[.<case_id>]-<time>``
        where ``<time>`` is the current local time as ``YYYYmmddHHMMSS``.
        """
        # Build the name in a local variable: appending the case id to
        # self.report_name would add it again on every call.
        name = self.report_name
        if self.case_id:
            name += "." + self.case_id
        return "sosreport-%s-%s" % (name, time.strftime("%Y%m%d%H%M%S"))

    def get_tmp_dir(self, opt_tmp_dir):
        """Return opt_tmp_dir, or the system temporary directory when
        opt_tmp_dir is empty or None."""
        if not opt_tmp_dir:
            return tempfile.gettempdir()
        return opt_tmp_dir

    def match_plugin(self, plugin_classes):
        """Choose one class from a non-empty list of plugin classes.

        :returns: the first class that is a subclass of
                  ``self.valid_subclasses[0]`` when more than one class is
                  given, otherwise the first class in the list.
        """
        if len(plugin_classes) > 1:
            for p in plugin_classes:
                # Give preference to the first listed tagging class
                # so that e.g. UbuntuPlugin is chosen over DebianPlugin
                # on an Ubuntu installation.
                # NOTE(review): get_valid_subclasses() prepends
                # IndependentPlugin, so index 0 is IndependentPlugin here
                # rather than the policy's first tagging class - confirm
                # that this is intended.
                if issubclass(p, self.valid_subclasses[0]):
                    return p
        return plugin_classes[0]

    def validate_plugin(self, plugin_class, experimental=False):
        """
        Verifies that the plugin_class should execute under this policy

        :param experimental: when true, ``ExperimentalPlugin`` subclasses
                             are accepted as well.
        """
        valid_subclasses = [IndependentPlugin] + self.valid_subclasses
        if experimental:
            valid_subclasses += [ExperimentalPlugin]
        return any(issubclass(plugin_class, class_) for
                   class_ in valid_subclasses)

    def pre_work(self):
        """
        This function is called prior to collection.
        """
        pass

    def post_work(self):
        """
        This function is called after the sosreport has been generated.
        """
        pass

    def pkg_by_name(self, pkg):
        """Look pkg up through this policy's package manager."""
        return self.package_manager.pkg_by_name(pkg)

    def _parse_uname(self):
        """Record system, hostname, release, SMP flag and machine from
        ``platform.uname()``."""
        (system, node, release,
         version, machine, processor) = platform.uname()
        self.system = system
        self.hostname = node
        self.release = release
        # The version string may have fewer than two tokens (or be
        # empty); treat that as "not SMP" instead of raising IndexError.
        tokens = version.split()
        self.smp = len(tokens) > 1 and tokens[1] == "SMP"
        self.machine = machine

    def set_commons(self, commons):
        """Store the shared ``commons`` dictionary on the policy."""
        self.commons = commons

    def _set_PATH(self, path):
        """Set the PATH environment variable of this process."""
        environ['PATH'] = path

    def set_exec_path(self):
        """Export this policy's ``PATH`` attribute to the environment."""
        self._set_PATH(self.PATH)

    def is_root(self):
        """This method should return true if the user calling the script is
        considered to be a superuser"""
        return (os.getuid() == 0)

    def get_preferred_hash_name(self):
        """Returns the string name of the hashlib-supported checksum
        algorithm to use"""
        return "md5"

    def display_results(self, archive, directory, checksum):
        """Tell the user where the report was written.

        :param archive: path of the generated archive, if any.
        :param directory: path of the build tree, reported when no archive
                          was produced.
        :param checksum: checksum string to display, if any.
        :returns: ``False`` when neither archive nor directory is given,
                  otherwise ``None``.
        """
        # Display results is called from the tail of SoSReport.final_work()
        #
        # Logging is already shutdown and all terminal output must use the
        # print() call.

        # make sure a report exists
        if not archive and not directory:
            return False

        self._print()

        if archive:
            self._print(_("Your sosreport has been generated and saved "
                          "in:\n %s") % archive)
        else:
            # Interpolate after translating so that the message catalogue
            # is looked up with the untouched format string.
            self._print(_("sosreport build tree is located at : %s")
                        % directory)

        self._print()
        if checksum:
            self._print(_("The checksum is: ") + checksum)
            self._print()
        self._print(_("Please send this file to your support "
                      "representative."))
        self._print()

    def _print(self, msg=None):
        """A wrapper around print that only prints if we are not running in
        quiet mode"""
        if not self.commons['cmdlineopts'].quiet:
            if msg:
                print_(msg)
            else:
                print_()

    def get_msg(self):
        """This method is used to prepare the preamble text to display to
        the user in non-batch mode. If your policy sets self.distro that
        text will be substituted accordingly. You can also override this
        method to do something more complicated."""
        width = 72
        _msg = self.msg % {'distro': self.distro, 'vendor': self.vendor,
                           'vendor_url': self.vendor_url,
                           'vendor_text': self.vendor_text,
                           'tmpdir': self.commons['tmpdir']}
        # Wrap each line separately so that paragraph breaks survive.
        return "".join(fill(line, width, replace_whitespace=False) + '\n'
                       for line in _msg.splitlines())
class GenericPolicy(Policy):
    """This Policy will be returned if no other policy can be loaded. This
    should allow for IndependentPlugins to be executed on any system"""

    def get_msg(self):
        """Return the preamble text with the system name from
        ``platform.uname()`` used as the distribution name.

        Unlike ``Policy.get_msg()`` the text is not re-wrapped.
        """
        # The template references all of these keys; supplying only
        # 'distro' makes the % operator raise KeyError.
        return self.msg % {'distro': self.system,
                           'vendor': self.vendor,
                           'vendor_url': self.vendor_url,
                           'vendor_text': self.vendor_text,
                           'tmpdir': self.commons['tmpdir']}
class LinuxPolicy(Policy):
    """This policy is meant to be an abstract base class that provides
    common implementations used in Linux distros"""

    distro = "Linux"
    vendor = "None"
    PATH = "/bin:/sbin:/usr/bin:/usr/sbin"

    # Cached result of get_preferred_hash_name().
    _preferred_hash_name = None

    def __init__(self, sysroot=None):
        super(LinuxPolicy, self).__init__(sysroot=sysroot)

    def get_preferred_hash_name(self):
        """Return ``"sha256"`` if ``/proc/sys/crypto/fips_enabled``
        contains a ``1`` and ``"md5"`` otherwise (including when the file
        cannot be read).  The answer is computed once and cached."""
        if self._preferred_hash_name:
            return self._preferred_hash_name

        checksum = "md5"
        try:
            # The context manager closes the file even if read() fails.
            with open("/proc/sys/crypto/fips_enabled", "r") as fp:
                if "1" in fp.read():
                    checksum = "sha256"
        except (IOError, OSError):
            # File missing or unreadable: keep the md5 default.
            pass

        self._preferred_hash_name = checksum
        return checksum

    def default_runlevel(self):
        """Return the ``initdefault`` runlevel from ``/etc/inittab``, or 3
        if the file cannot be read or has no such entry."""
        pattern = r"id:(\d{1}):initdefault:"
        try:
            with open("/etc/inittab") as fp:
                text = fp.read()
            return int(re.findall(pattern, text)[0])
        except (IOError, OSError, IndexError, ValueError):
            # Best effort: unreadable file or no initdefault line.
            return 3

    def kernel_version(self):
        """Return the kernel release reported by ``platform.uname()``."""
        return self.release

    def host_name(self):
        """Return the node name reported by ``platform.uname()``."""
        return self.hostname

    def is_kernel_smp(self):
        """Return the SMP flag derived from the uname version string."""
        return self.smp

    def get_arch(self):
        """Return the machine type reported by ``platform.uname()``."""
        return self.machine

    def get_local_name(self):
        """Returns the name used in the pre_work step"""
        return self.host_name()

    def sanitize_report_name(self, report_name):
        """Strip every character other than ASCII letters, digits, ``.``
        and ``-`` from report_name."""
        return re.sub(r"[^-a-zA-Z.0-9]", "", report_name)

    def sanitize_case_id(self, case_id):
        """Strip every character other than ASCII letters, digits, ``.``,
        ``,`` and ``-`` from case_id."""
        return re.sub(r"[^-a-z,A-Z.0-9]", "", case_id)

    def pre_work(self):
        """Determine the report name and case id before gathering begins.

        Unless running in batch or quiet mode the user is prompted for
        both values.  Values given on the command line take precedence
        over anything entered at the prompt, and both values are
        sanitized before use.  An empty report name becomes ``"default"``.
        """
        cmdline_opts = self.commons['cmdlineopts']
        customer_name = cmdline_opts.customer_name
        localname = customer_name if customer_name else self.get_local_name()
        caseid = cmdline_opts.case_id if cmdline_opts.case_id else ""

        if not cmdline_opts.batch and not cmdline_opts.quiet:
            try:
                self.report_name = input(_("Please enter your first initial "
                                           "and last name [%s]: ") %
                                         localname)

                self.case_id = input(_("Please enter the case id "
                                       "that you are generating this "
                                       "report for [%s]: ") % caseid)
                self._print()
            except KeyboardInterrupt:
                # Finish the prompt line before propagating the interrupt.
                self._print()
                raise

        if not self.report_name:
            self.report_name = localname

        # Command-line values override whatever was typed at the prompt.
        if customer_name:
            self.report_name = customer_name

        if cmdline_opts.case_id:
            self.case_id = cmdline_opts.case_id

        self.report_name = self.sanitize_report_name(self.report_name)
        if self.case_id:
            self.case_id = self.sanitize_case_id(self.case_id)

        # Sanitizing may have removed every character.
        if self.report_name == "":
            self.report_name = "default"
# vim: set et ts=4 sw=4 :