__init__.py000064400000000000147205126350006650 0ustar00cacheutils.py000064400000002572147205126350007255 0ustar00# -*- coding: utf-8 -*- from functools import lru_cache, wraps from datetime import datetime, timedelta DEFAULT_MAXSIZE = 128 def memoize(obj): """A general-use memoizer decorator for class, object, function; supports kwargs""" cache = obj.cache = {} @wraps(obj) def memoizer(*args, **kwargs): """Actual implementation""" key = str(args) + str(kwargs) if key not in cache: cache[key] = obj(*args, **kwargs) return cache[key] return memoizer def timed_lru_cache(seconds: int, maxsize: int = DEFAULT_MAXSIZE): """A decorator that wraps lru_cache and allows the setting of a lifetime in seconds, after which the cache will expire :param seconds: The number of seconds after which the cache will expire :param maxsize: The maximum number of entries in the cache before it will start dropping old entries """ def wrapper_cache(func): func = lru_cache(maxsize=maxsize)(func) func.lifetime = timedelta(seconds=seconds) func.expiration = datetime.utcnow() + func.lifetime @wraps(func) def wrapped_func(*args, **kwargs): if datetime.utcnow() >= func.expiration: func.cache_clear() func.expiration = datetime.utcnow() + func.lifetime return func(*args, **kwargs) return wrapped_func return wrapper_cache config.py000064400000007155147205126350006400 0ustar00# -*- coding: utf-8 -*- from configparser import ConfigParser from typing import Any, Callable, Dict, List, Optional # pylint: disable=W0611 class Config: """Base configuration object.""" def get(self, key: str, default: Any = None) -> Any: """ Get the config option as a string :param key: config option name :param default: default value if no value exists in the config :return: option value """ raise NotImplementedError(__name__ + '.get()') def get_bool(self, key: str, default: Optional[bool] = None) -> bool: """ Get the config option as a boolean :param key: config option name :param default: default value if no value exists in the config :return: option value """ return bool(self.get(key, default)) def get_int(self, key: str, default: Optional[int] = None) -> int: """ Get the config option as an integer :param key: config option name :param default: default value if no value exists in the config :return: option value """ return int(self.get(key, default)) def get_float(self, key: str, default: Optional[float] = None) -> float: """ Get the config option as a float :param key: config option name :param default: default value if no value exists in the config :return: option value """ return float(self.get(key, default)) class FileConfig(Config): """Config class that loads configuration options from the locally-cached ZooKeeper config""" DEFAULT_FILENAME = '/var/cache/hfs/config.ini' # type: str default_section = 'config-default' # type: str prefixes = [] # type: List[str] def __init__( self, service_prefix: Optional[str] = None, filename: Optional[str] = None, defaults: Optional[Dict[str, Any]] = None) -> None: if filename is None: filename = self.DEFAULT_FILENAME self.config = ConfigParser(default_section=self.default_section, interpolation=None) self.config.read(filename) self.prefixes = [] if service_prefix: prefix_parts = service_prefix.split('.') while prefix_parts: self.prefixes.append('.'.join(prefix_parts) + '.') prefix_parts.pop() self.prefixes.append('') self.defaults = defaults def __getattr__(self, key: str) -> Any: """ Get a config key value as a direct attribute of this class. :param key: The config key name :return: The config key value, if it exists, or None """ return self.get(key) def _get(self, getfunc: Callable, key: str, default: Any = None) -> Any: if default is None and self.defaults: default = self.defaults.get(key) for prefix in self.prefixes: prefixed_key = prefix + key val = getfunc(self.default_section, prefixed_key, fallback=None) if val: break else: val = default return val def get(self, key: str, default: Any = None) -> Any: return self._get(self.config.get, key, default) def get_bool(self, key: str, default: Optional[bool] = None) -> bool: return self._get(self.config.getboolean, key, default) def get_int(self, key: str, default: Optional[int] = None) -> int: return self._get(self.config.getint, key, default) def get_float(self, key: str, default: Optional[float] = None) -> float: return self._get(self.config.getfloat, key, default) constants.py000064400000002427147205126350007144 0ustar00# -*- coding: utf-8 -*- """This module holds constants needed for primordial.""" try: import fcntl except ImportError: pass # Windows import ipaddress import struct import socket CANONICAL_TIMESTRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' "Standard format for time strings." CANONICAL_TIMESTRING_FORMAT_NO_ZULU = '%Y-%m-%dT%H:%M:%S.%f' "Standard format for time strings without a trailing Z." CANONICAL_TIMESTRING_FORMAT_NO_MICROSECONDS = '%Y-%m-%dT%H:%M:%SZ' "Strandard format for time strings without microseconds." def getIP() -> str: """Get the IP address of this machine.""" try: ips = [ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")] if ips: for ip in ips: if ipaddress.ip_address(str(ip)).is_private: return ip return ips[0] except socket.gaierror: pass try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ipstr = struct.pack('256s'.encode('ascii'), 'eth0'.encode('ascii')) return socket.inet_ntoa( fcntl.ioctl(s.fileno(), 0x8915, ipstr)[20:24]) except (IOError, RuntimeError, socket.gaierror): return '127.0.0.1' IP = getIP() "The IP address of this machine on which this is running." context.py000064400000012300147205126350006603 0ustar00import threading from contextlib import contextmanager from typing import Any, Optional, Mapping class Context: ''' Context instances can be used to hold state and values during the execution life cycle of a python application without needing to change the existing API of various interacting classes. State/values can be updated in one part of the application and be visible in another part of the application without requiring to pass this state thru the various methods of the call stack. Context also supports maintenance of global and thread specific state which can be very useful in a multi-threaded applications like Flask/Falcon based web applications or the NES Journal reader. State stored at the global level is visible in every concurrently executed thread, while thread specific state is isolated to the corresponding thread of execution. This can useful again in a multi-threaded application like a web-app where each incoming request is processed by a separate thread and things like request headers, authentication user context is thread specific and isolated to the thread handling a particular request. Example usage: -- In the main thread of an application's start up code we might want to inject some global state like so. # In some start-up file called app.py from primordial.context import Context MY_APP_CTX = Context() # Instantiate some shared object jwt_fetcher = JWTFetcher(some_config) MY_APP_CTX.set_global('jwt_fetcher', jwt_fetcher) # In a thread that's handling a particular HTTP request # handle_req.py from app import MY_APP_CTX MY_APP_CTX.user = User() MY_APP_CTX.token = copy_token_from_header() # In a third file somewhere down the line of request processing # some_file_called_by_controller.py from app import MY_APP_CTX def some_func(): # All of these are magically available. # some_func's function signature didn't require to be changed MY_APP_CTX.jwt_fetcher.get() MY_APP_CTX.user.name == 'jDoe' MY_APP_CTX.token.is_valid() ''' def __init__(self): self._global = {} self._local = threading.local() def __getattr__(self, name: str) -> Any: try: return getattr(self._local, name) except AttributeError: if name in self._global: return self._global[name] raise def __setattr__(self, name: str, value: Any): if name in ('_global', '_local'): return super().__setattr__(name, value) setattr(self._local, name, value) def __delattr__(self, name: str): try: delattr(self._local, name) except AttributeError: if name not in self._global: raise del self._global[name] def set_global(self, name: str, value: Any): self._global[name] = value def unset_global(self, name: str): self._global.pop(name) CTX = Context() @contextmanager def make_context(local_vals: Optional[Mapping[str, Any]] = None, global_vals: Optional[Mapping[str, Any]] = None, ctx: Optional[Context] = None): ''' Python context-manager for managing the life-cycle of state stored in a context. This context manager allows for state to be both stored in a context and also cleans up this state when the context-manager is exited. Usage: # In some python module file1.py from primordial import Context ctx = Context() # In some python module file2.py from threading import Thread from primordial import make_context from file1 import ctx from file3 import fn3 def fn2(): global_vals = {'v1': 'abc', v2: 'def'} # Set some global values with make_context(global_vals=global_value, ctx): # Kick of fn3 in a new thread t1 = Thread(target=fn3, args=[]) t1.start() t1.join() fn2() # In some python module file3.py from primordial import make_context from file1 import ctx from file4 import fn4 def fn3(): # v2 will shadow the value that was set globally local_vals = {'v3': 'ghi', v2: 'jkl'} # Set some thread specific values # Once this function returns, ctx.v3 and ctx.v2 are not available for access with make_context(local_vals=local_value, ctx): fn4() # We can still access the globally set state here even after the above context manager # has exited. ctx.v1 ctx.v2 # The globally set v2 # In some python module file3.py from file1 import ctx def fn4(): # All of the accesses are valid ctx.v1 ctx.v2 # This accesses the local thread specific v2 ctx.v3 ''' ctx = ctx if ctx else CTX local_vals = local_vals if local_vals else {} global_vals = global_vals if global_vals else {} for k, v in local_vals.items(): setattr(ctx, k, v) for k, v in global_vals.items(): ctx.set_global(k, v) try: yield ctx finally: for k in local_vals: delattr(ctx, k) for k in global_vals: ctx.unset_global(k) encryptor.py000064400000026247147205126350007163 0ustar00# -*- coding: utf-8 -*- import hashlib import json import os from base64 import b64decode, b64encode from pathlib import Path from typing import AnyStr, Optional, Union, cast from cryptography import x509 from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP from oscrypto.symmetric import aes_cbc_pkcs7_encrypt, aes_cbc_pkcs7_decrypt def _make_bytes(data: AnyStr) -> bytes: """ Convert a Python `str` object to a `bytes` object. If the parameter is already a `bytes` object, return it unmodified. :param data: The object to be converted :return: The converted object, or the original object if it was not a `str` object """ if isinstance(data, str): data = data.encode("utf-8") # type: ignore return cast(bytes, data) def load_key_content(key_path: Union[Path, str]) -> bytes: """ Convenience function to load the content of a key or cert file and return its contents. :param key_path: Path to the key/cert file to be loaded :return: The file contents as a bytes object :raises: ValueError if the key_path parameter is None of doesn't point to an existing file """ if key_path is None: raise ValueError("key_path parameter cannot be None") if isinstance(key_path, str): key_path = Path(key_path) if not key_path.is_file(): raise ValueError("key path '{key_path}' does not exist".format(key_path=key_path)) return key_path.read_bytes() class Encryptor: __iv_size = 16 # iv -> Initialization Vector @classmethod def decrypt(cls, encrypted_data: AnyStr, secret_key: AnyStr) -> str: """ Decrypt encrypted data using the PKCS7 symmetric decryption algorithm :param encrypted_data: Base-64 encoded byte array containing encrypted data, which is a combination of the salt and the actual data :param secret_key: Secret value used to generate an encryption key :return: Decrypted, plain text value """ decoded_data = b64decode(_make_bytes(encrypted_data)) iv = decoded_data[:cls.__iv_size] data = decoded_data[cls.__iv_size:] secret_key_bytes = _make_bytes(secret_key) hashed_secret_key = hashlib.sha256(secret_key_bytes).digest() return aes_cbc_pkcs7_decrypt(hashed_secret_key, data, iv).decode("utf-8") @classmethod def encrypt(cls, unencrypted_data: AnyStr, secret_key: AnyStr) -> bytes: """ Encrypts data using the PKCS7 symmetric encryption algorithm :param unencrypted_data: Data to be encrypted :param secret_key: Secret value used to generate an encryption key :return: Base-64 encoded byte array containing encrypted value """ iv_bytes = os.urandom(cls.__iv_size) plain_text_bytes = _make_bytes(unencrypted_data) secret_key_bytes = _make_bytes(secret_key) hashed_secret_key = hashlib.sha256(secret_key_bytes).digest() iv, encrypted_data = aes_cbc_pkcs7_encrypt(hashed_secret_key, plain_text_bytes, iv_bytes) return b64encode(iv + encrypted_data) class SmallPayloadEncryptor: """ Utility class that provides methods to encrypt and decrypt small-ish payloads via an asymmetric (public/private key) algorithm. The definition of "small" depends on the size of the encryption key and the type of padding algorithm used. For example, given a key size of 4096 bytes and the type of padding used by this set class, the maximum size of a payload that can be encrypted is 447 bytes. """ __hash_algorithm = hashes.SHA256() __padding = OAEP(mgf=MGF1(algorithm=__hash_algorithm), algorithm=__hash_algorithm, label=None) @classmethod def decrypt(cls, encrypted_data: AnyStr, decryption_key_content: bytes) -> Optional[str]: """ Decrypts data encrypted by the `encrypt()` method in this class. :param encrypted_data: The data to be decrypted :param decryption_key_content: The content of the OpenSSL private key file corresponding to the public cert used to encrypt the data :return: The decrypted data :raises: ValueError if decryption_key_content` is None """ if encrypted_data is None: return None if decryption_key_content is None: raise ValueError("decryption_key_content can't be None") encrypted_data = b64decode(_make_bytes(encrypted_data)) # type: ignore decryption_key = serialization.load_pem_private_key(decryption_key_content, password=None) return decryption_key.decrypt(encrypted_data, cls.__padding).decode("utf-8") # type: ignore @classmethod def encrypt(cls, unencrypted_data: AnyStr, encryption_key_content: bytes) -> Optional[bytes]: """ Encrypts any small payload using an RSA asymmetric key algorithm. The maximum size of the payload depends on the size of the encryption key. For example, given a key size of 4096 bits, the maximum size of the payload that can be encrypted is 447 bytes. :param unencrypted_data: The data to be encrypted :param encryption_key_content: The content of the OpenSSL X509 public certificate that will be used to encrypt the data :return: The base64 encoded and encrypted data as a bytes object :raises: ValueError if the payload size is too large :raises: ValueError if `encryption_key_content` is None """ if unencrypted_data is None: return None if encryption_key_content is None: raise ValueError("encryption_key_content can't be None") unencrypted_data = _make_bytes(unencrypted_data) # type: ignore encryption_cert = x509.load_pem_x509_certificate(encryption_key_content) encryption_key = encryption_cert.public_key() return b64encode(encryption_key.encrypt(unencrypted_data, cls.__padding)) # type: ignore class LargePayloadEncryptor: """ This class provides methods to encrypt and decrypt large payloads via the Fernet symmetric encryption algorithm. The `encrypt()` method automatically generates a key for encryption. That key is then encrypted using the asymmetric public/private key algorithm of the `SmallPayloadEncrypter.encrypt()` method and is included in the resulting byte stream returned by this classes' `encrypt()` method. The "receiving" endpoint must then extract the Fernet key from the byte stream and use the corresponding private key of public/private key pair to decrypt the Fernet key. The decrypted Fernet key can then be used to decrypt the remainder of the payload. The only known restriction on payload size is that the payload must fit into memory. """ @classmethod def decrypt(cls, encrypted_data: AnyStr, decryption_key_content: bytes) -> Optional[str]: """ Decrypts data encrypted by the `encrypt()` method of this class. The decryption algorithm is 1. Decode the base-64 representation of the JSON object 2. Load the JSON into a Python dictionary 3. Extract the encrypted Fernet key from the JSON object and decrypt it using our asymmetric decryption algorithm, i.e., the same algorithm we use to decrypt passwords. 4. Extract the encrypted data from the JSON object and decrypt it using the Fernet decryption algorithm. :param encrypted_data: The data to be decrypted :param decryption_key_content: The content of the OpenSSL private key file corresponding to the public cert used to encrypt the data :return: The decrypted data as a `str` object :raises: ValueError if the decryption key is missing from the `encrypted_data` payload :raises: ValueError if decryption_key_content` is None """ if encrypted_data is None: return None if decryption_key_content is None: raise ValueError("decryption_key_content can't be None") encrypted_data = _make_bytes(encrypted_data) # type: ignore json_object = json.loads(b64decode(encrypted_data).decode("utf-8")) encrypted_token_key = json_object.get("key") if encrypted_token_key is None: raise ValueError("token decryption key is missing from the payload") encrypted_token_key = _make_bytes(encrypted_token_key) # Machinations to make mypy happy decrypted_token_key = cast(str, SmallPayloadEncryptor.decrypt(encrypted_token_key, decryption_key_content)) decrypted_token_key = _make_bytes(decrypted_token_key) # type: ignore fernet_encryptor = Fernet(cast(bytes, decrypted_token_key)) return fernet_encryptor.decrypt(json_object["token"].encode("utf-8")).decode("utf-8") @classmethod def encrypt(cls, unencrypted_data: AnyStr, encryption_key_content: bytes) -> Optional[bytes]: """ Encrypts arbitrary data. This method uses a symmetric encryption algorithm (Fernet) to encrypt the data. This algorithm is capable of encrypting much larger payloads than asymmetric algorithms like RSA, which are limited by the key size and padding, if used. The encryption process is 1. Generate a random encryption key 2. Use that key to encrypt the original data. 3. Encrypt the key generated in step 1 by our asymmetric encryption algorithm, i.e., the same algorithm we use to encrypt passwords. This step may or may not use the same public/private keys we use for password encryption. 4. Create a Python dictionary with two entries: key: the encrypted Fernet key token: the data that was encrypted with the Fernet key Both the dictionary keys and values must be of type `str` to be JSON serializable. 5. Serialize the dictionary as a JSON string 6. Return a base-64 encoded representation of the JSON. :param unencrypted_data: The data to be encrypted :param encryption_key_content: The content of the OpenSSL X509 public certificate that will be used to encrypt the data :return: The encrypted key/text pair as a base-64 encoded `bytes` object :raises: ValueError if `encryption_key_content` is None """ if unencrypted_data is None: return None if encryption_key_content is None: raise ValueError("encryption_key_content can't be None") if isinstance(unencrypted_data, str): unencrypted_data = unencrypted_data.encode("utf-8") # type: ignore key = Fernet.generate_key() fernet_encryptor = Fernet(key) # Keys and values must be type `str`, not `bytes`, to be JSON serializable. result = { "key": SmallPayloadEncryptor.encrypt(key, encryption_key_content).decode("utf-8"), # type: ignore "token": fernet_encryptor.encrypt(cast(bytes, unencrypted_data)).decode("utf-8"), } return b64encode(json.dumps(result).encode("utf-8")) fileutils.py000064400000016003147205126350007123 0ustar00# -*- coding: utf-8 -*- import datetime import logging import os import os.path from typing import Any, Optional, Tuple try: from pwd import getpwnam except ImportError: pass # Windows try: from grp import getgrnam except ImportError: pass # Windows from shutil import rmtree import tempfile try: import win32security import ntsecuritycon as con except ImportError: pass # Not-Windows. from primordial.settings import get_file_group, get_file_user DELAY_INCREASE_SECS = 1.0 MAX_RETRIES = 3 LOGGER = logging.getLogger(__name__) def datetime_to_epoch_seconds(in_time: datetime.datetime) -> int: """Return integer seconds since epoch. :param in_time: Datetime :returns: Integer seconds since epoch """ return int((in_time - datetime.datetime(1970, 1, 1)).total_seconds()) def touch(filename: str, times: Optional[Tuple[int, int]] = None) -> None: """Set the atime and mtime of a file. .. note:: Linux (MAYBE Windows) ONLY. This does not work on Mac OSX. :param filename: The file to touch :param times: a two-tuple of (atime, mtime) where these are integer seconds since epoch; see os.utimes :raises OSError: if file doesn't exist """ # stackoverflow.com/questions/1158076/implement-touch-using-python with open(filename, 'a'): # pylint: disable=unspecified-encoding os.utime(filename, times) def rm_path(path: str) -> None: """Simple rmtree wrapper :param path: path to recursively remove :raises: IOError if the path can't be removed """ try: rmtree(path) except OSError as e: raise IOError("Failed to remove %s" % path) from e def rm_file(filename: str) -> None: """Simple remove wrapper :param filename: filename to remove :raises: IOError if the file can't be removed """ try: os.remove(filename) except OSError as e: raise IOError("Failed to remove %s" % filename) from e def _write_file(filename: str, data: Any) -> None: """A utility helper for the atomic file utils""" outfile_handle, tmpfilename = tempfile.mkstemp(dir=os.path.dirname(filename)) outfile = os.fdopen(outfile_handle, 'wb') datalen = len(data) # write to a tmpfile then atomically move try: outfile.write(str(data).encode('utf8')) except OSError as e: raise IOError("Failed to write data to path %s (%s)" % (filename, e)) from e except Exception as e: # pylint: disable=broad-except raise IOError("Unexpectedly failed to write data to path %s (%s)" % (filename, e)) from e finally: outfile.close() # verify written data len stat_struct = os.stat(tmpfilename) if stat_struct.st_size != datalen: raise IOError("Failed to write correct number of bytes (%s, %s, %s)" % (tmpfilename, stat_struct.st_size, datalen)) try: # last man wins os.rename(tmpfilename, filename) except OSError as e: raise IOError("Failed to write data to path %s (%s)" % (filename, e)) from e def win_atomically_write_file(filename: str, data: Any, add_all_access_user: Optional[str] = None) -> None: """Safely/atomically write a file on Windows. Write a file in such a way that the data is guaranteed to be complete and not intermixed with that of another process. This implementation will NOT clobber the target file if it exists already; instead, it will fail immediately. This is a windows-specific implementation that has Windows locking semantics for open file handles and rather than changing the file's ownership, setting add_all_access_user will cause the file to have a read-write ACE added to the ACL for the directory/file for the target user. Note that in both this function and atomically_write_file, the fact that the file has locking or not at the OS level is NOT being relied on in e.g. archon's file utilities for leasing or exclusive access. Instead, this implementation uses tmpfile to guarantee uniqueness of the source of information, and then a simple atomic mv to replace the destination. Windows, unlike Linux, could support a true OS level file locking layer for exclusive access, and so a future Windows specific file utility would be feasible for true single-host, global locking for use in e.g. archon. From https://stackoverflow.com/questions/12168110/setting-folder-permissions-in-windows-using-python :param filename: filename to write :param data: data to put in file :param add_all_access_user: the user if any to add a FILE_ALL_ACCESS ACE for :raises pywintypes.error: on failure to modify the file's ACL; pywintypes.error on user not found; pywintypes.error on file not found """ if os.path.exists(filename): # _write_file() will not replace files on Windows when file already exists rm_file(filename) _write_file(filename, data) # if there is an ACE requested, set it if add_all_access_user is not None: userx, _, _ = win32security.LookupAccountName("", add_all_access_user) file_sd = win32security.GetFileSecurity(filename, win32security.DACL_SECURITY_INFORMATION) dacl = file_sd.GetSecurityDescriptorDacl() dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, userx) file_sd.SetSecurityDescriptorDacl(1, dacl, 0) win32security.SetFileSecurity(filename, win32security.DACL_SECURITY_INFORMATION, file_sd) def atomically_write_file( filename: str, data: Any, file_owner: Optional[str] = None, file_group: Optional[str] = None) -> None: """Safely/atomically write a file. Write to a tmpfile then do os.rename(); Note that this is for Unix systems only, as the implicit contract is that the destination might exist and will be overwritten. This contract is violated on Windows. Also even on Unix, the file system of the source and dest must be the same; since we're in the same directory for this move, that constraint is satisfied. We use tempfile to make it so that we don't collide on the source tmp write -- different processes or threads will not select the same tmpfile. Last man in wins for the move -- there's no general way to sync this across processes or hosts using a filesystem store. :param filename: filename to ultimately write :param data: data to put in file :param file_owner: If set, will change file to this owner if permission is available :param file_group: If set, will change file to this group if permission is available :raises: IOError on failure; OSError on permission change without appropriate permissions """ _write_file(filename, data) file_uid = -1 file_owner = file_owner or get_file_user() if file_owner is not None: file_uid = getpwnam(file_owner).pw_uid file_group_gid = -1 file_group = file_group or get_file_group() if file_group is not None: file_group_gid = getgrnam(file_group).gr_gid os.chown(filename, file_uid, file_group_gid) flow.py000064400000011256147205126350006077 0ustar00# -*- coding: utf-8 -*- import logging from time import sleep from typing import Any, Callable, Tuple, Type, Union DELAY_INCREASE_SECS = 1.0 MAX_RETRIES = 3 LOGGER = logging.getLogger(__name__) class RetryError(Exception): """A special type which signals the failure of a retry loop.""" def wait_for_true(test_function: Callable, max_tests: int = 10, sleep_secs: float = 0.5, count_trues: int = 3) -> Any: """Attempt test_function over and over, waiting for a true value. Try a maximum of max_tests times. Sleep sleep_secs in between each test. Receive count_trues before moving on. wait_for_true is designed specifically to handle a design principle behind AWS or other clustered services: that you might succeed when making a request once and then fail very soon after. For example, when you make a REST API call against S3 to see if a bucket exists, the fact that you get a "True" response does not guarantee that you will get that same response to a request made very soon after. In other words, it's for cases in which the goal is to produce a wait time for an eventually consistent external service to resolve your request. That's why wait_for_true lets you specify a threshold of how many trues you want to get before you're satisfied. :param test_function: a function to run whose result we will test for truthiness :param max_tests: limit to how many times we'll try test_function :param sleep_secs: how long to wait between tests :param count_trues: how many true results we need until we're totally true; this is useful with e.g. cluster tests, where we want a quorum of true answers before we're happy that the entire cluster is consistent (e.g. s3 put) :returns: the return value of the test function, the number of time it tried, and how many true results it found :raises RetryError: if the function "never" returned sufficiently many trues """ itercount = 0 ret = None trues = 0 if count_trues > max_tests: LOGGER.warning("count_trues > max_tests, bumping max_tests to count_trues") max_tests = count_trues while itercount < max_tests: ret = test_function() if ret is True: trues += 1 if trues >= count_trues: break itercount += 1 sleep(sleep_secs) if trues < count_trues: raise RetryError("wait_for_true never succeeded %s times for function %s" % (count_trues, test_function)) return ret, itercount, trues def retry(action_function: Callable, sleep_secs: float = 1.0, backoff: Callable = lambda x: x + DELAY_INCREASE_SECS, max_attempts: int = MAX_RETRIES, exceptions_to_ignore: Union[Type[Exception], Tuple[Type[Exception]]] = Exception) -> Any: """Retry an e.g. network connection until it doesn't throw an exception of any kind. This is for single-success retry cases (e.g. wow, that TCP connection didn't get established because of high latency, let's dial back and try again in a bit). The action_function is expected to be of a sort that any expected exceptions are caught, as this will retry under /any/ exception. The return value, if any, from the action_function is returned by retry on success. sleep_secs is the number of seconds to sleep between failures, and backoff defaults to adding DELAY_INCREASE_SECS to the prior value each time. max_attempts is the limit of the number of retries in any case. On failure to complete the action_function without exception, raises RetryError. :param action_function: a function to run that we will retry if it raises any exception :param sleep_secs: how long to wait between tries :param backoff: a function that will expand the sleep duration based on what iteration we are on :param max_attempts: limit to how many times we'll try action_function :param exceptions_to_ignore: exception type or types to ignore; by default, ignores all Exception-derived exceptions :returns: the return value of the test function, the number of time it tried, and how many true results it found :raises RetryError: if the function "never" returned without an exception """ attempts = 0 while attempts < max_attempts: try: return action_function() except exceptions_to_ignore as e: # pylint: disable=W0703 attempts += 1 if attempts >= max_attempts: raise RetryError("Failure to retry %s: %s" % (action_function, str(e))) from e LOGGER.debug("retry: pausing %s secs before retrying %s (%s)", sleep_secs, action_function, str(e)) sleep(sleep_secs) sleep_secs = backoff(sleep_secs) jsonutils.py000064400000006367147205126350007171 0ustar00# -*- coding: utf-8 -*- import datetime import json from json import JSONEncoder, JSONDecoder import logging import re from importlib import import_module from typing import Any, Callable, Dict from jsonschema import validate as json_validate from jsonschema import exceptions as json_exceptions from primordial.constants import CANONICAL_TIMESTRING_FORMAT DELAY_INCREASE_SECS = 1.0 MAX_RETRIES = 3 LOGGER = logging.getLogger(__name__) def validate_swagger(json_dict: Dict[Any, Any], schema_dict: Dict[Any, Any]) -> bool: """Test that json_dict dictionary conforms to the swagger schema provided. :param json_dict: A dictionary of values :param schema_dict: A dict representation of swagger schema :returns: True or False :raises AssertionError: if either is None """ assert json_dict is not None assert schema_dict is not None try: json_validate(json_dict, schema_dict) return True except json_exceptions.ValidationError: return False def pretty_json_str(json_dict: Dict[Any, Any]) -> str: """Return a pretty-formatted string containing the values in json_dict. :param json_dict: A dictionary or other object that is serializable as a json string :returns: a (unicode) string representation with a default format of sorted keys and indent of 4 spaces :raises TypeError: if the thing in question cannot be json serialized """ return json.dumps(json_dict, cls=DateTimeEncoder, sort_keys=True, separators=(',', ':')) def func_to_str(func: Callable) -> str: """Create a string for a python function in a module. Class and object methods not supported currently. :param func: a python function :returns: a string representing this function """ return "%s.%s" % (func.__module__, func.__name__) def str_to_func(func_str: str) -> Callable: """Use importlib to turn a string into a function. At the moment, this is limited to module functions and not classes. :param func_str: the string representation of a function, e.g. ``tests.test_workflow.ex_tst_func`` :returns: a python function :raises ImportError: if the function does not exist """ funcnotation = func_str.split('.') symbol = funcnotation.pop() module = import_module('.'.join(funcnotation)) return getattr(module, symbol) class DateTimeEncoder(JSONEncoder): """Uses our canonical format.""" def default(self, obj: Any) -> Any: # pylint: disable=E0202,W0221,arguments-renamed if isinstance(obj, datetime.datetime): ret = obj.strftime(CANONICAL_TIMESTRING_FORMAT) else: ret = JSONEncoder.default(self, obj) return ret # regex for CANONICAL_TIMESTRING_FORMAT DATETIME_REGEX = re.compile(r"^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}).(\d{6})Z$") class DateTimeDecoder(JSONDecoder): """Uses our canonical format.""" def __init__(self, *args, **kwargs): super().__init__(*args, object_hook=self.dict_to_object, **kwargs) def dict_to_object(self, d: Dict[Any, Any]) -> Any: # pylint: disable=C0103 for key, value in d.items(): if isinstance(value, str) and DATETIME_REGEX.match(value): d[key] = datetime.datetime.strptime(value, CANONICAL_TIMESTRING_FORMAT) return d py.typed000064400000000000147205126350006236 0ustar00settings.py000064400000002345147205126350006767 0ustar00# -*- coding: utf-8 -*- import os # user and group for atomic file (linux) writes (moves); can be set to "nobody" or any other linux file user name FILE_USER = None FILE_GROUP = None def get_file_user(): """Indirection for testing and settings""" return FILE_USER def get_file_group(): """Indirection for testing and settings""" return FILE_GROUP ZK_HOSTS = os.getenv("ZK_HOSTS", '127.0.0.1:2181') GD_ZKRUN_VIRTUALENV_PATH = os.getenv("GD_ZKRUN_VIRTUALENV_PATH") GD_ZKRUN_COMMAND = os.getenv("GD_ZKRUN_COMMAND") GD_ZKRUN_NAME = os.getenv("GD_ZKRUN_NAME") GD_ZKRUN_HOSTS = os.getenv("GD_ZKRUN_HOSTS", ZK_HOSTS) GD_ZKRUN_SERVICE_TYPE = os.getenv("GD_ZKRUN_SERVICE_TYPE") GD_ZKRUN_PORT = os.getenv("GD_ZKRUN_PORT") GD_ZKRUN_SSL_PORT = os.getenv("GD_ZKRUN_SSL_PORT") GD_ZKRUN_LOCATIONS = os.getenv("GD_ZKRUN_LOCATIONS") # NB: these are HFS-y, but I'm leaving them in since they are overridable. For now. -EA SSL_PRIVATE_KEY = os.getenv("SSL_PRIVATE_KEY", '/opt/hfs/ssl/server.key') SSL_CERTIFICATE = os.getenv("SSL_CERTIFICATE", '/opt/hfs/ssl/server.crt') SSL_CA_LIST = os.getenv("SSL_CA_LIST", '/opt/hfs/ssl/client-trust-chain.crt') try: from local_settings import * # pylint: disable=W0401 except ImportError: pass sizes.py000064400000030412147205126350006260 0ustar00# -*- coding: utf-8 -*- # pylint: disable=W0212 import math from typing import List, Optional, Tuple # pylint: disable=W0611 _sizeUnits = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'] class ByteSize: """Encapsulation for tracking sizes in bytes.""" def __init__(self, Bytes: int = 0, KiBytes: int = 0, MiBytes: int = 0, GiBytes: int = 0, TiBytes: int = 0, PiBytes: int = 0, EiBytes: int = 0) -> None: self._bytes = int(Bytes) + ( 1024 * (int(KiBytes) + ( 1024 * (int(MiBytes) + ( 1024 * (int(GiBytes) + ( 1024 * (int(TiBytes) + ( 1024 * (int(PiBytes) + ( 1024 * int(EiBytes)))))))))))) def _breakdown(self) -> List[Tuple[int, str]]: sres = [] # type: List[Tuple[int, str]] csize = self._bytes for size in _sizeUnits: sval = csize & 1023 if sval: sres.insert(0, (sval, size)) csize >>= 10 # the # of bits in 1024 if csize == 0: break return sres or [(0, '')] def __str__(self) -> str: return ','.join(['%d%s' % each for each in self._breakdown()]) def __repr__(self) -> str: return '{}({})'.format( self.__class__.__name__, ','.join(['{}ytes={}'.format(U or 'B', V) for V, U in self._breakdown()]) ) def simpleSizeStr(self) -> str: """Return the size in simple form (int ceiling).""" bd = self._breakdown() return '%d%s' % (bd[0][0] if len(bd) == 1 else bd[0][0] + 1, bd[0][1]) def approxSizeStr(self) -> str: """Return the size in floating point form to two significant digits.""" bd = self._breakdown() if (len(bd) == 1 or (_sizeUnits.index(bd[1][1]) != _sizeUnits.index(bd[0][1]) - 1 and _sizeUnits.index(bd[1][1]) != 'KiB')): return '%d%s' % (bd[0]) fv = ((bd[0][0] * 1024.0) + bd[1][0]) / 1024.0 rsp = '%.2f' % fv if rsp.endswith('.00'): return rsp[:-3] + bd[0][1] return rsp + bd[0][1] def fullSizeStr(self) -> str: """Return the size in full detail units.""" return ','.join(['%d%s' % each for each in self._breakdown()]) @property def Bytes(self) -> int: return self._bytes @property def KiBytes(self) -> float: return self.Bytes / 1024.0 @property def MiBytes(self) -> float: return self.KiBytes / 1024.0 @property def GiBytes(self) -> float: return self.MiBytes / 1024.0 @property def TiBytes(self) -> float: return self.GiBytes / 1024.0 @property def PiBytes(self) -> float: return self.TiBytes / 1024.0 @property def EiBytes(self) -> float: return self.PiBytes / 1024.0 def __trunc__(self): return self._bytes def __index__(self): return self._bytes # Comparisons def __eq__(self, o): if o is None: return self._bytes == 0 try: return self._bytes == o._bytes except AttributeError: return self._bytes == o def __ne__(self, o): if o is None: return self._bytes != 0 try: return self._bytes != o._bytes except AttributeError: return self._bytes != o def __gt__(self, o): if o is None: return self._bytes > 0 try: return self._bytes > o._bytes except AttributeError: return self._bytes > o def __ge__(self, o): if o is None: return self._bytes >= 0 try: return self._bytes >= o._bytes except AttributeError: return self._bytes >= o def __lt__(self, o): if o is None: return False try: return self._bytes < o._bytes except AttributeError: return self._bytes < o def __le__(self, o): if o is None: return self._bytes == 0 try: return self._bytes <= o._bytes except AttributeError: return self._bytes <= o # Arithmetic operations # Add is always two sizes, result is a size def __add__(self, o): if hasattr(o, '_bytes'): return self.__class__(self._bytes + o._bytes) return self.__class__(self._bytes + o) def __iadd__(self, o): if hasattr(o, '_bytes'): self._bytes += o._bytes else: self._bytes += o return self # Subtract is always two sizes, result is a size def __sub__(self, o): if hasattr(o, '_bytes'): return self.__class__(Bytes=max(0, self._bytes - o._bytes)) return self.__class__(Bytes=max(0, self._bytes - o)) def __isub__(self, o): if hasattr(o, '_bytes'): self._bytes -= max(o._bytes, 0) else: self._bytes -= o return self # Can only multiply by another integer, result is always a size def __mul__(self, o): return self.__class__(Bytes=self._bytes * o) def __rmul__(self, o): return self.__class__(Bytes=self._bytes * o) def __imul__(self, o): self._bytes *= o return self # Can Div by another size or an integer, result is the opposite of the divisor def __floordiv__(self, o): if isinstance(o, ByteSize): return self._bytes // o._bytes return self.__class__(int(self._bytes // o)) # Mod is by size or integer, result is always a size def __mod__(self, o): return self.__class__(self._bytes % (o._bytes if isinstance(o, ByteSize) else o)) def __divmod__(self, o): return self // o, self % o def __truediv__(self, o): if isinstance(o, ByteSize): return self._bytes * 1.0 / o._bytes return self.__class__(int(self._bytes / o)) # No rdiv operators for sizes def __idiv__(self, o): self._bytes /= o # only update divide by integer since result should be a size self._bytes = int(self._bytes) return self def __itruediv__(self, o): return self.__idiv__(o) def __ifloordiv__(self, o): self._bytes //= o # only update divide by integer since result should be a size self._bytes = int(self._bytes) return self def __imod__(self, o): self._bytes %= o # only update divide by integer since result should be a size self._bytes = int(self._bytes) return self # Boolean comparisons def __nonzero__(self): return bool(self._bytes) # for Python2 def __bool__(self): return bool(self._bytes) # for Python3 class MemSize(ByteSize): """Encapsulation for tracking amount of memory""" class DiskSize(ByteSize): """Encapsulation for tracking size of persistent storage (disk)""" class InvalidMemSize(MemSize): def __init__(self, invalid_value): # pylint: disable=W0231 self._invVal = invalid_value def __str__(self): return 'InvalidMemSize "%s"' % str(self._invVal) def __repr__(self): return 'InvalidMemSize(%s)' % repr(self._invVal) # let property accessors throw exception on no self._bytes member def __eq__(self, o): return isinstance(o, InvalidMemSize) def __ne__(self, o): return not isinstance(o, InvalidMemSize) class InvalidDiskSize(DiskSize): def __init__(self, invalid_value): # pylint: disable=W0231 self._invVal = invalid_value def __str__(self): return 'InvalidDiskSize "%s"' % str(self._invVal) def __repr__(self): return 'InvalidDiskSize(%s)' % repr(self._invVal) # let property accessors throw exception on no self._bytes member def __eq__(self, o): return isinstance(o, InvalidDiskSize) def __ne__(self, o): return not isinstance(o, InvalidDiskSize) class ByteSizes: """Encapsulation for an aggregation of byte size values. The naturalUnits indicates which units to report sizes in if the size is not indicated or is a close approximation of those units (naturalUnits should be one of _sizeUnits) """ def __init__(self, naturalUnits: str = 'B') -> None: self._accum = ByteSize(0) self._minSize = None # type: Optional[ByteSize] self._maxSize = ByteSize(0) self._numSizes = 0 self._natUnit = naturalUnits self._sumUnits = None # type: Optional[str] def __iadd__(self, o): if not isinstance(o, ByteSize): o = ByteSize(Bytes=o) self._accum += o self._numSizes += 1 if self._minSize is None or o < self._minSize: self._minSize = o if o > self._maxSize: self._maxSize = o self._sumUnits = None return self def __str__(self) -> str: return str(self._accum) @property def minimumSize(self) -> ByteSize: return ByteSize(0) if self._minSize is None else self._minSize @property def maximumSize(self) -> ByteSize: return self._maxSize @property def averageSize(self) -> ByteSize: return ByteSize(0) if self._numSizes == 0 else (self._accum / self._numSizes) def _calcSummaryUnits(self) -> str: suI = len(_sizeUnits) - 1 for suI in range(len(_sizeUnits) - 1, -1, -1): baseSize = ByteSize(Bytes=1024 ** suI) if baseSize > self._accum: continue if suI + 1 == _sizeUnits.index(self._natUnit) and self._accum % baseSize >= 100: return self._natUnit return _sizeUnits[suI] return self._natUnit def summaryUnits(self) -> str: if self._sumUnits is None: self._sumUnits = self._calcSummaryUnits() return self._sumUnits def fullSizeStr(self, ofSize: Optional[ByteSize] = None) -> str: """Return the specified size (or total size) in full detail units. :param: The size to be displayed """ if ofSize is None: return self._accum.fullSizeStr() return ofSize.fullSizeStr() def simpleSizeStr(self, ofSize: Optional[ByteSize] = None, withUnits: bool = False) -> str: """Returns the specified size (or total size) in simple form (int ceiling) based on the summary Units. The withUnits can be used to enable or suppress reporting of units in the output (units are summaryUnits() in either case). :param ofSize: The size to simplify :param withUnits: Whether to include units in the output """ if ofSize is None: return self._accum.simpleSizeStr() val = math.ceil({'': ofSize.Bytes, 'B': ofSize.Bytes, 'KiB': ofSize.KiBytes, 'MiB': ofSize.MiBytes, 'GiB': ofSize.GiBytes, 'TiB': ofSize.TiBytes, 'PiB': ofSize.PiBytes, 'EiB': ofSize.EiBytes}[self.summaryUnits()]) if withUnits: return '%d%s' % (val, self.summaryUnits()) return str(int(val)) def approxSizeStr(self, ofSize: Optional[ByteSize] = None, withUnits: bool = False) -> str: """Give a string representation of a close approximation of the size. Returns the specified size (or total size) in floating point form to two significant digits, based on the on the summary Units. The withUnits can be used to enable or suppress reporting of units in the output (units are summaryUnits() in either case). :param ofSize: The size to be represented :param withUnits: Whether to include units in the output """ if ofSize is None: return self._accum.approxSizeStr() val = int({'': ofSize.Bytes, 'B': ofSize.Bytes, 'KiB': ofSize.KiBytes, 'MiB': ofSize.MiBytes, 'GiB': ofSize.GiBytes, 'TiB': ofSize.TiBytes, 'PiB': ofSize.PiBytes, 'EiB': ofSize.EiBytes}[self.summaryUnits()] * 1024.0) / 1024.0 return '%.2f%s' % (val, self.summaryUnits() if withUnits else '') timeutils.py000064400000002677147205126350007156 0ustar00# -*- coding: utf-8 -*- from typing import Optional import datetime from pytz import utc as UTC from primordial.constants import CANONICAL_TIMESTRING_FORMAT, CANONICAL_TIMESTRING_FORMAT_NO_MICROSECONDS def datetime_to_epoch_seconds(in_time: datetime.datetime) -> int: """ Return integer seconds since epoch. :param in_time: Datetime :returns: Integer seconds since epoch """ return int((in_time - datetime.datetime(1970, 1, 1)).total_seconds()) def iso8601_utc(dt: datetime.datetime, tz: Optional[datetime.tzinfo] = None, microseconds: bool = True) -> str: """ Return a string representation of a datetime in ISO8601 format (YYYY-MM-DDTHH:MM:SS.ssssssZ) in the UTC (Z) timezone. :param dt: The datetime object. :param tz: The timezone to assume when coverting a naive datetime to UTC (Required if `dt` is naive). :param microseconds: Whether to include microseconds in the representation. Defaults to `True`. :returns: ISO8601-formatted string representation :raises ValueError: If the datetime is naive, and no tz is provided """ tformat = CANONICAL_TIMESTRING_FORMAT if microseconds else CANONICAL_TIMESTRING_FORMAT_NO_MICROSECONDS if dt.tzinfo is None: if tz is None: raise ValueError('`tz` param must be provided if datetime object is naive') dt = dt.replace(tzinfo=tz) if dt.tzinfo is not UTC: dt = dt.astimezone(UTC) return dt.strftime(tformat) utils.py000064400000011047147205126350006266 0ustar00# -*- coding: utf-8 -*- import logging from functools import partial, update_wrapper from time import sleep from typing import Any, Dict, Iterable, Optional, Tuple, Type, Callable MAX_RETRIES = 3 LOGGER = logging.getLogger(__name__) class RetriesExhaustedError(Exception): """A special type which signals the failure of a retry loop.""" def abbreviate_hostname_for_windows(hostname: Optional[str]) -> Optional[str]: """Abbreviate hostname for use on a Windows machine. :param hostname: the hostname :returns: the first non-empty domain in the hostname, excluding "www." """ if hostname is None: return None if hostname.lower().startswith('www.'): hostname = hostname[4:] for domain in hostname.split('.'): if domain: return domain return hostname def subdict(dict_: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: """Filter a dictionary to contain only a certain set of keys. :param dict_: The original dictionary to be filtered. :param keys: A list, or other iterable, containing the desired dictionary keys. :returns: A dictionary containing only the desired keys. """ return {k: v for k, v in dict_.items() if k in keys} def subdict_omit(dict_: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: """Filter a dictionary to omit a set of keys. :param dict_: The original dictionary to be filtered. :param keys: An iterable containing the keys to omit. :returns: A dictionary with the desired keys omitted. """ return {k: v for k, v in dict_.items() if k not in keys} def _should_retry(*_args, curr_attempt: int = 0, max_attempts: int = MAX_RETRIES, **_kwargs) -> bool: return curr_attempt < max_attempts def _retry_after(*_args, curr_attempt: int = 0, sleep_secs: int = 1, **_kwargs) -> int: return sleep_secs def retry_this(on_ex_classes: Tuple[Type[Exception], ...] = (Exception,), max_attempts: int = MAX_RETRIES, sleep_secs: int = 1, should_retry: Optional[Callable[[Any], bool]] = None, retry_after: Optional[Callable[[Any], int]] = None): """Decorator that adds retry on error functionality to a function. Currently the retry strategy is 'linear' on errors. i.e. this function waits a set period of time before retrying the failed function again. :param on_ex_classes: A tuple of exceptions to retry on. By default, its all exceptions that derive from the 'Exception' class. :param max_attempts: Limit to how many times we'll retry :param sleep_secs: How long to wait between retries. :param should_retry: A predicate which when called will return a boolean saying whether the call should be retried or not. This parameter overrides the max_attempts parameter and gives more control to dynamically choose on if we need to continue retrying a call. :param retry_after: A callable that returns how long to wait between retries. This parameter overrides the sleep_secs parameter and gives more control to dynamically choose the wait time. :returns: This returns a decorator function that actually provides the retry functionality. """ should_retry = should_retry or partial(_should_retry, max_attempts=max_attempts) retry_after = retry_after or partial(_retry_after, sleep_secs=sleep_secs) def wrapper(f): ex_classes = tuple(ex_cls for ex_cls in on_ex_classes if issubclass(ex_cls, Exception)) def new_func(*pargs, **kwargs): curr_attempt = 0 while True: try: return f(*pargs, **kwargs) except ex_classes as e: # pylint: disable=E0712 curr_attempt += 1 LOGGER.error("Exception (%s) occured while executing %s", str(e), f.__name__) if not should_retry(*pargs, curr_attempt=curr_attempt, **kwargs): msg = 'Max attempts exhausted for {}'.format(f.__name__) # pylint: disable=E0703 raise RetriesExhaustedError(msg) from e s_secs = retry_after(*pargs, curr_attempt=curr_attempt, **kwargs) LOGGER.debug( "Sleeping %s secs before retrying %s, due to exception (%s)", s_secs, f.__name__, str(e)) sleep(s_secs) return update_wrapper(new_func, f) return wrapper validator.py000064400000022114147205126350007110 0ustar00# -*- coding: utf-8 -*- import functools import re from datetime import datetime, timedelta from typing import Any, Callable, Dict, Iterable, List, Optional, Union from uuid import UUID from voluptuous import Schema, Url, MultipleInvalid from primordial.constants import CANONICAL_TIMESTRING_FORMAT ValidatorType = Optional[Union[Iterable[Callable], Callable]] DateTimeType = Union[datetime, str] class BadParameterException(Exception): """To be raised when a validation operation fails.""" def validate(validator: Callable, param_value: Any, coerce_: bool = False) -> Any: """Run a validation operation. Validate a particular parameter with a particular validator and possibly coerce the value into the validator's return type. :param validator: The validator to be run :param param_value: The value to be validated :param coerce_: Whether to return a type coerced value :raises ValueError: If the parameter could not be validated. """ if callable(validator): # it's a callable try: myval = validator(param_value) except Exception as e: raise BadParameterException("Parameter %s failed validation (%s)" % (param_value, e)) from e if coerce_ is True: ret = myval else: ret = param_value return ret raise ValueError("Cannot use a non-callable as a parameter validator: %s" % validator) def validate_param(param_validator: ValidatorType, param_value: Any, coerce_: bool = False) -> Any: """Validate a parameter. :param param_validator: The validator (or list of validators) to be run :param param_value: The value to be validated :param coerce_: Whether to return a type coerced value :raises ValueError: If the parameter could not be validated. """ if param_validator is not None: # Exclusion below is due to Pylint bug https://github.com/PyCQA/pylint/issues/3507 if isinstance(param_validator, Iterable): # pylint: disable=isinstance-second-argument-not-valid-type for validator in param_validator: if validator is None: # maybe this is a bad semantic choice, but, unlike a bare None as # a validator not in a list context, None here doesn't mean skip with # no validation, but instead means that the value can be the value None # itself. The reason I think this is OK is that it's nonsense to have # a list of validators which includes the global None validator since # that would be formally equivalent to just using a bare None -- EA if param_value is None: return param_value # otherwise we keep searching the list try: retval = validate(validator, param_value, coerce_=coerce_) # take first non-excepting value return retval except Exception as _: # pylint: disable=W0703 pass raise ValueError("No validator in list validated %s (%s)" % (param_value, param_validator)) return validate(param_validator, param_value, coerce_=coerce_) return param_value URL_SCHEMA = Schema(Url()) # pylint: disable=E1120 def url(val: str) -> str: """Validate that a string looks like a URL. url is intended to be used like str or int to be a basic callable that will except on type mismatch or non- coercible value. :param val: The value to be checked :raises ValueError: If the value does not look like a URL. """ # this will raise a voluptuous MultipleInvalid error if it fails to validate try: URL_SCHEMA(val) except MultipleInvalid as e: # use a normal valueerror externally raise ValueError("Not a url: %s (%s)" % (val, e)) from e # return uncoerced value return val def parseable_datetime(val: str) -> bool: """Validate that we can parse a datetime from a string. Catch exception and return false if strptime doesn't work. :param val: The value to be checked """ try: _ = datetime.strptime(val, CANONICAL_TIMESTRING_FORMAT) return True except Exception: # pylint: disable=W0703 return False def is_datetime(val: DateTimeType) -> DateTimeType: """Validate that a value represents a datetime. :param val: The value to be checked :raises ValueError: If the value does not represent a datetime. """ if isinstance(val, datetime) or parseable_datetime(val): return val raise ValueError("Not a datetime: %s" % val) def is_timedelta(val: timedelta) -> timedelta: """Validate that a value is a timedelta. :param val: The value to be checked :raises ValueError: If the value is not a timedelta. """ if isinstance(val, timedelta): return val raise ValueError("Not a timedelta: %s" % val) def is_non_empty_dict(val: Dict) -> Dict: """Validate that a value is a non-empty dictionary. :param val: The value to be checked :raises ValueError: If the value is not a dictionary, or is empty. """ if isinstance(val, dict) and val != {}: return val raise ValueError("Not a non-empty dict: %s" % val) POSTGRES_NAME_REGEX = re.compile(r'^[a-z_][a-z0-9_]*$') def is_postgres_name(val: str) -> str: """Validate that argument is a valid Postgres identifier. :param val: The value to be checked :raises ValueError: If the value is not a valid Postgres identifier. """ if POSTGRES_NAME_REGEX.match(val): return val raise ValueError("Not a valid Postgres name (%s): %s" % (POSTGRES_NAME_REGEX.pattern, val)) def internal_validate_positionals(positional_args: List[Any], positional_arg_values: List[Any], coerce_: bool = False) -> List[Any]: """Validate a list of positional arguments. If we run out of stated positionals, we simply dump the originals unvalidated (we're saying the validators are optional) :param positional_args: The validators to be run, in order, against the values :param positional_arg_values: The values to be validated :param coerce_: Whether to return type coerced values """ outargs = [] for index, myvalue in enumerate(positional_arg_values): try: validator = positional_args[index] except IndexError as _: validator = None if validator is not None: myval = validate_param(validator, myvalue, coerce_=coerce_) else: myval = myvalue outargs.append(myval) return outargs def internal_validate_keywords(keyword_args: Dict[str, Any], keyword_arg_values: Dict[str, Any], coerce_: bool = False) -> Dict[str, Any]: """Validate a dictionary of keyword arguments. If there is no matching validator for a particular keyword, then the original value is passed along in the output unvalidated. :param keyword_args: The validators to be run against the values :param keyword_arg_values: The values to be validated :param coerce_: Whether to return type coerced values """ outdict = {} for key, value in keyword_arg_values.items(): try: validator = keyword_args[key] except KeyError as _: validator = None if validator is not None: myval = validate_param(validator, value, coerce_=coerce_) else: myval = value outdict[key] = myval return outdict def coerce_or_error( positionals: Optional[List[Any]] = None, keywords: Optional[Dict[str, Any]] = None, coerce_: bool = False) -> Callable: """Either coerce the arguments in the suggested way or die with error back to the client. :param positionals: A list of validators to be run against positional arguments :param keywords: A dictionary of validators to be run against keyword arguments :param coerce_: Whether to return type coerced values """ def decorator(function): """Inner""" @functools.wraps(function) def wrapper(*args, **kwargs): """Actual wrapper""" try: if positionals is not None: outargs = internal_validate_positionals(positionals, args, coerce_=coerce_) else: outargs = args if keywords is not None: outkwargs = internal_validate_keywords(keywords, kwargs, coerce_=coerce_) else: outkwargs = kwargs except BadParameterException as e: raise ValueError("Failed to validate: %s, %s" % (str(e), str(e))) from e return function(*outargs, **outkwargs) return wrapper return decorator def is_uuid(uuid_maybe: str) -> bool: """Validate that a value represents a UUID. :param uuid_maybe: The value to be checked """ try: UUID(uuid_maybe) return True except ValueError: return False zkconfig.py000064400000010167147205126350006742 0ustar00# -*- coding: utf-8 -*- import re import socket from collections import deque from typing import Any from kazoo.client import KazooClient from kazoo.exceptions import ConnectionLoss, NoNodeError, ZookeeperError from kazoo.handlers.threading import KazooTimeoutError from primordial.config import Config from primordial.utils import retry_this class ZKConfig(Config): """Config class that loads configuration options from a ZooKeeper cluster""" ROOT = '/config' # type: str # The ? in the first capture is so the regex matching is non-greedy. # Passwords with a "=" in them can otherwise result in weird parsing CONFIG_LINE = re.compile(r'\s*(\S+?)\s*=\s*(\S+)\s*') def __init__(self, hosts: str) -> None: self._client = KazooClient(hosts=hosts, read_only=True) self._config = {} # type: ignore self.load_config() def __enter__(self): """ This turns this class/object into a context manager for connections to the zookeeper cluster. This starts up the connection and returns the client as the resource being managed. """ # If start throws an error it will get propagated up the stack # The start method can throw a KazooTimeoutError error self._client.start() return self._client def __exit__(self, _exc_type, _exc_val, _exc_tb): """ This method takes care of releasing the client connection to the zookeeper cluster. """ try: self._client.stop() self._client.close() except Exception: # pylint: disable=W0703 # Squelch any exception we may encounter as part of connection closing pass @retry_this(on_ex_classes=(ConnectionLoss, KazooTimeoutError, socket.error, NoNodeError, ZookeeperError)) def load_config(self): """ Load HFS config data including config at the various namespaces and flatten it out into a dict representation. """ with self as zk_client: for path in self.enumerate(zk_client): self._config.update(self.get_config_at_path(zk_client, path)) def enumerate(self, zk_client): """ Generate all child paths, starting at a particular path. The starting path is also included in this enumeration. """ paths_to_traverse = deque([self.ROOT]) # Do a breadth first traversal of nodes while paths_to_traverse: path = paths_to_traverse.popleft() yield path for child in zk_client.get_children(path): child_path = '{}/{}'.format(path, child) paths_to_traverse.append(child_path) def get_config_at_path(self, zk_client, path): """ Get the data (which is interpreted as config uploaded to that path/namespace) at a particular zookeeper path. Parse out the data looking for lines that look like key=value. Generate a dict out of this normalizing the key appropriately. Finally return back this dict. """ config = {} # The path is going to have /config as the prefix, so when we split on '/' the first 2 items # are '' and 'config'. Hence, we drop the first 2 items in the code below. # Example 1: if path was '/config' then path_sections would be [] # Example 2: if path was '/config/hhfs/nydus' then path_sections would be ['hhfs', 'nydus'] path_sections = path.split('/')[2:] zk_value, _zk_stat = zk_client.get(path) stored_config = zk_value.decode("utf-8") for line in stored_config.split('\n'): match = self.CONFIG_LINE.match(line) if match: k, v = match.groups() config_key = '.'.join(path_sections + [k]) config[config_key] = v return config def get(self, key: str, default: Any = None) -> Any: """ Get the config option as a string :param key: config option name :param default: default value if no value exists in the config :return: option value """ return self._config.get(key, default) zookeeper.py000064400000032563147205126350007137 0ustar00# -*- coding: utf-8 -*- """\ Zookeeper functions """ import os import socket from enum import Enum from typing import Any, Generator from tempfile import NamedTemporaryFile from contextlib import contextmanager from kazoo.client import KazooClient from kazoo.exceptions import ConnectionLoss # This is the error thrown when the KazooClient.start call times out and the connections wasn't established from kazoo.handlers.threading import KazooTimeoutError from primordial.utils import retry_this ZK_CONFIG_FILE = '/etc/sysconfig/zookeeper' def lookup_hosts(zk_config_file: str = ZK_CONFIG_FILE) -> str: """ Lookup the zookeeper nodes and return them as a comma-separated string :returns: The zookeeper nodes as a comma-separated string """ nodes = os.getenv('ZK_HOSTS', None) if not nodes: if os.access(zk_config_file, os.R_OK): with open(zk_config_file, 'r', encoding='UTF-8') as zkcfg: for line in zkcfg.readlines(): if line.strip().startswith('ZK_HOSTS'): nodes = eval(line.partition('=')[2]) # pylint: disable=eval-used if not nodes: raise ZookeeperException('Could not retrieve Zookeeper Hosts') return nodes class Zookeeper: """Represents a connection to Zookeeper""" def __init__(self, zk_config_file: str = ZK_CONFIG_FILE): """ Initialise a Zookeeper connection :param zk_config_file: The path to the zookeeper config file (if not /etc/sysconfig/zookeeper) """ self.zk_config_file = zk_config_file self.zk_hosts = self.lookup_hosts() self.client = KazooClient(hosts=self.zk_hosts) @contextmanager def _start_session_cm(self) -> Generator[KazooClient, None, None]: """ Yield a started zookeeper client in a context manager :yields: A started instance of KazooClient """ self.client.start() try: yield self.client finally: self.stop_session() def exists(self, node: str) -> bool: """ Check if node exists :param node: Name of zookeeper node :returns: (bool) Whether the node exists """ with self._start_session_cm() as zk: return zk.exists(node) is not None def get(self, node: str) -> Any: """ Get the node value :param node: Name of zookeeper node :returns: The node value """ with self._start_session_cm() as zk: return zk.get(node)[0] def get_or_default(self, node: str, default: Any = None) -> Any: """ Get a node value if it exists. If it does not exist, return the default value specified :param node: Name of zookeeper node :param default: The default value to return if the node does not exist :returns: The node value or the default value if the node does not exist """ with self._start_session_cm() as zk: if zk.exists(node) is not None: return zk.get(node)[0] return default def set(self, node: str, value: bytes) -> None: """ Set the node value :param node: Name of zookeeper node :param value: Value of zookeeper node """ with self._start_session_cm() as zk: zk.ensure_path(node) return zk.set(node, value) def delete(self, node: str, recursive: bool = False) -> None: """ Delete the node, if it exists :param node: Name of zookeeper node :param recursive: Whether to delete the node and all child nodes """ with self._start_session_cm() as zk: if zk.exists(node) is not None: zk.delete(node, recursive=recursive) def stop_session(self) -> None: """End and close the current zookeeper session""" self.client.stop() self.client.close() def lookup_hosts(self) -> str: """ Lookup the zookeeper nodes and return them as a comma-separated string :returns: The zookeeper nodes as a comma-separated string """ return lookup_hosts(self.zk_config_file) class ZKFile(Zookeeper): """ Creates a named temporary file with the contents of one or more znodes. Useful for APIs that only accept file paths rather than file-like objects. Warning: Because the file is a temp file there is a possibility of it being deleted by some tempfile cleanup mechanisms (tmpwatch, etc.). In this case it may be a better idea to read the znode into memory and create the temp file from that each time you use it. """ def __init__(self, *znodes: str, zk_config_file: str = ZK_CONFIG_FILE): """ Load the znode contents into a temporary file :param znodes: An expanded list of zookeeper node names :param zk_config_file: The zookeeper config file (if not the default of /etc/sysconfig/zookeeper) """ super().__init__(zk_config_file=zk_config_file) self.file = NamedTemporaryFile() # pylint: disable=consider-using-with for znode in znodes: if znode: # looks silly, but None is a possibility self.file.write(self.get(znode)) self.file.flush() @property def name(self) -> str: """Get the filename for the temporary file storing the contents of the zk node(s)""" return self.file.name class ZookeeperException(Exception): pass class ZKMode(Enum): """ Enum to represent the mode a zk node is currently operating in """ LEADER = 1 FOLLOWER = 2 STANDALONE = 3 ERROR = 4 UNKNOWN = 5 class ZkEnsembleNode: """ This class represents a single node in a zookeeper cluster and holds to the status/mode that node is operating in. Values are leader, follower, standalone, error or unknown. """ def __init__(self, host: str): """ Initialize the class. :param host: The host:port connection for the node whose mode/state this class holds. """ self.__client = KazooClient(hosts=host) self.__host, self.__port = host.split(':') self.__mode = ZKMode.UNKNOWN def __enter__(self): """ This turns this class/object into a context manager for connections to the zookeeper node. This starts up the connection and returns the client as the resource being managed. """ # If start throws an error it will get propagated up the stack # The start method can throw a KazooTimeoutError error self.__client.start() return self.__client def __exit__(self, _exc_type, _exc_val, _exc_tb): """ This method takes care of releasing the client connection to the zookeeper node. """ try: self.__client.stop() self.__client.close() except Exception: # pylint: disable=W0703 # Squelch any exception we may encounter as part of connection closing pass def __repr__(self): """ Python repr implementation for this class. """ return 'Host: {}, Mode: {}'.format(self.host, self.mode) @property def host(self): return self.__host @property def port(self): return self.__port @retry_this(on_ex_classes=(ConnectionLoss, KazooTimeoutError, socket.error)) def fetch_mode(self, force: bool = False) -> ZKMode: """ This method returns the mode the zk node is currently operating in. If a nodes mode has already been fetched, then this method returns the cached mode/status. To initiate a re-fetch of the status, use the force parameter. :param force: Force re-fetch of the zk node's status/mode """ if not force and self.__mode is not ZKMode.UNKNOWN: return self.__mode with self as zk_client: # If there was an exception while executing the 4 letter command ruok, let it propagate up the stack ruok = zk_client.command(cmd=b"ruok") if ruok == "imok": # If there was an exception while executing the 4 letter command srvr, let it propagate up the stack srvr = zk_client.command(cmd=b"srvr") mode_line = list(filter(lambda l: l.startswith('Mode: '), srvr.split('\n')))[0] m = mode_line[6:] self.__mode = ZKMode[m.upper()] else: self.__mode = ZKMode.ERROR return self.__mode @property def mode(self) -> ZKMode: """ Property to return the internally cached mode """ return self.__mode @property def is_follower(self) -> bool: """ Python property to check if the node is currently operating as a follower. """ return self.mode == ZKMode.FOLLOWER @property def is_leader(self) -> bool: """ Python property to check if the node is currently operating as a leader. """ return self.mode == ZKMode.LEADER @property def is_standalone(self) -> bool: """ Python property to check if the node is currently operating in standalone mode. """ return self.mode == ZKMode.STANDALONE class ZkEnsembleStatusNotGathered(Exception): pass class ZookeeperEnsemble: """ This class is used to represent a zookeeper ensemble/cluster and test if there is currently a quorum in the cluster. """ def __init__(self, zk_config_file: str = ZK_CONFIG_FILE): """ Initialise a Zookeeper connection :param zk_config_file: Path to the zookeeper config file (default /etc/sysconfig/zookeeper) """ self.zk_config_file = zk_config_file # This looks like p3dlhfzk01.cloud.phx3.gdg:2181,p3dlhfzk02.cloud.phx3.gdg:2181,p3dlhfzk03.cloud.phx3.gdg:2181 self.zk_hosts_conn_str = lookup_hosts(zk_config_file) # This looks like ['p3dlhfzk01.cloud.phx3.gdg:2181', 'p3dlhfzk02.cloud.phx3.gdg:2181', # 'p3dlhfzk03.cloud.phx3.gdg:2181'] self.zk_hosts = self.zk_hosts_conn_str.split(",") self.ensemble_nodes = [ZkEnsembleNode(host=h) for h in self.zk_hosts] self.SIZE_FOR_QUORUM = int(len(self.zk_hosts) / 2) + 1 self._leaders = [] # type: ignore self._followers = [] # type: ignore self._standalone = [] # type: ignore self.gather_status() @property def ensemble_size(self): """ Python property that returns the number of nodes in this cluster. """ return len(self.ensemble_nodes) @property def followers_size(self): """ Python property that returns the number of followers in the quorum. """ return len(self._followers) def gather_status(self, force: bool = False): """ Method to gather the status of the nodes in the ensemble. Note, currently if the node has a cached status, then that is what's used. We dont force a re-fetch. :param force: Force re-fetch of the zk node's status/mode """ for node in self.ensemble_nodes: try: node.fetch_mode(force) except Exception: # pylint: disable=W0703 # Squelch any exception we may encounter pass self._leaders = list(filter(lambda node: node.is_leader, self.ensemble_nodes)) self._followers = list(filter(lambda node: node.is_follower, self.ensemble_nodes)) self._standalone = list(filter(lambda node: node.is_standalone, self.ensemble_nodes)) @property def is_in_standalone_mode(self) -> bool: """ Python property to check if the cluster is operating in standalone mode. """ return len(self._standalone) > 0 @property def has_split_brain(self) -> bool: """ Python property to check if the cluster has a split brain, perhaps as a result of a network partition. """ return len(self._leaders) > 1 @property def has_quorum(self) -> bool: """ Python property to check if the cluster currently has quorum. Make sure gather_status has been called before we call this method. """ # The logic for checking if an ensemble has quorum is: # 1. Is not running in standalone mode # 2. There is no split brain i.e. more than one leader # 3. The leader plus follower count is more than the number required for quorum i.e. n/2 rounded up return not self.is_in_standalone_mode and \ not self.has_split_brain \ and (len(self._leaders) + len(self._followers) >= self.SIZE_FOR_QUORUM) def __repr__(self): """ Python repr implementation for this class. """ zk_nodes = ", ".join([n.host for n in self.ensemble_nodes]) if self.is_in_standalone_mode: leader = "STANDALONE" followers = "STANDALONE" elif self.has_quorum: leader = "{}".format(self._leaders[0].host) followers = ", ".join([f.host for f in self._followers]) else: leader = "SPLIT BRAIN" followers = "SPLIT BRAIN" details = "\t\n".join(['{}'.format(node) for node in self.ensemble_nodes]) return 'Nodes: {}\nEnsemble mode? {}\nHas Quorum? {}\nLeader: {}\nFollowers: {}\n\nDetails:\n{}'\ .format( zk_nodes, not self.is_in_standalone_mode, self.has_quorum, leader, followers, details) log/__init__.py000064400000022141147205126350007443 0ustar00# -*- coding: utf-8 -*- from datetime import datetime from logging.handlers import SysLogHandler import logging import logging.config import os import socket from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, SocketType from typing import Any, Dict, Optional, Tuple # pylint: disable=W0611 from pkg_resources import resource_stream import yaml # If Flask is installed, x-request-id headers are included in logs. try: import flask except ImportError: flask = None # type: ignore import pytz from primordial.timeutils import iso8601_utc from primordial.log.envelope import Envelope from primordial.log.envelopedata import ConfigLogData # Keeping unused imports here to prevent breaking change to hfs, where these classes are referenced via primordial.log from primordial.log.extra import PerfLogExtra, DevLogExtra, BALogExtra # pylint: disable=unused-import SYSLOG_DEFAULT_PORT = 514 SYSLOG_FORMAT_VERSION = 1 COOKIE = '@cee:' PREFIX_MAP = { logging.DEBUG: '<7>', logging.INFO: '<6>', logging.NOTSET: '<6>', logging.WARNING: '<4>', logging.WARN: '<4>', logging.ERROR: '<3>', logging.CRITICAL: '<2>', logging.FATAL: '<1>' } LOG = logging.getLogger(__name__) class VerticalServiceStructuredFormatter(logging.Formatter): """Specialized structured logging formatter for HFS vertical services. Takes a log record and configuration data and formats it according to the structure in the Envelope class, returning a json-formatted log message. """ def __init__(self, *args, log_type: str = ConfigLogData.DEFAULT_TYPE, log_sub_type: str = ConfigLogData.DEFAULT_SUB_TYPE, datacenter: str = ConfigLogData.DEFAULT_DATACENTER, environment: str = ConfigLogData.DEFAULT_ENVIRONMENT, **kwargs) -> None: super().__init__(*args, **kwargs) self.configLogData = ConfigLogData()\ .set(ConfigLogData.TYPE_FIELD, log_type)\ .set(ConfigLogData.SUB_TYPE_FIELD, log_sub_type)\ .set(ConfigLogData.DATACENTER_FIELD, datacenter)\ .set(ConfigLogData.ENVIRONMENT_FIELD, environment) def format(self, record: logging.LogRecord) -> str: logenvelope = Envelope.getEnvelope(record, self.configLogData) return COOKIE + logenvelope.getJSON() class VerticalServiceSystemdFormatter(logging.Formatter): """Systemd logging formatter for HFS vertical services.""" def format(self, record: logging.LogRecord) -> str: prefix = PREFIX_MAP.get(record.levelno, '') return prefix + super().format(record) class VerticalServiceRequestLoggingFilter(logging.Filter): """A log filter for producing only Flask web request messages.""" def filter(self, record: logging.LogRecord) -> bool: if flask and flask.has_request_context(): if not hasattr(record, 'extra'): record.extra = {} # type: ignore request_id = flask.request.headers.get('x-request-id') if request_id: record.extra['request_id'] = request_id # type: ignore return True class HFSSyslogHandler(SysLogHandler): """A customized Log handler for HFS syslogs. Given a log record, emits a log message formatted according to the :class:`VerticalServiceStructuredFormatter` class. """ def __init__(self, # pylint: disable=W0231 ident: Optional[str] = None, address: Tuple[str, int] = ('localhost', SYSLOG_DEFAULT_PORT), facility: int = SysLogHandler.LOG_USER, socktype: SocketType = SOCK_DGRAM) -> None: # type: ignore """Produce a new HFS syslog handler. :param ident: :param address: :param facility: :raises TypeError: For socket types other than TCP or UDP """ if socktype not in (SOCK_DGRAM, SOCK_STREAM): raise TypeError('HFSSyslogHandler only supports TCP and UDP AF_INET sockets') # NB: we avoid the standard: # super(HFSSyslogHandler, self).__init__(address=address, facility=facility, socktype=socktype) # here in preference for the manual init directly below and the implemenatation of the _connect method. # The notable thing here is that we're limiting our ability to handle UNIX style sockets, and, we have added # a restrictive timeout for our socket connections. We may want to revisit this later. --EA logging.Handler.__init__(self) # pylint: disable=non-parent-init-called self.ident = ident or os.getenv('SYSLOG_IDENT') or 'python' self.address = address self.facility = facility self.socktype = socktype # type: ignore self.socket = None # type: Optional[socket.socket] self.unixsocket = False # required by base close() def close(self) -> None: """Ensure socket object, as expected by base class.""" if self.socket is None: self._set_socket() super().close() def emit(self, record: logging.LogRecord) -> None: """Emit a log record. The record is formatted, and an RFC-compliant message is sent to the syslog server. No structured data segment is created and message id is always nil ('-'). :param record: The record to be emitted. """ try: # We need to convert record level to lowercase, maybe this will # change in the future. # NOTE: https://github.com/python/typeshed/issues/2577 prio = self.encodePriority(self.facility, self.mapPriority(record.levelname)) # type: ignore timestamp = iso8601_utc(datetime.fromtimestamp(record.created, pytz.utc)) hostname = os.getenv('HOSTNAME') pid = record.process if record.process is not None else 0 message_id = '-' header = '<%d>%d %s %s %s %d %s' % ( prio, SYSLOG_FORMAT_VERSION, timestamp, hostname, self.ident, pid, message_id ) structured_data = '-' # we don't support structured data message = self.format(record) ascii_part = header + ' ' + structured_data + ' ' utf8_part = message + '\n' # we don't insert the UTF-8 BOM because rsyslog neither wants it nor handles it properly encoded_message = ascii_part.encode('ascii') + utf8_part.encode('utf8') try: if self.socket is None: self._connect_socket() # NOTE: Types are ignored here because of https://github.com/python/mypy/issues/4805 if self.socktype == SOCK_DGRAM: self.socket.sendto(encoded_message, self.address) # type: ignore else: self.socket.sendall(encoded_message) # type: ignore except OSError: # socket error. close it and we'll reconnect next time self.socket.close() # type: ignore self.socket = None except Exception: # pylint: disable=broad-except self.handleError(record) def _connect_socket(self) -> None: self._set_socket() self.socket.settimeout(.5) # type: ignore if self.socktype == SOCK_STREAM: self.socket.connect(self.address) # type: ignore def _set_socket(self) -> None: self.socket = socket.socket(AF_INET, self.socktype) def init_logging(name: str = __name__, filename: str = "logging.yaml", override_loggers: Optional[Dict[str, Any]] = None, log_type: str = ConfigLogData.DEFAULT_TYPE, log_sub_type: str = ConfigLogData.DEFAULT_SUB_TYPE, datacenter: Optional[str] = None, environment: Optional[str] = None) -> None: """ Initialize logging, updating the existing log config loggers dict with an override_loggers dict. It's possible, since we have many ways to run the system, that init_logging be called more than once. We allow this, but warn if so. :param name: package name where the logging YAML config file is located :param filename: filename of the logging YAML config :param override_loggers: a dict of overrides to the default log dictconfig :param log_type: :param log_sub_type: :param datacenter: :param environment: """ datacenter = datacenter if datacenter is not None else ConfigLogData.DEFAULT_DATACENTER environment = environment.upper() if environment is not None else ConfigLogData.DEFAULT_ENVIRONMENT config = yaml.safe_load(resource_stream(name, filename).read().decode()) for formatter in config['formatters'].values(): if formatter.get('()') ==\ 'primordial.log.VerticalServiceStructuredFormatter': formatter['log_type'] = log_type formatter['log_sub_type'] = log_sub_type formatter['environment'] = environment formatter['datacenter'] = datacenter if override_loggers is not None: config['loggers'].update(override_loggers) logging.config.dictConfig(config) logging.captureWarnings(True) log/analyticslogger.py000064400000011252147205126350011074 0ustar00# -*- coding: utf-8 -*- from logging import Logger, LoggerAdapter from typing import Any, Dict, MutableMapping, Optional, Tuple # pylint: disable=W0611 from primordial.log.extra import LogExtra from primordial.log.envelopedata import EnvelopeVersion, EnvelopeRole, EnvelopeDataSchema from primordial.log.envelope import Envelope class AnalyticsLogger(LoggerAdapter): """A LoggerAdapter which specifies an interface for log calls and adds default/initial log data into the record.""" def __init__( self, logger: Logger, envelope_version: Optional[str] = None, data_schema: Optional[str] = None) -> None: self.extra = {} # type: Dict[str, Any] super().__init__(logger, self.extra) # Set up any defaults if envelope_version is None: self._envelope_version = EnvelopeVersion() else: self._envelope_version = EnvelopeVersion.fromString(envelope_version) if data_schema is None: self._data_schema = EnvelopeDataSchema() else: self._data_schema = EnvelopeDataSchema.fromString(data_schema) self.setExtraAttr(Envelope.ENVELOPE_VERSION_FIELD, str(self._envelope_version)) self.setExtraAttr(Envelope.DATA_SCHEMA_FIELD, str(self._data_schema)) self.role = EnvelopeRole() self.setExtraAttr(EnvelopeRole.ROLE_FIELD, str(self.role)) self.validate_data_schema = False @property def performance(self) -> 'AnalyticsLogger': self._role(EnvelopeRole.ROLE_PERFORMANCE) return self @property def development(self) -> 'AnalyticsLogger': self._role(EnvelopeRole.ROLE_DEVELOPMENT) return self @property def business_analytics(self) -> 'AnalyticsLogger': self._role(EnvelopeRole.ROLE_BUSINESS_ANALYTICS) return self @property def validate(self) -> 'AnalyticsLogger': self.validate_data_schema = True return self def setExtraAttr(self, field: str, value: Any) -> None: self.extra[field] = value # Calls to info() etc will call this method by default def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> Tuple[str, MutableMapping[str, Any]]: if 'envelope_version' in kwargs: self.v(kwargs['envelope_version']) if 'data_schema' in kwargs: self.s(kwargs['data_schema']) if 'role' in kwargs: self._role(kwargs['role']) self._validate_schema() logextra = LogExtra.getLogExtra(**self.extra) logextra.set(**kwargs) new_kwargs = {} new_kwargs['extra'] = logextra.get() self._reset() return msg, new_kwargs def envelope_version(self, envelope_version: str) -> 'AnalyticsLogger': """Permanently set envelope_version for the lifetime of this logger.""" self._envelope_version = EnvelopeVersion.fromString(envelope_version) self.setExtraAttr(Envelope.ENVELOPE_VERSION_FIELD, str(self._envelope_version)) return self def data_schema(self, data_schema: str) -> 'AnalyticsLogger': """Permanently set data_schema for the lifetime of this logger.""" self._data_schema = EnvelopeDataSchema.fromString(data_schema) self.setExtraAttr(Envelope.DATA_SCHEMA_FIELD, str(self._data_schema)) return self def v(self, envelope_version: str) -> 'AnalyticsLogger': """Temporary override envelope_version for one logging call.""" self.setExtraAttr(Envelope.ENVELOPE_VERSION_FIELD, envelope_version) return self def s(self, data_schema: str) -> 'AnalyticsLogger': """Temporary override data_schema for one logging call.""" self.setExtraAttr(Envelope.DATA_SCHEMA_FIELD, data_schema) return self def _role(self, role: str) -> None: self.role = EnvelopeRole().set(EnvelopeRole.ROLE_FIELD, role) self.setExtraAttr(EnvelopeRole.ROLE_FIELD, str(self.role)) # Implementers will need to override this class and function for data_schema validation def _validate_schema(self) -> bool: if self.validate_data_schema: # pylint: disable=no-else-return data_schema = self.extra['data_schema'] # pylint: disable=unused-variable # TODO - implement for envelope v2 return True else: return True def _reset(self) -> None: self.role = EnvelopeRole() self.validate_data_schema = False self.extra = {} self.setExtraAttr(EnvelopeRole.ROLE_FIELD, str(self.role)) self.setExtraAttr(Envelope.ENVELOPE_VERSION_FIELD, str(self._envelope_version)) self.setExtraAttr(Envelope.DATA_SCHEMA_FIELD, str(self._data_schema)) log/data.py000064400000005672147205126350006627 0ustar00# -*- coding: utf-8 -*- import abc import json import logging import logging.config from typing import Any, Dict, Optional, Tuple # pylint: disable=W0611 LOG = logging.getLogger(__name__) class LogData(metaclass=abc.ABCMeta): """Logging data""" FIELDS = () # type: Tuple[str, ...] RAW_FIELDS = () # type: Tuple[str, ...] def __init__(self, record: Optional[logging.LogRecord] = None) -> None: self.data = {} # type: Dict[str, Any] self.freefields = False if not hasattr(self, 'freefields') else self.freefields # type: bool self.record = None # type: Optional[logging.LogRecord] if isinstance(record, logging.LogRecord): self.load(record) def get(self, field: Optional[str] = None) -> Any: """Get the data from a single field. :param field: The field to get """ if field is None: # Return 'data' dict return self.data if field in self.data: # Return particular field from 'data' dict return self.data[field] return None def getDict(self) -> Dict[str, Any]: """Get a dictionary of the data.""" datadict = {} for key, val in self.get().items(): if isinstance(val, LogData): datadict[key] = val.getDict() else: datadict[key] = val return datadict def getJSON(self) -> str: return json.dumps(self.getDict()) def parseVal(self, field: str, value: Any) -> Any: """Parse a value and coerce it into appropriate datatypes. :param field: The name of the field to be parsed :param value: The value of the field. """ if isinstance(value, dict): for k, v in value.items(): value[k] = self.parseVal(k, v) return value # process_time needs to be a float for Kibana analysis if (field in self.RAW_FIELDS or value is None or field == "process_time" or isinstance(value, (LogData, bool))): return value # If we haven't specified otherwise, make sure value is a string, # for consistency of data type and to make readable in logs/ES/Kibana return str(value) def set(self, field: str, value: Any): """Set a value in the data dictionary. :param field: The field to be set :param value: The value of the field :raises ValueError: If the field name is invalid """ if field in self.FIELDS or self.freefields: self.data[field] = self.parseVal(field, value) else: raise ValueError("No such field '" + field + "'") return self def load(self, record: logging.LogRecord) -> None: """Load a log record. :param record: The record to be loaded """ self.record = record # Child should implement any class-specific loading code log/envelope.py000064400000015417147205126350007531 0ustar00# -*- coding: utf-8 -*- import abc import logging import logging.config from datetime import datetime from typing import Optional # pylint: disable=W0611 import pytz from primordial.timeutils import iso8601_utc from primordial.log.extra import LogExtra from primordial.log.data import LogData from primordial.log.envelopedata import EnvelopeVersion, EnvelopeRole, ConfigLogData, HostLogData, PayloadLogData LOG = logging.getLogger(__name__) # Abstract Factory class class Envelope(LogData, metaclass=abc.ABCMeta): """Customized envelope for sending log data. Given a log record, envelope version and configuration data, constructs a log message in the required format which can be returned as a json. """ ENVELOPE_VERSION_FIELD = 'envelope_version' DATA_SCHEMA_FIELD = 'data_schema' @staticmethod def getEnvelope(record: logging.LogRecord, configLogData: ConfigLogData, envelopeVersion: Optional[EnvelopeVersion] = None) -> 'Envelope': """A factory method for producing an Envelope instance. :param record: The log record :param configLogData: The logging data :param envelopeVersion: The version of the envelope :returns: A new Envelope object :rtype: :class:`Envelope` """ extra = getattr(record, LogExtra.EXTRA_FIELD, {}) logextra = None if isinstance(extra, dict): try: logextra = LogExtra.getLogExtra(**extra) except Exception as e: # pylint: disable=broad-except LOG.error("Unable to process extra log fields: %s", str(e)) if not isinstance(envelopeVersion, EnvelopeVersion): envelopeVersion = EnvelopeVersion() if isinstance(logextra, LogExtra): if logextra.get(Envelope.ENVELOPE_VERSION_FIELD) is not None: version = logextra.get(Envelope.ENVELOPE_VERSION_FIELD) envelopeVersion = EnvelopeVersion.fromString(version) major = envelopeVersion.get(EnvelopeVersion.MAJOR_FIELD) # type: ignore if major == 1: return EnvelopeV1(record, envelopeVersion, configLogData, logextra) # type: ignore LOG.error("Unknown log envelope major version %s. Using default version", str(major)) return Envelope.getEnvelope(record, configLogData, EnvelopeVersion()) # Envelope v1. Extend and overload for v2 onwards. class EnvelopeV1(Envelope): """Version 1 of the logging envelope.""" BRAND_ID_FIELD = 'brand_id' DATACENTER_FIELD = 'datacenter' ENVIRONMENT_FIELD = 'environment' ENVELOPE_VERSION_FIELD = Envelope.ENVELOPE_VERSION_FIELD HOST_FIELD = 'host' PAYLOAD_FIELD = 'data' REQUEST_ID_FIELD = 'request_id' ROLE_FIELD = EnvelopeRole.ROLE_FIELD SESSION_ID_FIELD = 'session_id' SEVERITY_FIELD = 'severity' SUB_TYPE_FIELD = 'sub_type' TIMESTAMP_FIELD = 'timestamp' TYPE_FIELD = 'type' DEFAULT_ROLE = EnvelopeRole.ROLE_DEVELOPMENT FIELDS = ( BRAND_ID_FIELD, PAYLOAD_FIELD, DATACENTER_FIELD, ENVELOPE_VERSION_FIELD, ENVIRONMENT_FIELD, HOST_FIELD, REQUEST_ID_FIELD, ROLE_FIELD, SESSION_ID_FIELD, SEVERITY_FIELD, SUB_TYPE_FIELD, TIMESTAMP_FIELD, TYPE_FIELD ) EXTRA_MODIFIABLE_FIELDS = ( PAYLOAD_FIELD, SESSION_ID_FIELD, BRAND_ID_FIELD, SUB_TYPE_FIELD, REQUEST_ID_FIELD, ROLE_FIELD ) EXTRA_IGNORE_FIELDS = (Envelope.ENVELOPE_VERSION_FIELD, Envelope.DATA_SCHEMA_FIELD) def __init__(self, record: logging.LogRecord, envelopeVersion: EnvelopeVersion, configLogData: ConfigLogData, logextra: LogExtra) -> None: super().__init__() self.envelopeVersion = envelopeVersion self.configLogData = configLogData self.set(self.ENVELOPE_VERSION_FIELD, str(envelopeVersion)) self.logextra = logextra self.payloadLogData = None # type: Optional[PayloadLogData] self.load(record) def load(self, record: logging.LogRecord) -> None: super().load(record) self.loadDefaults() self.loadConfig() self.loadRecordFields() self.loadHost() self.loadPayload() self.loadExtra() def loadDefaults(self) -> None: """Load default fields.""" self.set(self.BRAND_ID_FIELD, None) # Optional self.set(self.REQUEST_ID_FIELD, None) # Optional self.set(self.ROLE_FIELD, str(EnvelopeRole())) # Default self.set(self.SESSION_ID_FIELD, None) # Optional def loadConfig(self) -> None: """Load configuration data.""" self.set(self.DATACENTER_FIELD, self.configLogData.get(ConfigLogData.DATACENTER_FIELD)) self.set(self.ENVIRONMENT_FIELD, self.configLogData.get(ConfigLogData.ENVIRONMENT_FIELD)) self.set(self.SUB_TYPE_FIELD, self.configLogData.get(ConfigLogData.SUB_TYPE_FIELD)) # Optional self.set(self.TYPE_FIELD, self.configLogData.get(ConfigLogData.TYPE_FIELD)) def loadPayload(self) -> None: """Load the payload from the log record.""" self.payloadLogData = PayloadLogData(self.record) self.set(self.PAYLOAD_FIELD, self.payloadLogData) def loadHost(self) -> None: """Load the host data.""" self.set(self.HOST_FIELD, HostLogData(self.record)) def loadRecordFields(self) -> None: """Load relevant fields from the log record.""" timestamp = iso8601_utc(datetime.fromtimestamp(self.record.created, pytz.utc)) # type: ignore self.set(self.TIMESTAMP_FIELD, timestamp) self.set(self.SEVERITY_FIELD, self.record.levelname) # type: ignore def loadExtra(self) -> None: """Load any "extra" data.""" if isinstance(self.logextra, LogExtra): try: for k, v in self.logextra.get()[LogExtra.EXTRA_FIELD].items(): if k in self.EXTRA_IGNORE_FIELDS: continue if k in self.EXTRA_MODIFIABLE_FIELDS: if k == self.PAYLOAD_FIELD: if isinstance(v, dict): for key, val in v.items(): self.payloadLogData.set(key, val) # type: ignore else: raise ValueError("Payload is not of type 'dict'") else: self.set(k, v) else: # Place in 'data' block self.payloadLogData.set(k, v) # type: ignore except Exception as e: # pylint: disable=broad-except LOG.error("Unable to process extra log fields: %s", str(e)) log/envelopedata.py000064400000014314147205126350010356 0ustar00# -*- coding: utf-8 -*- from typing import Optional import logging import logging.config import os import traceback from primordial.log.data import LogData from primordial.constants import IP LOG = logging.getLogger(__name__) class ConfigLogData(LogData): """Store configuration log data for passing to other logging classes and easy retrieval.""" TYPE_FIELD = 'type' SUB_TYPE_FIELD = 'sub_type' DATACENTER_FIELD = 'datacenter' ENVIRONMENT_FIELD = 'environment' FIELDS = (TYPE_FIELD, SUB_TYPE_FIELD, DATACENTER_FIELD, ENVIRONMENT_FIELD) DEFAULT_TYPE = 'hfs' DEFAULT_SUB_TYPE = 'vhfs' DEFAULT_DATACENTER = 'local' DEFAULT_ENVIRONMENT = 'LOCAL' def __init__(self) -> None: super().__init__() self.set(self.TYPE_FIELD, self.DEFAULT_TYPE) self.set(self.SUB_TYPE_FIELD, self.DEFAULT_SUB_TYPE) self.set(self.DATACENTER_FIELD, self.DEFAULT_DATACENTER) self.set(self.ENVIRONMENT_FIELD, self.DEFAULT_ENVIRONMENT) class PayloadLogData(LogData): """Extract and store payload log data for parsing and easy retrieval.""" DATA_SCHEMA_VERSION = '1.0.0' LINENO_FIELD = 'lineno' RAW_FIELDS = (LINENO_FIELD,) def __init__(self, record: Optional[logging.LogRecord] = None) -> None: self.freefields = True super().__init__(record) def load(self, record: logging.LogRecord) -> None: super().load(record) self.set('msg', record.getMessage()) self.set('pathname', record.pathname) self.set('class_name', None) self.set('module', record.module) self.set('filename', record.filename) self.set('func_name', record.funcName) self.set(self.LINENO_FIELD, record.lineno) self.set('version', self.DATA_SCHEMA_VERSION) if record.exc_info: exc_type, exc_value, exc_traceback = record.exc_info self.set('strace', ''.join( traceback.format_exception(exc_type, exc_value, exc_traceback))) class ServiceLogData(LogData): """Extract and store service log data for parsing and easy retrieval.""" NAME_FIELD = 'name' THREAD_FIELD = 'thread' FIELDS = (NAME_FIELD, THREAD_FIELD) def load(self, record: logging.LogRecord) -> None: super().load(record) self.set(self.NAME_FIELD, os.getenv('SYSLOG_IDENT') or record.processName) self.set(self.THREAD_FIELD, str(record.thread)) class HostLogData(LogData): """Extract and store host log data for parsing and easy retrieval.""" SERVICE_FIELD = 'service' HOSTNAME_FIELD = 'hostname' IP_FIELD = 'ip' FIELDS = (HOSTNAME_FIELD, IP_FIELD, SERVICE_FIELD) def load(self, record: logging.LogRecord) -> None: super().load(record) self.set(self.HOSTNAME_FIELD, os.getenv('HOSTNAME')) self.set(self.IP_FIELD, IP) serviceLogData = ServiceLogData(record) self.set(self.SERVICE_FIELD, serviceLogData) class EnvelopeRole(LogData): """Store log role (e.g. ''PERFORMANCE' ) for passing to other classes and easy retrieval.""" ROLE_FIELD = 'role' ROLE_DEVELOPMENT = 'DEVELOPMENT' ROLE_PERFORMANCE = 'PERFORMANCE' ROLE_BUSINESS_ANALYTICS = 'BUSINESS_ANALYTICS' FIELDS = (ROLE_FIELD,) DEFAULT_ROLE = ROLE_DEVELOPMENT def __init__(self) -> None: super().__init__() self.set(self.ROLE_FIELD, self.DEFAULT_ROLE) def __str__(self) -> str: return str(self.get(self.ROLE_FIELD)) __repr__ = __str__ class EnvelopeVersion(LogData): """Store Envelope version (e.g. '1.0.0') for passing to other classes and easy retrieval.""" MAJOR_FIELD = 'major' MINOR_FIELD = 'minor' PATCH_FIELD = 'patch' FIELDS = (MAJOR_FIELD, MINOR_FIELD, PATCH_FIELD) RAW_FIELDS = FIELDS DEFAULT_MAJOR = 1 DEFAULT_MINOR = 0 DEFAULT_PATCH = 0 def __init__(self) -> None: super().__init__() self.set(self.MAJOR_FIELD, self.DEFAULT_MAJOR) self.set(self.MINOR_FIELD, self.DEFAULT_MINOR) self.set(self.PATCH_FIELD, self.DEFAULT_PATCH) def __str__(self) -> str: return str(".".join([str(self.get(field)) for field in self.FIELDS])) __repr__ = __str__ @staticmethod def fromString(version: str) -> 'EnvelopeVersion': major, minor, patch = map(int, version.split(".")) envelopeVersion = EnvelopeVersion() envelopeVersion.set(EnvelopeVersion.MAJOR_FIELD, major) envelopeVersion.set(EnvelopeVersion.MINOR_FIELD, minor) envelopeVersion.set(EnvelopeVersion.PATCH_FIELD, patch) return envelopeVersion class EnvelopeDataSchema(LogData): """Store Envelope data schema version (e.g. '1.0.0') for passing to other classes and easy retrieval.""" NAME_FIELD = 'name' MAJOR_FIELD = 'major' MINOR_FIELD = 'minor' PATCH_FIELD = 'patch' VERSION_SEPARATOR = '.' NAME_VERSION_SEPARATOR = '-' FIELDS = (NAME_FIELD, MAJOR_FIELD, MINOR_FIELD, PATCH_FIELD) RAW_FIELDS = FIELDS DEFAULT_NAME = 'default' DEFAULT_MAJOR = 1 DEFAULT_MINOR = 0 DEFAULT_PATCH = 0 def __init__(self) -> None: super().__init__() self.set(self.NAME_FIELD, self.DEFAULT_NAME) self.set(self.MAJOR_FIELD, self.DEFAULT_MAJOR) self.set(self.MINOR_FIELD, self.DEFAULT_MINOR) self.set(self.PATCH_FIELD, self.DEFAULT_PATCH) def __str__(self) -> str: version = str( self.VERSION_SEPARATOR.join( [str(self.get(field)) for field in (self.MAJOR_FIELD, self.MINOR_FIELD, self.PATCH_FIELD)])) return str(self.NAME_VERSION_SEPARATOR.join([self.get(self.NAME_FIELD), version])) __repr__ = __str__ @staticmethod def fromString(data_schema: str) -> 'EnvelopeDataSchema': name, version = map(str, data_schema.split(EnvelopeDataSchema.NAME_VERSION_SEPARATOR)) major, minor, patch = map(int, version.split(EnvelopeDataSchema.VERSION_SEPARATOR)) envelopeDataSchema = EnvelopeDataSchema() envelopeDataSchema.set(EnvelopeDataSchema.NAME_FIELD, name) envelopeDataSchema.set(EnvelopeDataSchema.MAJOR_FIELD, major) envelopeDataSchema.set(EnvelopeDataSchema.MINOR_FIELD, minor) envelopeDataSchema.set(EnvelopeDataSchema.PATCH_FIELD, patch) return envelopeDataSchema log/extra.py000064400000005167147205126350007040 0ustar00# -*- coding: utf-8 -*- import abc import logging.config from typing import Any, Dict, Optional # pylint: disable=W0611 from primordial.log.envelopedata import EnvelopeRole LOG = logging.getLogger(__name__) # Abstract Factory class class LogExtra(metaclass=abc.ABCMeta): """Abstract Base Class that provides a set of methods for extraction, setting and retrieval of 'extra' log data.""" EXTRA_FIELD = 'extra' def __init__(self, **kwargs: str) -> None: self.extra = {} # type: Dict[str, Any] self.set(**kwargs) @staticmethod def getLogExtra(**kwargs: str) -> 'LogExtra': """Factory method to create a new LogExtra instance.""" # Default role to 'DEVELOPMENT' role = EnvelopeRole.DEFAULT_ROLE if EnvelopeRole.ROLE_FIELD in kwargs: role = kwargs[EnvelopeRole.ROLE_FIELD] if role == EnvelopeRole.ROLE_DEVELOPMENT: return DevLogExtra(**kwargs) if role == EnvelopeRole.ROLE_BUSINESS_ANALYTICS: return BALogExtra(**kwargs) if role == EnvelopeRole.ROLE_PERFORMANCE: return PerfLogExtra(**kwargs) raise ValueError("Invalid role " + role) def set(self, **kwargs: str) -> None: """Set fields on a LogExtra instance.""" for k, v in kwargs.items(): if k is not EnvelopeRole.ROLE_FIELD: self.setField(k, v) def setField(self, field: str, value: Any) -> None: """Set an individual field's value. :param field: The field to be set :param value: The value of the field """ self.extra[field] = value def get(self, field: Optional[str] = None) -> Any: if field is None: # Return 'extra' dict in format required for logger return {'extra': self.extra} if field in self.extra: # Return particular field from 'extra' dict return self.extra[field] return None class DevLogExtra(LogExtra): """Log extra data for development.""" def __init__(self, **kwargs: str) -> None: super().__init__(**kwargs) self.extra[EnvelopeRole.ROLE_FIELD] = EnvelopeRole.ROLE_DEVELOPMENT class BALogExtra(LogExtra): """Log extra data for business analytics.""" def __init__(self, **kwargs: str) -> None: super().__init__(**kwargs) self.extra[EnvelopeRole.ROLE_FIELD] = EnvelopeRole.ROLE_BUSINESS_ANALYTICS class PerfLogExtra(LogExtra): """Log extra data for performance data.""" def __init__(self, **kwargs: str) -> None: super().__init__(**kwargs) self.extra[EnvelopeRole.ROLE_FIELD] = EnvelopeRole.ROLE_PERFORMANCE log/logging.yaml000064400000001511147205126350007642 0ustar00--- version: 1 disable_existing_loggers: true formatters: syslog: (): primordial.log.VerticalServiceStructuredFormatter console: (): primordial.log.VerticalServiceSystemdFormatter format: "%(name)-12s: %(levelname)-8s %(message)s" filters: request_header_logging: (): primordial.log.VerticalServiceRequestLoggingFilter handlers: syslog: class: primordial.log.HFSSyslogHandler formatter: syslog filters: - request_header_logging address: [127.0.0.1, 514] facility: local0 socktype: ext://socket.SOCK_STREAM console: class: logging.StreamHandler level: DEBUG formatter: console stream: ext://sys.stdout loggers: primordial: level: DEBUG handlers: - console - syslog propagate: no root: level: DEBUG handlers: - console - syslog log/__pycache__/__init__.cpython-38.pyc000064400000017214147205126350013736 0ustar00U afa$@sddlmZddlmZddlZddlZddlZddlZddlmZmZm Z m Z ddl m Z m Z mZmZddlmZddlZz ddlZWnek rdZYnXddlZddlmZddlmZdd lmZdd lmZmZmZd Z d Z!d Z"ej#dej$dej%dej&dej'dej(dej)dej*diZ+e,e-Z.Gdddej/Z0Gdddej/Z1Gdddej2Z3GdddeZ4e-ddej5ej6ddfe7e7ee e7e fe7e7ee7ee7ddddZ8dS) )datetime) SysLogHandlerN)AF_INET SOCK_DGRAM SOCK_STREAM SocketType)AnyDictOptionalTuple)resource_stream) iso8601_utc)Envelope) ConfigLogData) PerfLogExtra DevLogExtra BALogExtraiz@cee:z<7>z<6>z<4>z<3>z<2>z<1>csTeZdZdZejejejejde e e e ddfddZ e j e ddd Z ZS) "VerticalServiceStructuredFormatterzSpecialized structured logging formatter for HFS vertical services. Takes a log record and configuration data and formats it according to the structure in the Envelope class, returning a json-formatted log message. )log_type log_sub_type datacenter environmentN)rrrrreturncsBtj||ttj|tj|tj|tj||_dSN) super__init__rsetZ TYPE_FIELDZSUB_TYPE_FIELDZDATACENTER_FIELDZENVIRONMENT_FIELD configLogData)selfrrrrargskwargs __class__H/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/log/__init__.pyr7sz+VerticalServiceStructuredFormatter.__init__recordrcCst||j}t|Sr)rZ getEnveloperCOOKIEZgetJSON)rr'Z logenveloper$r$r%formatGsz)VerticalServiceStructuredFormatter.format)__name__ __module__ __qualname____doc__r DEFAULT_TYPEDEFAULT_SUB_TYPEDEFAULT_DATACENTERDEFAULT_ENVIRONMENTstrrlogging LogRecordr) __classcell__r$r$r"r%r0srcs*eZdZdZejedfdd ZZS)VerticalServiceSystemdFormatterz4Systemd logging formatter for HFS vertical services.r&cst|jd}|t|S)N) PREFIX_MAPgetlevelnorr))rr'prefixr"r$r%r)Osz&VerticalServiceSystemdFormatter.format) r*r+r,r-r3r4r2r)r5r$r$r"r%r6Lsr6c@s"eZdZdZejedddZdS)#VerticalServiceRequestLoggingFilterz;A log filter for producing only Flask web request messages.r&cCs<tr8tr8t|dsi|_tjjd}|r8||jd<dS)Nextraz x-request-id request_idT)flaskZhas_request_contexthasattrr=requestheadersr9)rr'r>r$r$r%filterWs   z*VerticalServiceRequestLoggingFilter.filterN)r*r+r,r-r3r4boolrCr$r$r$r%r<Tsr<cseZdZdZddefejefee e e e fe e ddddZ ddfdd Zejdd d d Zddd dZddddZZS)HFSSyslogHandlerzA customized Log handler for HFS syslogs. Given a log record, emits a log message formatted according to the :class:`VerticalServiceStructuredFormatter` class. N localhost)identaddressfacilitysocktypercCsV|ttfkrtdtj||p0tdp0d|_||_ ||_ ||_ d|_ d|_ dS)zProduce a new HFS syslog handler. :param ident: :param address: :param facility: :raises TypeError: For socket types other than TCP or UDP z:HFSSyslogHandler only supports TCP and UDP AF_INET socketsZ SYSLOG_IDENTpythonNF)rr TypeErrorr3HandlerrosgetenvrGrHrIrJsocket unixsocket)rrGrHrIrJr$r$r%rhs  zHFSSyslogHandler.__init__)rcs |jdkr|tdS)z0Ensure socket object, as expected by base class.N)rP _set_socketrcloserr"r$r%rSs zHFSSyslogHandler.closer&c Cs0z||j||j}tt|jtj }t d}|j dk rH|j nd}d}d|t |||j||f}d}||} |d|d} | d} | d| d } z>|jdkr||jtkr|j| |jn |j| Wn&tk r|jd|_YnXWn tk r*||YnXdS) zEmit a log record. The record is formatted, and an RFC-compliant message is sent to the syslog server. No structured data segment is created and message id is always nil ('-'). :param record: The record to be emitted. HOSTNAMENr-z<%d>%d %s %s %s %d %s  asciiutf8)encodePriorityrI mapPriority levelnamer r fromtimestampcreatedpytzutcrNrOprocessSYSLOG_FORMAT_VERSIONrGr)encoderP_connect_socketrJrsendtorHsendallOSErrorrS Exception handleError) rr'prio timestamphostnamepidZ message_idheaderZstructured_datamessageZ ascii_partZ utf8_partZencoded_messager$r$r%emitsB      zHFSSyslogHandler.emitcCs0||jd|jtkr,|j|jdS)Ng?)rRrP settimeoutrJrconnectrHrTr$r$r%res  z HFSSyslogHandler._connect_socketcCstt|j|_dSr)rPrrJrTr$r$r%rRszHFSSyslogHandler._set_socket)r*r+r,r-SYSLOG_DEFAULT_PORTrLOG_USERrr r2r intrrrSr3r4rqrerRr5r$r$r"r%rEas   0rEz logging.yaml)namefilenameoverride_loggersrrrrrc Cs|dk r |ntj}|dk r"|ntj}tt||}|d D]2}| ddkrL||d<||d<||d<||d<qL|dk r|d  |t j |t d dS) a+ Initialize logging, updating the existing log config loggers dict with an override_loggers dict. It's possible, since we have many ways to run the system, that init_logging be called more than once. We allow this, but warn if so. :param name: package name where the logging YAML config file is located :param filename: filename of the logging YAML config :param override_loggers: a dict of overrides to the default log dictconfig :param log_type: :param log_sub_type: :param datacenter: :param environment: N formattersz()z1primordial.log.VerticalServiceStructuredFormatterrrrrloggersT)rr0upperr1yamlZ safe_loadr readdecodevaluesr9updater3config dictConfigcaptureWarnings) rwrxryrrrrr formatterr$r$r% init_loggings  r)9rlogging.handlersrr3logging.configrNrPrrrrtypingrr r r pkg_resourcesr r}r? ImportErrorr`Zprimordial.timeutilsr Zprimordial.log.enveloperZprimordial.log.envelopedatarZprimordial.log.extrarrrrtrcr(DEBUGINFONOTSETWARNINGWARNERRORCRITICALFATALr8 getLoggerr*LOG Formatterrr6Filterr<rEr.r/r2rr$r$r$r%sv          dlog/__pycache__/analyticslogger.cpython-38.pyc000064400000010645147205126350015367 0ustar00U af@slddlmZmZddlmZmZmZmZmZddl m Z ddl m Z m Z mZddlmZGdddeZdS) )Logger LoggerAdapter)AnyDictMutableMappingOptionalTuple)LogExtra)EnvelopeVersion EnvelopeRoleEnvelopeDataSchema)Envelopecs&eZdZdZd&eeeeeddfdd ZeddddZ eddd d Z eddd d Z eddd dZ ee ddddZeeee feeeee ffdddZeddddZeddddZeddddZeddddZeddd d!Zedd"d#Zddd$d%ZZS)'AnalyticsLoggerzmA LoggerAdapter which specifies an interface for log calls and adds default/initial log data into the record.N)loggerenvelope_version data_schemareturncsi|_t||j|dkr(t|_n t||_|dkrFt|_n t||_|t j t |j|t j t |jt |_|t jt |jd|_dSNF)extrasuper__init__r _envelope_version fromStringr _data_schema setExtraAttrr ENVELOPE_VERSION_FIELDstrDATA_SCHEMA_FIELDr role ROLE_FIELDvalidate_data_schema)selfrrr __class__O/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/log/analyticslogger.pyrs    zAnalyticsLogger.__init__)rcCs|tj|SN)_roler ZROLE_PERFORMANCEr!r$r$r% performance&s zAnalyticsLogger.performancecCs|tj|Sr&)r'r ZROLE_DEVELOPMENTr(r$r$r% development+s zAnalyticsLogger.developmentcCs|tj|Sr&)r'r ZROLE_BUSINESS_ANALYTICSr(r$r$r%business_analytics0s z"AnalyticsLogger.business_analyticscCs d|_|S)NT)r r(r$r$r%validate5szAnalyticsLogger.validate)fieldvaluercCs||j|<dSr&)r)r!r-r.r$r$r%r:szAnalyticsLogger.setExtraAttr)msgkwargsrcCsd|kr||dd|kr,||dd|krB||d|tjf|j}|jf|i}||d<| ||fS)Nrrrr) vsr'_validate_schemar Z getLogExtrarsetget_reset)r!r/r0ZlogextraZ new_kwargsr$r$r%process>s  zAnalyticsLogger.process)rrcCs$t||_|tjt|j|S)zAPermanently set envelope_version for the lifetime of this logger.)r rrrr rrr!rr$r$r%rQs z AnalyticsLogger.envelope_version)rrcCs$t||_|tjt|j|S)zs   log/__pycache__/data.cpython-38.pyc000064400000005312147205126350013104 0ustar00U af @sZddlZddlZddlZddlZddlmZmZmZmZe e Z Gdddej dZ dS)N)AnyDictOptionalTuplec@seZdZdZdZdZdeejddddZ dee e ddd Z e e e fd d d Ze d d dZe e e dddZe e dddZejddddZdS)LogDataz Logging dataN)recordreturncCs<i|_t|dsdn|j|_d|_t|tjr8||dS)N freefieldsF)datahasattrr r isinstancelogging LogRecordloadselfrrrD/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/log/data.py__init__s  zLogData.__init__)fieldr cCs&|dkr|jS||jkr"|j|SdS)zRGet the data from a single field. :param field: The field to get N)r )rrrrrgets   z LogData.get)r cCs>i}|D](\}}t|tr0|||<q|||<q|S)zGet a dictionary of the data.)ritemsr rgetDict)rZdatadictkeyvalrrrr&s   zLogData.getDictcCst|S)N)jsondumpsr)rrrrgetJSON1szLogData.getJSON)rvaluer cCsdt|tr0|D]\}}|||||<q|S||jksX|dksX|dksXt|ttfr\|St|S)zParse a value and coerce it into appropriate datatypes. :param field: The name of the field to be parsed :param value: The value of the field. N process_time)r dictrparseVal RAW_FIELDSrboolstr)rrrkvrrrr!4s   zLogData.parseVal)rrcCs8||jks|jr$||||j|<ntd|d|S)zSet a value in the data dictionary. :param field: The field to be set :param value: The value of the field :raises ValueError: If the field name is invalid zNo such field '')FIELDSr r!r ValueError)rrrrrrsetHsz LogData.setcCs ||_dS)zKLoad a log record. :param record: The record to be loaded N)rrrrrrVsz LogData.load)N)N)__name__ __module__ __qualname____doc__r(r"rrrrr$rrrrrr!r*rrrrrr s  r) metaclass)abcrrlogging.configtypingrrrr getLoggerr+LOGABCMetarrrrrs  log/__pycache__/envelope.cpython-38.pyc000064400000012551147205126350014013 0ustar00U af@sddlZddlZddlZddlmZddlmZddlZddlmZddl m Z ddl m Z ddl mZmZmZmZmZeeZGdd d e ejd ZGd d d eZdS) N)datetime)Optional) iso8601_utc)LogExtra)LogData)EnvelopeVersion EnvelopeRole ConfigLogData HostLogDataPayloadLogDatac@s8eZdZdZdZdZedeje e e ddddZ dS) EnvelopezCustomized envelope for sending log data. Given a log record, envelope version and configuration data, constructs a log message in the required format which can be returned as a json. Zenvelope_versionZ data_schemaN)record configLogDataenvelopeVersionreturnc Cst|tji}d}t|tr`ztjf|}Wn2tk r^}ztdt |W5d}~XYnXt|t st }t|tr| t j dk r| t j }t |}| t j}|dkrt||||Stdt |t ||t S)aA factory method for producing an Envelope instance. :param record: The log record :param configLogData: The logging data :param envelopeVersion: The version of the envelope :returns: A new Envelope object :rtype: :class:`Envelope` N&Unable to process extra log fields: %szszEnvelopeV1.loadConfigcCs t|j|_||j|jdS)z%Load the payload from the log record.N)r r r8r7 PAYLOAD_FIELDrGr$r$r%rAs zEnvelopeV1.loadPayloadcCs||jt|jdS)zLoad the host data.N)r7 HOST_FIELDr r rGr$r$r%r@szEnvelopeV1.loadHostcCs:tt|jjtj}||j|||j |jj dS)z)Load relevant fields from the log record.N) rr fromtimestampr createdpytzutcr7TIMESTAMP_FIELDSEVERITY_FIELD levelname)r:r2r$r$r%r?szEnvelopeV1.loadRecordFieldsc Cst|jtrz|jtjD]z\}}||jkr6q"||jkr||jkrt|t rv|D]\}}|j ||q\qt dq| ||q"|j ||q"Wn2t k r}ztdt|W5d}~XYnXdS)zLoad any "extra" data.zPayload is not of type 'dict'rN)rr rrritemsEXTRA_IGNORE_FIELDSEXTRA_MODIFIABLE_FIELDSrLrr8r7 ValueErrorrrrr)r:kvkeyvalr!r$r$r%rBs      zEnvelopeV1.loadExtra)'r&r'r(r)rCrHrIr rrMrLrDrrErFrSrJrRrKZROLE_DEVELOPMENTZ DEFAULT_ROLEZFIELDSrWr*rVr,r-rr rr6r9r=r>rAr@r?rB __classcell__r$r$r;r%rCsd   r)abcr,logging.configrtypingrrPZprimordial.timeutilsrZprimordial.log.extrarZprimordial.log.datarZprimordial.log.envelopedatarrr r r getLoggerr&rABCMetar rr$r$r$r%s      .log/__pycache__/envelopedata.cpython-38.pyc000064400000016110147205126350014640 0ustar00U af@sddlmZddlZddlZddlZddlZddlmZddlm Z e e Z GdddeZ GdddeZGd d d eZGd d d eZGd ddeZGdddeZGdddeZdS))OptionalN)LogData)IPcsReZdZdZdZdZdZdZeeeefZdZ dZ dZ d Z d d fd d Z ZS) ConfigLogDatazUStore configuration log data for passing to other logging classes and easy retrieval.typeZsub_typeZ datacenter environmentZhfsZvhfslocalZLOCALNreturncsNt||j|j||j|j||j|j||j |j dSN) super__init__set TYPE_FIELD DEFAULT_TYPESUB_TYPE_FIELDDEFAULT_SUB_TYPEDATACENTER_FIELDDEFAULT_DATACENTERENVIRONMENT_FIELDDEFAULT_ENVIRONMENTself __class__L/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/log/envelopedata.pyr s  zConfigLogData.__init__)__name__ __module__ __qualname____doc__rrrrFIELDSrrrrr __classcell__rrrrrs rcsTeZdZdZdZdZefZd eej ddfdd Z ej ddfdd Z Z S) PayloadLogDatazBExtract and store payload log data for parsing and easy retrieval.z1.0.0linenoNrecordr csd|_t|dS)NT)Z freefieldsr r rr&rrrr *szPayloadLogData.__init__c st||d||d|j|dd|d|j|d|j|d|j||j|j |d|j |j r|j \}}}|dd t |||dS) Nmsgpathname class_namemodulefilename func_nameversionZstrace)r loadr getMessager)r+r,funcName LINENO_FIELDr$DATA_SCHEMA_VERSIONexc_infojoin tracebackformat_exception)rr&exc_type exc_value exc_tracebackrrrr0.s     zPayloadLogData.load)N) rrrr r4r3 RAW_FIELDSrlogging LogRecordr r0r"rrrrr#$s r#cs:eZdZdZdZdZeefZejddfdd Z Z S)ServiceLogDatazBExtract and store service log data for parsing and easy retrieval.namethreadNr%cs>t|||jtdp"|j||jt|j dS)NZ SYSLOG_IDENT) r r0r NAME_FIELDosgetenv processName THREAD_FIELDstrrAr'rrrr0Ds zServiceLogData.load) rrrr rBrFr!r=r>r0r"rrrrr?>s r?cs@eZdZdZdZdZdZeeefZej ddfdd Z Z S) HostLogDataz?Extract and store host log data for parsing and easy retrieval.ZservicehostnameipNr%csHt|||jtd||jtt|}||j |dS)NHOSTNAME) r r0rHOSTNAME_FIELDrCrDIP_FIELDrr? SERVICE_FIELD)rr&ZserviceLogDatarrrr0Qs  zHostLogData.load) rrrr rNrLrMr!r=r>r0r"rrrrrHJs  rHcsReZdZdZdZdZdZdZefZeZ ddfdd Z e dd d Z e Z ZS) EnvelopeRolezVStore log role (e.g. ''PERFORMANCE' ) for passing to other classes and easy retrieval.roleZ DEVELOPMENTZ PERFORMANCEZBUSINESS_ANALYTICSNr cst||j|jdSr )r r r ROLE_FIELD DEFAULT_ROLErrrrr ds zEnvelopeRole.__init__cCst||jSr )rGgetrQrrrr__str__hszEnvelopeRole.__str__)rrrr rQZROLE_DEVELOPMENTZROLE_PERFORMANCEZROLE_BUSINESS_ANALYTICSr!rRr rGrT__repr__r"rrrrrOYsrOcsreZdZdZdZdZdZeeefZeZdZ dZ dZ ddfd d Z e dd d ZeZee dd ddZZS)EnvelopeVersionzVStore Envelope version (e.g. '1.0.0') for passing to other classes and easy retrieval.majorminorpatchrNr cs>t||j|j||j|j||j|jdSr ) r r r MAJOR_FIELD DEFAULT_MAJOR MINOR_FIELD DEFAULT_MINOR PATCH_FIELD DEFAULT_PATCHrrrrr {s zEnvelopeVersion.__init__cstdfddjDS)N.csg|]}t|qSrrGrS.0fieldrrr sz+EnvelopeVersion.__str__..)rGr6r!rrrrrTszEnvelopeVersion.__str__)r.r cCsJtt|d\}}}t}|tj||tj||tj||S)Nra)mapintsplitrVrr[r]r_)r.rWrXrYZenvelopeVersionrrr fromStrings zEnvelopeVersion.fromString)rrrr r[r]r_r!r<r\r^r`r rGrTrU staticmethodrjr"rrrrrVns rVcseZdZdZdZdZdZdZdZdZ eeeefZ e Z dZ d Z d Zd Zd d fd d Zed ddZeZeeddddZZS)EnvelopeDataSchemazbStore Envelope data schema version (e.g. '1.0.0') for passing to other classes and easy retrieval.r@rWrXrYra-defaultrZrNr csNt||j|j||j|j||j|j||j |j dSr ) r r rrB DEFAULT_NAMEr[r\r]r^r_r`rrrrr s  zEnvelopeDataSchema.__init__csFtjfddjjjfD}tjj|gS)Ncsg|]}t|qSrrbrcrrrrfsz.EnvelopeDataSchema.__str__..) rGVERSION_SEPARATORr6r[r]r_NAME_VERSION_SEPARATORrSrB)rr.rrrrTs zEnvelopeDataSchema.__str__) data_schemar cCsptt|tj\}}tt|tj\}}}t}|tj||tj ||tj ||tj ||Sr ) rgrGrirlrqrhrprrBr[r]r_)rrr@r.rWrXrYZenvelopeDataSchemarrrrjszEnvelopeDataSchema.fromString)rrrr rBr[r]r_rprqr!r<ror\r^r`r rGrTrUrkrjr"rrrrrls$ rl)typingrr=logging.configrCr7Zprimordial.log.datarZprimordial.constantsr getLoggerrLOGrr#r?rHrOrVrlrrrrs     "log/__pycache__/extra.cpython-38.pyc000064400000006210147205126350013314 0ustar00U afw @sddlZddlZddlmZmZmZddlmZe e Z Gdddej dZ Gddde ZGd d d e ZGd d d e ZdS) N)AnyDictOptional) EnvelopeRolec@speZdZdZdZeddddZeeddddZeddd d Z ee dd d d Z de ee dddZ dS)LogExtrazmAbstract Base Class that provides a set of methods for extraction, setting and retrieval of 'extra' log data.extraNkwargsreturncKsi|_|jf|dSN)rsetselfr rE/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/log/extra.py__init__szLogExtra.__init__cKsftj}tj|kr|tj}|tjkr.tf|S|tjkrBtf|S|tjkrVtf|St d|dS)z1Factory method to create a new LogExtra instance.z Invalid role N) rZ DEFAULT_ROLE ROLE_FIELDROLE_DEVELOPMENT DevLogExtraROLE_BUSINESS_ANALYTICS BALogExtraROLE_PERFORMANCE PerfLogExtra ValueError)r rolerrr getLogExtras        zLogExtra.getLogExtracKs,|D]\}}|tjk r|||qdS)z"Set fields on a LogExtra instance.N)itemsrrsetField)rr kvrrrr %s z LogExtra.set)fieldvaluer cCs||j|<dS)zSet an individual field's value. :param field: The field to be set :param value: The value of the field Nr)rr r!rrrr+szLogExtra.setField)r r cCs*|dkrd|jiS||jkr&|j|SdS)Nrr")rr rrrget3s    z LogExtra.get)N)__name__ __module__ __qualname____doc__Z EXTRA_FIELDstrr staticmethodrr rrrr#rrrrr sr) metaclasscs(eZdZdZeddfdd ZZS)rzLog extra data for development.Nrc s tjf|tj|jtj<dSr )superrrrrrr  __class__rrr?szDevLogExtra.__init__r$r%r&r'r(r __classcell__rrr,rr=srcs(eZdZdZeddfdd ZZS)rz&Log extra data for business analytics.Nrc s tjf|tj|jtj<dSr )r+rrrrrr r,rrrFszBALogExtra.__init__r.rrr,rrDsrcs(eZdZdZeddfdd ZZS)rz$Log extra data for performance data.Nrc s tjf|tj|jtj<dSr )r+rrrrrr r,rrrMszPerfLogExtra.__init__r.rrr,rrKsr)abclogging.configloggingtypingrrrZprimordial.log.envelopedatar getLoggerr$LOGABCMetarrrrrrrrs  1requests/__init__.py000064400000000000147205126350010523 0ustar00requests/adapters.py000064400000000727147205126350010607 0ustar00# -*- coding: utf-8 -*- from requests.adapters import HTTPAdapter class CustomHostnameHTTPSAdapter(HTTPAdapter): """Allow for setting a custom hostname on an adapter.""" def __init__(self, custom_hostname: str) -> None: self.custom_hostname = custom_hostname super().__init__() def cert_verify(self, conn, url, verify, cert): conn.assert_hostname = self.custom_hostname return super().cert_verify(conn, url, verify, cert) requests/__pycache__/__init__.cpython-38.pyc000064400000000250147205126350015020 0ustar00U af@sdS)NrrrM/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/requests/__init__.pyrequests/__pycache__/adapters.cpython-38.pyc000064400000001646147205126350015076 0ustar00U af@s ddlmZGdddeZdS)) HTTPAdaptercs4eZdZdZeddfdd ZfddZZS)CustomHostnameHTTPSAdapterz2Allow for setting a custom hostname on an adapter.N)custom_hostnamereturncs||_tdSN)rsuper__init__)selfr __class__M/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/requests/adapters.pyr sz#CustomHostnameHTTPSAdapter.__init__cs|j|_t||||Sr)rassert_hostnamer cert_verify)r connurlverifycertr r r r sz&CustomHostnameHTTPSAdapter.cert_verify)__name__ __module__ __qualname____doc__strrr __classcell__r r r r rsrN)Zrequests.adaptersrrr r r r s service/__init__.py000064400000000000147205126350010310 0ustar00service/base_daemon.py000064400000003661147205126350011026 0ustar00# -*- coding: utf-8 -*- import logging from subprocess import PIPE, call, CalledProcessError from typing import Any, Dict # pylint: disable=W0611 LOGGER = logging.getLogger(__name__) class BaseDaemon: """A base implementation of a daemon""" def __init__(self, name: str, command_fmt: str) -> None: self.name = name self.command_fmt = command_fmt def _run(self, action: str, check_return_code: bool = True, pipes: bool = True) -> Any: """Run an action for the daemon. :param action: The action to be run :param check_return_code: Whether to error on non-zero return codes :param pipes: :raises CalledProcessError: If `check_return_code` is True, and a non-zero return code occurrs """ LOGGER.info('%sing %s', action, self.name) kwargs = {'universal_newlines': True, 'timeout': 10} # type: Dict[str, Any] if pipes: kwargs.update({'stdout': PIPE, 'stderr': PIPE}) cmd = self.command_fmt.format(name=self.name, action=action) result = call(cmd.split(), **kwargs) LOGGER.info(result) if check_return_code: if result != 0: raise CalledProcessError(returncode=result, cmd=cmd) return result def status(self) -> Any: """Check the status of the daemon.""" return self._run('status', check_return_code=False) def start(self) -> Any: """Start the daemon.""" # Must not connect pipes to stdin/out, or subprocess will never # complete, hanging on communicate() call. return self._run('start', pipes=False) def stop(self) -> Any: """Stop the daemon.""" return self._run('stop') def restart(self) -> Any: """Restart the daemon.""" # Must not connect pipes to stdin/out, or subprocess will never # complete, hanging on communicate() call. return self._run('restart', pipes=False) service/registration.py000064400000032147147205126350011304 0ustar00# -*- coding: utf-8 -*- from argparse import ArgumentParser from datetime import datetime from collections import defaultdict from functools import partial import json import logging import os import os.path import platform import signal import socket import subprocess from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union # pylint: disable=W0611 from uuid import uuid1 from kazoo.client import KazooClient, KazooState from primordial.constants import CANONICAL_TIMESTRING_FORMAT from primordial.flow import retry, RetryError from primordial.settings import (ZK_HOSTS, GD_ZKRUN_VIRTUALENV_PATH, GD_ZKRUN_COMMAND, GD_ZKRUN_NAME, GD_ZKRUN_HOSTS, GD_ZKRUN_SERVICE_TYPE, GD_ZKRUN_PORT, GD_ZKRUN_SSL_PORT, GD_ZKRUN_LOCATIONS) SIGNALS = defaultdict(int) # type: Dict[int, int] # map pid to running ZKSTART_STATUS = {} LOGGER = logging.getLogger(__name__) def get_zkhosts(): """Gets the zkhosts from the default settings""" return ZK_HOSTS LocationsType = Optional[Union[str, Iterable[str]]] class ZkRegistrar: """Implementation of a basic Zookeeper ephemeral node registrar.""" locations = None # type: LocationsType def __init__(self, listener: Optional[Callable] = None, name: Optional[str] = None, zkhosts: Optional[str] = None, service_type: str = "registrations", port: Optional[int] = None, ssl_port: Optional[int] = None, locations: LocationsType = None) -> None: """Create a zookeeper client and hang on to the listener. :param listener: The zookeeper listener action; is activated by the kazoo client when updates occur to watched zookeeper parameters. :param name: The path base for this service. :param zkhosts: The zkhost or hosts (list) to use with the zk client; if none set, a local function is used that uses hfs best practices to find the zk host setting for the machine this is running on. :param service_type: verticals for REST interfaces and registrations for Kafka listeners, pollers, etc. :param port: port the service is listening on if any :param ssl_port: ssl port if any :param locations: URLs that correspond to this server if any """ if zkhosts is None: zkhosts = get_zkhosts() if isinstance(zkhosts, (list, tuple)): zkhosts = ",".join(zkhosts) self.listener = listener or self.default_zk_state_handler self.name = name self.zk = KazooClient(hosts=zkhosts) self.zk_uuid = str(uuid1()) self.service_type = service_type self.port = port self.ssl_port = ssl_port self.locations = locations self.connection_was_lost = False self.need_to_register = True def default_zk_state_handler(self, state: str) -> None: """A default state handler function""" LOGGER.debug('Zookeeper state change: %s (%s)', state, self.zk.client_state) # The state transition we need to handle is LOST->CONNECTED, in which # case we need to re-register the service in ZooKeeper. if state == KazooState.LOST: self.connection_was_lost = True self.need_to_register = False elif state == KazooState.CONNECTED and self.connection_was_lost: self.connection_was_lost = False self.need_to_register = True def register(self) -> None: """Register the listener""" self.zk.add_listener(self.listener) self.zk.start() if self.locations is None: locations = [] # type: Iterable[str] elif not isinstance(self.locations, (list, tuple)): locations = [self.locations] # type: ignore else: locations = self.locations zk_register_data = { 'address': platform.node(), 'id': self.zk_uuid, 'name': self.name, 'locations': locations, 'registrationTimeUTC': datetime.utcnow().strftime(CANONICAL_TIMESTRING_FORMAT), # pylint: disable=E1120 'serviceType': 'DYNAMIC' } # type: Dict[str, Any] if self.port is not None: zk_register_data['port'] = self.port if self.ssl_port is not None: zk_register_data['sslPort'] = self.ssl_port self.zk.ensure_path('/service/%s/%s' % (self.service_type, self.name)) self.zk.create('/service/%s/%s/%s' % (self.service_type, self.name, self.zk_uuid), json.dumps(zk_register_data).encode('utf-8'), ephemeral=True) self.need_to_register = False def unregister(self) -> None: """Unregister the listener and stop""" self.zk.delete('/service/%s/%s/%s' % (self.service_type, self.name, self.zk_uuid)) self.zk.remove_listener(self.listener) self.zk.stop() def shell_run(command: str) -> subprocess.Popen: """A hyper simplistic shell command runner. Linux only. Uses bash as /bin/bash. NB: this does NOT change the current working directory of the shell or the parent process; any command you run will have to be on the PATH or part of the entered virtualenv, etc. :param command: This is a command that will be run in a subshell, verbatim. The caller is fully responsible for all related security concerns. This function will not perform any processing on the string, e.g.no string substitutions, no interpolations, etc. It will simply create a shell, execute the command on a single input to the shell, and then manage the process. Shell metacharacters and expansions ** will ** be processed as per standard shell semantics. E.g. .. code-block:: python uwsgi_exe = ("source %(virtualenv)s/bin/activate && " "cd %(virtualenv)s/.. && " "uwsgi --master --lazy-apps --processes 5 " "--die-on-term --module wsgi:application --http :80" % {'virtualenv': vpath}) :returns: The structure returned by subprocess.Popen: https://docs.python.org/3/library/subprocess.html#subprocess.Popen """ return subprocess.Popen(["/bin/bash", "-c", command]) # pylint: disable=consider-using-with def enter_virtualenv_and_run(virtualenv_path: str, command: str) -> subprocess.Popen: """Enter a virtual environment and run a command. Simplifies a common python use case of entering a virtualenv and running a local shell command. NB: this does NOT change the current working directory of the shell or the parent process; any command you run will have to be on the PATH or part of the entered virtualenv. :param virtualenv_path: A string representing the absolute path to a virtualenv. :param command: As per shell_run. :returns: The structure returned by subprocess.Popen: https://docs.python.org/3/library/subprocess.html#subprocess.Popen :raises ValueError: if the virtualenv path appears to be in error. """ vpath = os.path.abspath(virtualenv_path) if not os.path.exists(os.path.join(vpath, 'bin', 'activate')): raise ValueError('Specified path (%s) does not look like a python virtualenv!' % virtualenv_path) venv_command = ("source %(virtualenv_path)s/bin/activate && %(command)s" % {'virtualenv_path': virtualenv_path, 'command': command}) return shell_run(venv_command) def bind_signal_handlers(signals: List[int], signal_handler_func: Callable) -> None: """Bind one or more signals to a signal handler. :param signals: A list of signals to be bound :param signal_handler_func: The handler to bind to the signals :raises ValueError: on invalid signal number """ for signal_ in signals: signal.signal(signal_, signal_handler_func) def default_signal_handler(signum: int, _) -> None: """By default, count calls to a signal. :param signum: The signal which was called """ SIGNALS[signum] += 1 def set_default_signals() -> None: """Bind default signal handlers.""" bind_signal_handlers([signal.SIGINT, signal.SIGTERM], default_signal_handler) def wait_for_process(pid: int, sleep_secs: float, max_attempts: int) -> Tuple[int, int]: """Wait for a process to finish running. :param pid: The process ID on which to wait :param sleep_secs: The number of seconds to sleep between checks :param max_attempts: The number of times to attempt checking on the pid """ def wait_for_pid(mypid): w_process_id, w_exit_status = os.waitpid(mypid, os.WNOHANG) if w_process_id == 0: raise Exception("Not done yet") return w_process_id, w_exit_status try: process_id, exit_status = retry(partial(wait_for_pid, pid), sleep_secs=sleep_secs, max_attempts=max_attempts) except RetryError as _: process_id, exit_status = None, None return process_id, exit_status def check_tcp_port(port: int) -> bool: """Check to see if a local port is accepting TCP connections. :param port: The port number to be checked. """ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) s.settimeout(1) ret = False try: s.connect(('', port)) s.shutdown(socket.SHUT_RDWR) ret = True except socket.timeout: pass finally: s.close() return ret def zk_start(args: Optional[List[str]] = None) -> ZkRegistrar: """Start a process wrapped in a watcher and register with zookeeper. Steps: 1. Get the ephemeral registration data from command-line arguments or environment variables 2. Get the command to run from command-line arguments or environment variables 3. Run the command. 4. If successful, register with zookeeper. 5. Wait for the command to complete. 6. De-register with zookeeper. Command-line arguments: * --virtualenv-path * --command * --name * --zkhosts * --service_type (verticals, registrations) * --port * --ssl-port * --locations Environment variables (from settings.py): * GD_ZKRUN_VIRTUALENV_PATH * GD_ZKRUN_COMMAND * GD_ZKRUN_NAME * ZK_HOSTS * GD_ZKRUN_HOSTS * GD_ZKRUN_SERVICE_TYPE * GD_ZKRUN_PORT * GD_ZKRUN_SSL_PORT * GD_ZKRUN_LOCATIONS Methodology: * Command-line arguments override environment variables. """ parser = ArgumentParser(description="Start a process with a zookeeper registration") parser.add_argument('--virtualenv-path', help="Path to a python venv") parser.add_argument('--command', required=True, help="Shell command to run, properly escaped") parser.add_argument('--name', required=True, help="Service name (zk base node)") parser.add_argument('--zkhosts', action='append', help="Zookeeper hosts") parser.add_argument('--service_type', required=True, help="Service type, registrations or verticals") parser.add_argument('--port', type=int, help="Port of service if any") parser.add_argument('--ssl-port', type=int, help="Ssl port of service if any") parser.add_argument('--locations', action='append', help="Urls for MCP use if any") res = parser.parse_args(args=args) virtualenv_path = res.virtualenv_path or GD_ZKRUN_VIRTUALENV_PATH or None command = res.command or GD_ZKRUN_COMMAND or None name = res.name or GD_ZKRUN_NAME or None zkhosts = res.zkhosts or GD_ZKRUN_HOSTS or ZK_HOSTS or None service_type = res.service_type or GD_ZKRUN_SERVICE_TYPE or None port = res.port or GD_ZKRUN_PORT or None ssl_port = res.ssl_port or GD_ZKRUN_SSL_PORT or None locations = res.locations or GD_ZKRUN_LOCATIONS or None if virtualenv_path is not None: LOGGER.info("Running venv run %s { %s } { %s }", enter_virtualenv_and_run, virtualenv_path, command) pid = enter_virtualenv_and_run(virtualenv_path, command).pid # type: ignore else: LOGGER.info("Running shell run %s { %s }", shell_run, command) pid = shell_run(command).pid # type: ignore LOGGER.info("Started process %s", pid) ZKSTART_STATUS[pid] = True def zkstart_signal_handler(_signum, _frame, runpid=None): LOGGER.info("Signal handler called with %s, %s", _signum, _frame) ZKSTART_STATUS[runpid] = False bind_signal_handlers([signal.SIGINT, signal.SIGTERM], partial(zkstart_signal_handler, runpid=pid)) registrar = ZkRegistrar(name=name, zkhosts=zkhosts, service_type=service_type, port=port, # type: ignore ssl_port=ssl_port, locations=locations) # type: ignore registrar.register() while ZKSTART_STATUS[pid] is True: if registrar.need_to_register is True: registrar.register() process_id, exit_status = wait_for_process(pid, 5, 3) if process_id is not None: LOGGER.info('Process %s exited with status %s', pid, exit_status) break LOGGER.info("Clean shutdown of %s", pid) registrar.unregister() return registrar # this is harmless, and handy for testing service/__pycache__/__init__.cpython-38.pyc000064400000000247147205126350014613 0ustar00U af@sdS)NrrrL/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/service/__init__.pyservice/__pycache__/base_daemon.cpython-38.pyc000064400000004246147205126350015314 0ustar00U af@sHddlZddlmZmZmZddlmZmZee Z GdddZ dS)N)PIPEcallCalledProcessError)AnyDictc@speZdZdZeeddddZdeeeeddd Zed d d Z ed d dZ ed ddZ ed ddZ dS) BaseDaemonz!A base implementation of a daemonN)name command_fmtreturncCs||_||_dS)N)rr )selfrr r O/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/service/base_daemon.py__init__ szBaseDaemon.__init__T)actioncheck_return_codepipesr cCsvtd||jddd}|r.|ttd|jj|j|d}t|f|}t||rr|dkrrt ||d|S) aRun an action for the daemon. :param action: The action to be run :param check_return_code: Whether to error on non-zero return codes :param pipes: :raises CalledProcessError: If `check_return_code` is True, and a non-zero return code occurrs z%sing %sT )universal_newlinestimeout)stdoutstderr)rrr) returncodecmd) LOGGERinforupdaterr formatrsplitr)r rrrkwargsrresultr r r _runs   zBaseDaemon._run)r cCs|jdddS)zCheck the status of the daemon.statusF)rr r r r r r!%szBaseDaemon.statuscCs|jdddS)zStart the daemon.startFrr"r#r r r r$)szBaseDaemon.startcCs |dS)zStop the daemon.stopr"r#r r r r&/szBaseDaemon.stopcCs|jdddS)zRestart the daemon.restartFr%r"r#r r r r'3szBaseDaemon.restart)TT) __name__ __module__ __qualname____doc__strrboolrr r!r$r&r'r r r r r sr) logging subprocessrrrtypingrr getLoggerr(rrr r r r s service/__pycache__/registration.cpython-38.pyc000064400000030036147205126350015565 0ustar00U afg4@sddlmZddlmZddlmZddlmZddlZddlZddl Z ddl Z ddl Z ddl Z ddl Z ddlZddlmZmZmZmZmZmZmZmZddlmZddlmZmZdd lmZdd lm Z m!Z!dd l"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+ee,Z-iZ.e/e0Z1d d Z2eee3ee3fZ4GdddZ5e3ej6dddZ7e3e3ej6dddZ8ee,eddddZ9e,ddddZ:ddddZ;e,ed"d#d$Z?d(eee3e5d%d&d'Z@dS)))ArgumentParser)datetime) defaultdict)partialN)AnyCallableDictIterableListOptionalTupleUnion)uuid1) KazooClient KazooState)CANONICAL_TIMESTRING_FORMAT)retry RetryError) ZK_HOSTSGD_ZKRUN_VIRTUALENV_PATHGD_ZKRUN_COMMAND GD_ZKRUN_NAMEGD_ZKRUN_HOSTSGD_ZKRUN_SERVICE_TYPE GD_ZKRUN_PORTGD_ZKRUN_SSL_PORTGD_ZKRUN_LOCATIONScCstS)z*Gets the zkhosts from the default settings)rrrP/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/service/registration.py get_zkhosts src @sreZdZdZdZdeeeeeeeeeeee ddddZ edddd Z dd d d Z dd d dZ dS) ZkRegistrarz=Implementation of a basic Zookeeper ephemeral node registrar.N registrations)listenernamezkhosts service_typeportssl_port locationsreturncCsx|dkrt}t|ttfr&d|}|p.|j|_||_t|d|_ t t |_ ||_ ||_||_||_d|_d|_dS)a!Create a zookeeper client and hang on to the listener. :param listener: The zookeeper listener action; is activated by the kazoo client when updates occur to watched zookeeper parameters. :param name: The path base for this service. :param zkhosts: The zkhost or hosts (list) to use with the zk client; if none set, a local function is used that uses hfs best practices to find the zk host setting for the machine this is running on. :param service_type: verticals for REST interfaces and registrations for Kafka listeners, pollers, etc. :param port: port the service is listening on if any :param ssl_port: ssl port if any :param locations: URLs that correspond to this server if any N,)hostsFT)r isinstancelisttuplejoindefault_zk_state_handlerr"r#rzkstrrzk_uuidr%r&r'r(connection_was_lostneed_to_register)selfr"r#r$r%r&r'r(rrr__init__,s    zZkRegistrar.__init__)stater)cCsJtd||jj|tjkr*d|_d|_n|tjkrF|jrFd|_d|_dS)z A default state handler functionzZookeeper state change: %s (%s)TFN) LOGGERdebugr1Z client_staterZLOSTr4r5Z CONNECTED)r6r8rrrr0Os z$ZkRegistrar.default_zk_state_handlerr)cCs|j|j|j|jdkr(g}n t|jttfsB|jg}n|j}t |j |j |t tdd}|jdk r|j|d<|jdk r|j|d<|jd|j|j f|jjd|j|j |j ft|dd d d |_dS) zRegister the listenerNZDYNAMIC)addressidr#r(ZregistrationTimeUTCZ serviceTyper&ZsslPortz/service/%s/%s/service/%s/%s/%szutf-8T)Z ephemeralF)r1Z add_listenerr"startr(r,r-r.platformnoder3r#rutcnowstrftimerr&r'Z ensure_pathr%createjsondumpsencoder5)r6r(Zzk_register_datarrrregister[s0        zZkRegistrar.registercCs8|jd|j|j|jf|j|j|jdS)z Unregister the listener and stopr>N)r1deleter%r#r3Zremove_listenerr"stop)r6rrr unregistervszZkRegistrar.unregister)NNNr!NNN)__name__ __module__ __qualname____doc__r(r rr2int LocationsTyper7r0rHrKrrrrr (s, # r )commandr)cCstdd|gS)aA hyper simplistic shell command runner. Linux only. Uses bash as /bin/bash. NB: this does NOT change the current working directory of the shell or the parent process; any command you run will have to be on the PATH or part of the entered virtualenv, etc. :param command: This is a command that will be run in a subshell, verbatim. The caller is fully responsible for all related security concerns. This function will not perform any processing on the string, e.g.no string substitutions, no interpolations, etc. It will simply create a shell, execute the command on a single input to the shell, and then manage the process. Shell metacharacters and expansions ** will ** be processed as per standard shell semantics. E.g. .. code-block:: python uwsgi_exe = ("source %(virtualenv)s/bin/activate && " "cd %(virtualenv)s/.. && " "uwsgi --master --lazy-apps --processes 5 " "--die-on-term --module wsgi:application --http :80" % {'virtualenv': vpath}) :returns: The structure returned by subprocess.Popen: https://docs.python.org/3/library/subprocess.html#subprocess.Popen z /bin/bashz-c) subprocessPopen)rRrrr shell_run}srU)virtualenv_pathrRr)cCsFtj|}tjtj|dds0td|d||d}t|S)aEnter a virtual environment and run a command. Simplifies a common python use case of entering a virtualenv and running a local shell command. NB: this does NOT change the current working directory of the shell or the parent process; any command you run will have to be on the PATH or part of the entered virtualenv. :param virtualenv_path: A string representing the absolute path to a virtualenv. :param command: As per shell_run. :returns: The structure returned by subprocess.Popen: https://docs.python.org/3/library/subprocess.html#subprocess.Popen :raises ValueError: if the virtualenv path appears to be in error. binactivatez;Specified path (%s) does not look like a python virtualenv!z6source %(virtualenv_path)s/bin/activate && %(command)s)rVrR)ospathabspathexistsr/ ValueErrorrU)rVrRZvpathZ venv_commandrrrenter_virtualenv_and_runs  r^)signalssignal_handler_funcr)cCs|D]}t||qdS)zBind one or more signals to a signal handler. :param signals: A list of signals to be bound :param signal_handler_func: The handler to bind to the signals :raises ValueError: on invalid signal number N)signal)r_r`Zsignal_rrrbind_signal_handlerssrb)signumr)cCst|d7<dS)zYBy default, count calls to a signal. :param signum: The signal which was called N)SIGNALS)rc_rrrdefault_signal_handlersrgr;cCsttjtjgtdS)zBind default signal handlers.N)rbraSIGINTSIGTERMrgrrrrset_default_signalssrj)pid sleep_secs max_attemptsr)c CsXdd}ztt||||d\}}Wn*tk rN}z d\}}W5d}~XYnX||fS)zWait for a process to finish running. :param pid: The process ID on which to wait :param sleep_secs: The number of seconds to sleep between checks :param max_attempts: The number of times to attempt checking on the pid cSs*t|tj\}}|dkr"td||fS)Nrz Not done yet)rYwaitpidWNOHANG Exception)ZmypidZ w_process_idZ w_exit_statusrrr wait_for_pidsz&wait_for_process..wait_for_pid)rlrm)NNN)rrr)rkrlrmrq process_id exit_statusrfrrrwait_for_processs rt)r&r)cCspttjtjtj}|dd}z>z"|d|f|tjd}Wntj k r\YnXW5|X|S)zpCheck to see if a local port is accepting TCP connections. :param port: The port number to be checked. rdFT) socketAF_INET SOCK_STREAM IPPROTO_TCP settimeoutcloseconnectshutdown SHUT_RDWRtimeout)r&sretrrrcheck_tcp_ports    r)argsr)cCs tdd}|jddd|jdddd |jd dd d |jd d dd|jdddd |jdtdd|jdtdd|jdd dd|j|d}|jptpd}|jptpd}|jpt pd}|j pt pt pd}|j ptpd}|jptpd}|jptpd} |jp tp d} |dk r4tdt||t||j} ntdt|t|j} td| dt| <d&dd} ttjtjgt| | d t ||||| | d!} | !t| dkr| j"dkr| !t#| d"d#\}}|dk rtd$| |qqtd%| | $| S)'aStart a process wrapped in a watcher and register with zookeeper. Steps: 1. Get the ephemeral registration data from command-line arguments or environment variables 2. Get the command to run from command-line arguments or environment variables 3. Run the command. 4. If successful, register with zookeeper. 5. Wait for the command to complete. 6. De-register with zookeeper. Command-line arguments: * --virtualenv-path * --command * --name * --zkhosts * --service_type (verticals, registrations) * --port * --ssl-port * --locations Environment variables (from settings.py): * GD_ZKRUN_VIRTUALENV_PATH * GD_ZKRUN_COMMAND * GD_ZKRUN_NAME * ZK_HOSTS * GD_ZKRUN_HOSTS * GD_ZKRUN_SERVICE_TYPE * GD_ZKRUN_PORT * GD_ZKRUN_SSL_PORT * GD_ZKRUN_LOCATIONS Methodology: * Command-line arguments override environment variables. z-Start a process with a zookeeper registration) descriptionz--virtualenv-pathzPath to a python venv)helpz --commandTz&Shell command to run, properly escaped)requiredrz--namezService name (zk base node)z --zkhostsappendzZookeeper hosts)actionrz--service_typez(Service type, registrations or verticalsz--portzPort of service if any)typerz --ssl-portzSsl port of service if anyz --locationszUrls for MCP use if any)rNz!Running venv run %s { %s } { %s }zRunning shell run %s { %s }zStarted process %scSstd||dt|<dS)Nz!Signal handler called with %s, %sF)r9infoZKSTART_STATUS)Z_signumZ_framerunpidrrrzkstart_signal_handler8sz(zk_start..zkstart_signal_handler)r)r#r$r%r&r'r(z Process %s exited with status %szClean shutdown of %s)N)%r add_argumentrP parse_argsrVrrRrr#rr$rrr%rr&rr'rr(rr9rr^rkrUrrbrarhrirr rHr5rtrK)rparserresrVrRr#r$r%r&r'r(rkrZ registrarrrrsrrrzk_startsT(          r)N)Aargparserr collectionsr functoolsrrEloggingrYos.pathr@rarvrStypingrrrr r r r r uuidrZ kazoo.clientrrZprimordial.constantsrZprimordial.flowrrZprimordial.settingsrrrrrrrrrrPrer getLoggerrLr9rr2rQr rTrUr^rbrgrjfloatrtboolrrrrrrs>    (  , U wsgi/__init__.py000064400000000000147205126350007621 0ustar00wsgi/cherrytools.py000064400000006001147205126350010446 0ustar00# -*- coding: utf-8 -*- from typing import Callable, Optional import ssl import cherrypy from primordial.config import Config def start_cherry(config: Config, api: Callable, path: str) -> None: """Start CherryPy running an API. :param config: The configuration to use for CherryPy :param api: The API callable to be started :param path: The path under which the API should respond to requests """ cherrypy.tree.graft(api, path) run_cherrypy(config) def start_cherry_https(config: Config, api: Callable, path: str) -> None: """Start CherryPy running an API on HTTPS only. This is a backport of how ServiceGroup & DNS verticals start. :param config: The configuration to use for CherryPy :param api: The API callable to be started :param path: The path under which the API should respond to requests """ cherrypy.tree.graft(api, path) cert_file = config.get("server_cert") pkey_file = config.get("server_key") ca_certs_file = config.get("server_ca") run_cherrypy( config, cert_path=cert_file, key_path=pkey_file, ca_certs_path=ca_certs_file) def run_cherrypy( config: Config, cert_path: Optional[str] = None, key_path: Optional[str] = None, ca_certs_path: Optional[str] = None) -> None: """Run CherryPy. Called by `start_cherry`. :param config: The configuration to use for running CherryPy """ restart_trigger = config.get('restart_trigger') cherrypy.engine.autoreload.files.add(restart_trigger) # Only reload when the restart trigger changes (not other files) # TODO non-local only cherrypy.engine.autoreload.match = restart_trigger server_host = config.get('server.host') https_port = config.get_int('server.https.port') if https_port and cert_path is not None: cherrypy.config.update({ 'server.socket_host': server_host, 'server.socket_port': https_port, 'server.ssl_module': 'builtin', 'server.ssl_certificate': cert_path, 'server.ssl_private_key': key_path, 'server.ssl_context': ssl_context(cert_path, key_path, ca_certs_path)}) else: server_port = config.get_int('server.port') cherrypy.config.update({'server.socket_host': server_host, 'server.socket_port': server_port}) cherrypy.engine.start() cherrypy.engine.block() def ssl_context(certificate, private_key, ca_certs): """Create a server SSL context requiring client certificate authentication. :param certificate: Path to server certificate. :param private_key: Path to server private key. :param ca_certs: Path to certificate authority chain against which client certificates are validated. """ context = ssl.create_default_context( purpose=ssl.Purpose.CLIENT_AUTH, cafile=ca_certs ) context.load_cert_chain(certificate, private_key) context.verify_mode = ssl.CERT_REQUIRED return context wsgi/falcontools.py000064400000001156147205126350010422 0ustar00# -*- coding: utf-8 -*- import json from json.decoder import JSONDecodeError import logging import falcon LOG = logging.getLogger(__name__) def json_body(req: falcon.Request, *_) -> None: """Decode JSON request body and attach to request as `body`. :param req: The request to be processed :raises falcon.HTTPBadRequest: In the case of invalid JSON """ body = req.bounded_stream.read().decode('utf-8') try: req.body = json.loads(body) if body else {} except JSONDecodeError as ex: raise falcon.HTTPBadRequest( code='BAD_JSON', description=str(ex)) wsgi/__pycache__/__init__.cpython-38.pyc000064400000000244147205126350014121 0ustar00U af@sdS)NrrrI/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/wsgi/__init__.pywsgi/__pycache__/cherrytools.cpython-38.pyc000064400000005443147205126350014745 0ustar00U af @sddlmZmZddlZddlZddlmZeeeddddZeeeddddZ deeeeeeedd d d Z d d Z dS))CallableOptionalN)Config)configapipathreturncCstj||t|dS)zStart CherryPy running an API. :param config: The configuration to use for CherryPy :param api: The API callable to be started :param path: The path under which the API should respond to requests N)cherrypytreegraft run_cherrypy)rrrr L/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/wsgi/cherrytools.py start_cherry srcCs@tj|||d}|d}|d}t||||ddS)a*Start CherryPy running an API on HTTPS only. This is a backport of how ServiceGroup & DNS verticals start. :param config: The configuration to use for CherryPy :param api: The API callable to be started :param path: The path under which the API should respond to requests Z server_certZ server_keyZ server_ca) cert_pathkey_path ca_certs_pathN)r r r getr )rrr cert_fileZ pkey_fileZ ca_certs_filer r rstart_cherry_httpss    r)rrrrrc Cs|d}tjjj||tjj_|d}|d}|rh|dk rhtj ||d||t |||dn|d}tj ||dtj tj dS) znRun CherryPy. Called by `start_cherry`. :param config: The configuration to use for running CherryPy restart_triggerz server.hostzserver.https.portNbuiltin)server.socket_hostserver.socket_portzserver.ssl_modulezserver.ssl_certificatezserver.ssl_private_keyzserver.ssl_contextz server.port)rr) rr ZengineZ autoreloadfilesaddmatchZget_intrupdate ssl_contextstartblock)rrrrrZ server_hostZ https_portZ server_portr r rr -s(        r cCs*tjtjj|d}|||tj|_|S)a4Create a server SSL context requiring client certificate authentication. :param certificate: Path to server certificate. :param private_key: Path to server private key. :param ca_certs: Path to certificate authority chain against which client certificates are validated. )purposecafile)sslcreate_default_contextPurpose CLIENT_AUTHload_cert_chain CERT_REQUIRED verify_mode)Z certificateZ private_keyca_certscontextr r rrPs r)NNN) typingrrr#r Zprimordial.configrstrrrr rr r r rs    #wsgi/__pycache__/falcontools.cpython-38.pyc000064400000001530147205126350014704 0ustar00U afn@sDddlZddlmZddlZddlZeeZejddddZ dS)N)JSONDecodeError)reqreturnc Gsb|jd}z|r t|ni|_Wn4tk r\}ztjdt |dW5d}~XYnXdS)zDecode JSON request body and attach to request as `body`. :param req: The request to be processed :raises falcon.HTTPBadRequest: In the case of invalid JSON zutf-8ZBAD_JSON)code descriptionN) Zbounded_streamreaddecodejsonloadsbodyrfalconZHTTPBadRequeststr)r_r exrL/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/wsgi/falcontools.py json_body sr) r Z json.decoderrloggingr getLogger__name__LOGRequestrrrrrs   __pycache__/__init__.cpython-38.pyc000064400000000237147205126350013152 0ustar00U af@sdS)NrrrD/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/__init__.py__pycache__/cacheutils.cpython-38.pyc000064400000003310147205126350013532 0ustar00U afz@sDddlmZmZddlmZmZdZddZefeedddZd S) ) lru_cachewraps)datetime timedeltacs$i_tfdd}|S)zMA general-use memoizer decorator for class, object, function; supports kwargscs.t|t|}|kr&|||<|S)zActual implementation)str)argskwargskeycacheobjF/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/cacheutils.pymemoizer szmemoize..memoizer)r r)r rrr rmemoize s r)secondsmaxsizecsfdd}|S)a1A decorator that wraps lru_cache and allows the setting of a lifetime in seconds, after which the cache will expire :param seconds: The number of seconds after which the cache will expire :param maxsize: The maximum number of entries in the cache before it will start dropping old entries csBtdtd_tj_tfdd}|S)N)r)rcs0tjkr&tj_||S)N)rutcnow expiration cache_clearlifetime)rr funcrr wrapped_func#sz.wrapper_cache..wrapped_func)rrrrrrr)rrrrrr wrapper_caches  z&timed_lru_cache..wrapper_cacher)rrrrrrtimed_lru_cachesrN) functoolsrrrrZDEFAULT_MAXSIZErintrrrrrs__pycache__/config.cpython-38.pyc000064400000010047147205126350012660 0ustar00U afm@sJddlmZddlmZmZmZmZmZGdddZGdddeZ dS)) ConfigParser)AnyCallableDictListOptionalc@sleZdZdZd eeedddZd eeeedddZ deee e ddd Z deee e dd d Z dS)ConfigzBase configuration object.NkeydefaultreturncCsttddS)z Get the config option as a string :param key: config option name :param default: default value if no value exists in the config :return: option value z.get()N)NotImplementedError__name__selfr r rB/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/config.pyget sz Config.getcCst|||S)z Get the config option as a boolean :param key: config option name :param default: default value if no value exists in the config :return: option value )boolrrrrrget_boolszConfig.get_boolcCst|||S)z Get the config option as an integer :param key: config option name :param default: default value if no value exists in the config :return: option value )intrrrrrget_intszConfig.get_intcCst|||S)z Get the config option as a float :param key: config option name :param default: default value if no value exists in the config :return: option value )floatrrrrr get_float(szConfig.get_float)N)N)N)N)r __module__ __qualname____doc__strrrrrrrrrrrrrrrs    rc@seZdZdZdZdZgZdeeeeee ee fddddZ ee dd d Z de ee e d d d Zdee e dddZdeeeedddZdeeeedddZdeeeedddZdS) FileConfigzVConfig class that loads configuration options from the locally-cached ZooKeeper configz/var/cache/hfs/config.inizconfig-defaultN)service_prefixfilenamedefaultsr cCsx|dkr|j}t|jdd|_|j|g|_|rb|d}|rb|jd|d| q>|jd||_ dS)N)default_section interpolation.) DEFAULT_FILENAMErr"configreadprefixessplitappendjoinpopr!)rrr r!Z prefix_partsrrr__init__9s    zFileConfig.__init__)r r cCs ||S)z Get a config key value as a direct attribute of this class. :param key: The config key name :return: The config key value, if it exists, or None )r)rr rrr __getattr__KszFileConfig.__getattr__)getfuncr r r cCsN|dkr|jr|j|}|jD]$}||}||j|dd}|r qJq |}|S)N)fallback)r!rr)r")rr0r r prefixZ prefixed_keyvalrrr_getTs  zFileConfig._getr cCs||jj||SN)r4r'rrrrrraszFileConfig.getcCs||jj||Sr5)r4r' getbooleanrrrrrdszFileConfig.get_boolcCs||jj||Sr5)r4r'getintrrrrrgszFileConfig.get_intcCs||jj||Sr5)r4r'getfloatrrrrrjszFileConfig.get_float)NNN)N)N)N)N)N)rrrrr&r"r)rrrrr.r/rr4rrrrrrrrrrrr3s&   rN) configparserrtypingrrrrrrrrrrrs ,__pycache__/constants.cpython-38.pyc000064400000002514147205126350013427 0ustar00U af@sbdZz ddlZWnek r$YnXddlZddlZddlZdZdZdZe dddZ e Z dS) z2This module holds constants needed for primordial.Nz%Y-%m-%dT%H:%M:%S.%fZz%Y-%m-%dT%H:%M:%S.%fz%Y-%m-%dT%H:%M:%SZ)returnc CszRddttdD}|rP|D]}tt|jr&|WSq&|dWSWntjk rhYnXzJttjtj }t d dd d}t t|d|d d WStttjfk rYd SXd S) z#Get the IP address of this machine.cSsg|]}|ds|qS)z127.) startswith).0iprE/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/constants.py s zgetIP..rZ256sasciiZeth0iz 127.0.0.1N)socketgethostbyname_ex gethostname ipaddress ip_addressstr is_privategaierrorAF_INET SOCK_DGRAMstructpackencode inet_ntoafcntlioctlfilenoIOError RuntimeError)ipsrsZipstrrrrgetIPs" r") __doc__r ImportErrorrrr ZCANONICAL_TIMESTRING_FORMATZ#CANONICAL_TIMESTRING_FORMAT_NO_ZULUZ+CANONICAL_TIMESTRING_FORMAT_NO_MICROSECONDSrr"ZIPrrrrs __pycache__/context.cpython-38.pyc000064400000013221147205126350013074 0ustar00U af@stddlZddlmZddlmZmZmZGdddZeZed eee efeee efeedddZ dS) N)contextmanager)AnyOptionalMappingcsleZdZdZddZeedddZeedfdd Zed d d Z eedd dZ ed ddZ Z S)Contexta Context instances can be used to hold state and values during the execution life cycle of a python application without needing to change the existing API of various interacting classes. State/values can be updated in one part of the application and be visible in another part of the application without requiring to pass this state thru the various methods of the call stack. Context also supports maintenance of global and thread specific state which can be very useful in a multi-threaded applications like Flask/Falcon based web applications or the NES Journal reader. State stored at the global level is visible in every concurrently executed thread, while thread specific state is isolated to the corresponding thread of execution. This can useful again in a multi-threaded application like a web-app where each incoming request is processed by a separate thread and things like request headers, authentication user context is thread specific and isolated to the thread handling a particular request. Example usage: -- In the main thread of an application's start up code we might want to inject some global state like so. # In some start-up file called app.py from primordial.context import Context MY_APP_CTX = Context() # Instantiate some shared object jwt_fetcher = JWTFetcher(some_config) MY_APP_CTX.set_global('jwt_fetcher', jwt_fetcher) # In a thread that's handling a particular HTTP request # handle_req.py from app import MY_APP_CTX MY_APP_CTX.user = User() MY_APP_CTX.token = copy_token_from_header() # In a third file somewhere down the line of request processing # some_file_called_by_controller.py from app import MY_APP_CTX def some_func(): # All of these are magically available. # some_func's function signature didn't require to be changed MY_APP_CTX.jwt_fetcher.get() MY_APP_CTX.user.name == 'jDoe' MY_APP_CTX.token.is_valid() cCsi|_t|_dSN)_global threadinglocal_local)selfr C/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/context.py__init__2szContext.__init__)namereturncCsBzt|j|WStk r<||jkr6|j|YSYnXdSr)getattrr AttributeErrorrr rr r r __getattr__6s  zContext.__getattr__)rvaluecs(|dkrt||St|j||dS)N)rr )super __setattr__setattrr r rr __class__r rr>szContext.__setattr__)rcCs>zt|j|Wn(tk r8||jkr,|j|=YnXdSr)delattrr rrrr r r __delattr__Cs  zContext.__delattr__cCs||j|<dSr)rrr r r set_globalKszContext.set_globalcCs|j|dSr)rpoprr r r unset_globalNszContext.unset_global) __name__ __module__ __qualname____doc__rstrrrrrrr! __classcell__r r rrrs+r) local_vals global_valsctxc cs|r|nt}|r|ni}|r |ni}|D]\}}t|||q,|D]\}}|||qJz |VW5|D]}t||qp|D]}||qXdS)a^ Python context-manager for managing the life-cycle of state stored in a context. This context manager allows for state to be both stored in a context and also cleans up this state when the context-manager is exited. Usage: # In some python module file1.py from primordial import Context ctx = Context() # In some python module file2.py from threading import Thread from primordial import make_context from file1 import ctx from file3 import fn3 def fn2(): global_vals = {'v1': 'abc', v2: 'def'} # Set some global values with make_context(global_vals=global_value, ctx): # Kick of fn3 in a new thread t1 = Thread(target=fn3, args=[]) t1.start() t1.join() fn2() # In some python module file3.py from primordial import make_context from file1 import ctx from file4 import fn4 def fn3(): # v2 will shadow the value that was set globally local_vals = {'v3': 'ghi', v2: 'jkl'} # Set some thread specific values # Once this function returns, ctx.v3 and ctx.v2 are not available for access with make_context(local_vals=local_value, ctx): fn4() # We can still access the globally set state here even after the above context manager # has exited. ctx.v1 ctx.v2 # The globally set v2 # In some python module file3.py from file1 import ctx def fn4(): # All of the accesses are valid ctx.v1 ctx.v2 # This accesses the local thread specific v2 ctx.v3 N)CTXitemsrrrr!)r(r)r*kvr r r make_contextUs;     r/)NNN) r contextlibrtypingrrrrr+r&r/r r r rs L__pycache__/encryptor.cpython-38.pyc000064400000024575147205126350013453 0ustar00U af,@sddlZddlZddlZddlmZmZddlmZddlm Z m Z m Z m Z ddl mZddlmZddlmZmZddlmZmZdd lmZmZe ed d d Ze eefed ddZGdddZGdddZGdddZ dS)N) b64decode b64encode)Path)AnyStrOptionalUnioncast)x509)Fernet)hashes serialization)MGF1OAEP)aes_cbc_pkcs7_encryptaes_cbc_pkcs7_decrypt)datareturncCst|tr|d}tt|S)a Convert a Python `str` object to a `bytes` object. If the parameter is already a `bytes` object, return it unmodified. :param data: The object to be converted :return: The converted object, or the original object if it was not a `str` object utf-8) isinstancestrencoderbytes)rrE/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/encryptor.py _make_bytess  r)key_pathrcCsB|dkrtdt|tr"t|}|s:tdj|d|S)a. Convenience function to load the content of a key or cert file and return its contents. :param key_path: Path to the key/cert file to be loaded :return: The file contents as a bytes object :raises: ValueError if the key_path parameter is None of doesn't point to an existing file Nz!key_path parameter cannot be Nonez$key path '{key_path}' does not existr) ValueErrorrrris_fileformat read_bytesrrrrload_key_content!s  r!c@s<eZdZdZeeeedddZeeeedddZ dS) Encryptor)encrypted_data secret_keyrcCsPtt|}|d|j}||jd}t|}t|}t|||dS)a Decrypt encrypted data using the PKCS7 symmetric decryption algorithm :param encrypted_data: Base-64 encoded byte array containing encrypted data, which is a combination of the salt and the actual data :param secret_key: Secret value used to generate an encryption key :return: Decrypted, plain text value Nr)rr_Encryptor__iv_sizehashlibsha256digestrdecode)clsr$r%Z decoded_dataivrsecret_key_byteshashed_secret_keyrrrdecrypt9s zEncryptor.decrypt)unencrypted_datar%rc CsFt|j}t|}t|}t|}t|||\}}t||S)a# Encrypts data using the PKCS7 symmetric encryption algorithm :param unencrypted_data: Data to be encrypted :param secret_key: Secret value used to generate an encryption key :return: Base-64 encoded byte array containing encrypted value ) osurandomr&rr'r(r)rr) r+r0r%Ziv_bytesZplain_text_bytesr-r.r,r$rrrencryptKs zEncryptor.encryptN) __name__ __module__ __qualname__r& classmethodrrr/rr3rrrrr"6s r"c@s`eZdZdZeZeeededdZ e e e e edddZe e e e e dd d ZdS) SmallPayloadEncryptora Utility class that provides methods to encrypt and decrypt small-ish payloads via an asymmetric (public/private key) algorithm. The definition of "small" depends on the size of the encryption key and the type of padding algorithm used. For example, given a key size of 4096 bytes and the type of padding used by this set class, the maximum size of a payload that can be encrypted is 447 bytes. ) algorithmN)Zmgfr9labelr$decryption_key_contentrcCsJ|dkr dS|dkrtdtt|}tj|dd}|||jdS)a Decrypts data encrypted by the `encrypt()` method in this class. :param encrypted_data: The data to be decrypted :param decryption_key_content: The content of the OpenSSL private key file corresponding to the public cert used to encrypt the data :return: The decrypted data :raises: ValueError if decryption_key_content` is None N$decryption_key_content can't be None)passwordr)rrrr Zload_pem_private_keyr/_SmallPayloadEncryptor__paddingr*)r+r$r<Zdecryption_keyrrrr/hs  zSmallPayloadEncryptor.decryptr0encryption_key_contentrcCsH|dkr dS|dkrtdt|}t|}|}t|||jS)a Encrypts any small payload using an RSA asymmetric key algorithm. The maximum size of the payload depends on the size of the encryption key. For example, given a key size of 4096 bits, the maximum size of the payload that can be encrypted is 447 bytes. :param unencrypted_data: The data to be encrypted :param encryption_key_content: The content of the OpenSSL X509 public certificate that will be used to encrypt the data :return: The base64 encoded and encrypted data as a bytes object :raises: ValueError if the payload size is too large :raises: ValueError if `encryption_key_content` is None N$encryption_key_content can't be None)rrr Zload_pem_x509_certificateZ public_keyrr3r?)r+r0rAZencryption_certZencryption_keyrrrr3~s zSmallPayloadEncryptor.encrypt)r4r5r6__doc__r SHA256Z&_SmallPayloadEncryptor__hash_algorithmrr r?r7rrrrr/r3rrrrr8]sr8c@sDeZdZdZeeeeedddZ eeeeedddZ dS) LargePayloadEncryptora This class provides methods to encrypt and decrypt large payloads via the Fernet symmetric encryption algorithm. The `encrypt()` method automatically generates a key for encryption. That key is then encrypted using the asymmetric public/private key algorithm of the `SmallPayloadEncrypter.encrypt()` method and is included in the resulting byte stream returned by this classes' `encrypt()` method. The "receiving" endpoint must then extract the Fernet key from the byte stream and use the corresponding private key of public/private key pair to decrypt the Fernet key. The decrypted Fernet key can then be used to decrypt the remainder of the payload. The only known restriction on payload size is that the payload must fit into memory. r;cCs|dkr dS|dkrtdt|}tt|d}|d}|dkrRtdt|}ttt ||}t|}t tt |}| |d ddS)a Decrypts data encrypted by the `encrypt()` method of this class. The decryption algorithm is 1. Decode the base-64 representation of the JSON object 2. Load the JSON into a Python dictionary 3. Extract the encrypted Fernet key from the JSON object and decrypt it using our asymmetric decryption algorithm, i.e., the same algorithm we use to decrypt passwords. 4. Extract the encrypted data from the JSON object and decrypt it using the Fernet decryption algorithm. :param encrypted_data: The data to be decrypted :param decryption_key_content: The content of the OpenSSL private key file corresponding to the public cert used to encrypt the data :return: The decrypted data as a `str` object :raises: ValueError if the decryption key is missing from the `encrypted_data` payload :raises: ValueError if decryption_key_content` is None Nr=rkeyz0token decryption key is missing from the payloadtoken)rrjsonloadsrr*getrrr8r/r rr)r+r$r<Z json_objectZencrypted_token_keyZdecrypted_token_keyfernet_encryptorrrrr/s zLargePayloadEncryptor.decryptr@cCs~|dkr dS|dkrtdt|tr0|d}t}t|}t||d|t t |dd}t t |dS)a Encrypts arbitrary data. This method uses a symmetric encryption algorithm (Fernet) to encrypt the data. This algorithm is capable of encrypting much larger payloads than asymmetric algorithms like RSA, which are limited by the key size and padding, if used. The encryption process is 1. Generate a random encryption key 2. Use that key to encrypt the original data. 3. Encrypt the key generated in step 1 by our asymmetric encryption algorithm, i.e., the same algorithm we use to encrypt passwords. This step may or may not use the same public/private keys we use for password encryption. 4. Create a Python dictionary with two entries: key: the encrypted Fernet key token: the data that was encrypted with the Fernet key Both the dictionary keys and values must be of type `str` to be JSON serializable. 5. Serialize the dictionary as a JSON string 6. Return a base-64 encoded representation of the JSON. :param unencrypted_data: The data to be encrypted :param encryption_key_content: The content of the OpenSSL X509 public certificate that will be used to encrypt the data :return: The encrypted key/text pair as a base-64 encoded `bytes` object :raises: ValueError if `encryption_key_content` is None NrBr)rFrG)rrrrr Z generate_keyr8r3r*rrrrHdumps)r+r0rArFrKresultrrrr3s  zLargePayloadEncryptor.encryptN) r4r5r6rCr7rrrrr/r3rrrrrEs  'rE)!r'rHr1base64rrpathlibrtypingrrrrZ cryptographyr Zcryptography.fernetr Zcryptography.hazmat.primitivesr r Z1cryptography.hazmat.primitives.asymmetric.paddingr rZoscrypto.symmetricrrrrrr!r"r8rErrrrs   '<__pycache__/fileutils.cpython-38.pyc000064400000015264147205126350013421 0ustar00U af@sddlZddlZddlZddlZddlmZmZmZzddlm Z Wne k rXYnXzddl m Z Wne k r~YnXddl mZddlZzddlZddlZWne k rYnXddlmZmZdZdZeeZejed d d Zdeeeeefdd d dZeddddZeddddZ eeddddZ!deeeeddddZ"d eeeeeeddddZ#dS)!N)AnyOptionalTuple)getpwnam)getgrnam)rmtree)get_file_group get_file_userg?)in_timereturncCst|tdddS)zpReturn integer seconds since epoch. :param in_time: Datetime :returns: Integer seconds since epoch i)intdatetime total_seconds)r rE/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/fileutils.pydatetime_to_epoch_seconds#sr)filenametimesr c Cs&t|dt||W5QRXdS)a0Set the atime and mtime of a file. .. note:: Linux (MAYBE Windows) ONLY. This does not work on Mac OSX. :param filename: The file to touch :param times: a two-tuple of (atime, mtime) where these are integer seconds since epoch; see os.utimes :raises OSError: if file doesn't exist aN)openosutime)rrrrrtouch,s r)pathr c CsBz t|Wn0tk r<}ztd||W5d}~XYnXdS)zySimple rmtree wrapper :param path: path to recursively remove :raises: IOError if the path can't be removed Failed to remove %sN)rOSErrorIOError)rerrrrm_path:s r )rr c CsDzt|Wn0tk r>}ztd||W5d}~XYnXdS)zuSimple remove wrapper :param filename: filename to remove :raises: IOError if the file can't be removed rN)rremoverr)rrrrrrm_fileFsr")rdatar c Cs0tjtj|d\}}t|d}t|}zz|t | dWnft k rz}zt d||f|W5d}~XYn4t k r}zt d||f|W5d}~XYnXW5|Xt|}|j|krt d||j|fzt||Wn6t k r*}zt d||f|W5d}~XYnXdS)z*A utility helper for the atomic file utils)dirwbutf8z$Failed to write data to path %s (%s)Nz1Unexpectedly failed to write data to path %s (%s)z4Failed to write correct number of bytes (%s, %s, %s))tempfilemkstemprrdirnamefdopenlenclosewritestrencoderr Exceptionstatst_sizerename)rr#Zoutfile_handleZ tmpfilenameoutfileZdatalenrZ stat_structrrr _write_fileRs& "(    r5)rr#add_all_access_userr cCstj|rt|t|||dk r~td|\}}}t|tj}| }| tj t j ||d|dt|tj|dS)a4Safely/atomically write a file on Windows. Write a file in such a way that the data is guaranteed to be complete and not intermixed with that of another process. This implementation will NOT clobber the target file if it exists already; instead, it will fail immediately. This is a windows-specific implementation that has Windows locking semantics for open file handles and rather than changing the file's ownership, setting add_all_access_user will cause the file to have a read-write ACE added to the ACL for the directory/file for the target user. Note that in both this function and atomically_write_file, the fact that the file has locking or not at the OS level is NOT being relied on in e.g. archon's file utilities for leasing or exclusive access. Instead, this implementation uses tmpfile to guarantee uniqueness of the source of information, and then a simple atomic mv to replace the destination. Windows, unlike Linux, could support a true OS level file locking layer for exclusive access, and so a future Windows specific file utility would be feasible for true single-host, global locking for use in e.g. archon. From https://stackoverflow.com/questions/12168110/setting-folder-permissions-in-windows-using-python :param filename: filename to write :param data: data to put in file :param add_all_access_user: the user if any to add a FILE_ALL_ACCESS ACE for :raises pywintypes.error: on failure to modify the file's ACL; pywintypes.error on user not found; pywintypes.error on file not found Nr r)rrexistsr"r5 win32securityZLookupAccountNameZGetFileSecurityZDACL_SECURITY_INFORMATIONZGetSecurityDescriptorDaclZAddAccessAllowedAceZ ACL_REVISIONconZFILE_ALL_ACCESSZSetSecurityDescriptorDaclZSetFileSecurity)rr#r6Zuserx_Zfile_sdZdaclrrrwin_atomically_write_filels  r<)rr# file_owner file_groupr cCs\t||d}|pt}|dk r*t|j}d}|p6t}|dk rJt|j}t|||dS)aSafely/atomically write a file. Write to a tmpfile then do os.rename(); Note that this is for Unix systems only, as the implicit contract is that the destination might exist and will be overwritten. This contract is violated on Windows. Also even on Unix, the file system of the source and dest must be the same; since we're in the same directory for this move, that constraint is satisfied. We use tempfile to make it so that we don't collide on the source tmp write -- different processes or threads will not select the same tmpfile. Last man in wins for the move -- there's no general way to sync this across processes or hosts using a filesystem store. :param filename: filename to ultimately write :param data: data to put in file :param file_owner: If set, will change file to this owner if permission is available :param file_group: If set, will change file to this group if permission is available :raises: IOError on failure; OSError on permission change without appropriate permissions N) r5r rpw_uidrrgr_gidrchown)rr#r=r>file_uidZfile_group_gidrrratomically_write_files     rD)N)N)NN)$rloggingros.pathtypingrrrpwdr ImportErrorgrprshutilrr'r9Z ntsecurityconr:Zprimordial.settingsrr ZDELAY_INCREASE_SECSZ MAX_RETRIES getLogger__name__LOGGERrrr.rr r"r5r<rDrrrrsL      &__pycache__/flow.cpython-38.pyc000064400000011312147205126350012356 0ustar00U af @sddlZddlmZddlmZmZmZmZmZdZ dZ e e Z GdddeZdeeeeed d d Zdd de efeeeeeeeeeefedddZdS)N)sleep)AnyCallableTupleTypeUniong?c@seZdZdZdS) RetryErrorz9A special type which signals the failure of a retry loop.N)__name__ __module__ __qualname____doc__rr@/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/flow.pyr sr ?) test_function max_tests sleep_secs count_truesreturncCs~d}d}d}||kr"td|}||kr\|}|dkrJ|d7}||krJq\|d7}t|q"||krttd||f|||fS)aAttempt test_function over and over, waiting for a true value. Try a maximum of max_tests times. Sleep sleep_secs in between each test. Receive count_trues before moving on. wait_for_true is designed specifically to handle a design principle behind AWS or other clustered services: that you might succeed when making a request once and then fail very soon after. For example, when you make a REST API call against S3 to see if a bucket exists, the fact that you get a "True" response does not guarantee that you will get that same response to a request made very soon after. In other words, it's for cases in which the goal is to produce a wait time for an eventually consistent external service to resolve your request. That's why wait_for_true lets you specify a threshold of how many trues you want to get before you're satisfied. :param test_function: a function to run whose result we will test for truthiness :param max_tests: limit to how many times we'll try test_function :param sleep_secs: how long to wait between tests :param count_trues: how many true results we need until we're totally true; this is useful with e.g. cluster tests, where we want a quorum of true answers before we're happy that the entire cluster is consistent (e.g. s3 put) :returns: the return value of the test function, the number of time it tried, and how many true results it found :raises RetryError: if the function "never" returned sufficiently many trues rNz9count_trues > max_tests, bumping max_tests to count_truesTz6wait_for_true never succeeded %s times for function %s)LOGGERwarningrr )rrrrZ itercountretZtruesrrr wait_for_trues"  rcCs|tS)N)DELAY_INCREASE_SECS)xrrr<r)action_functionrbackoff max_attemptsexceptions_to_ignorerc Csd}||krz|WS|k r}zN|d7}||krLtd|t|f|td||t|t|||}W5d}~XYqXqdS)aRetry an e.g. network connection until it doesn't throw an exception of any kind. This is for single-success retry cases (e.g. wow, that TCP connection didn't get established because of high latency, let's dial back and try again in a bit). The action_function is expected to be of a sort that any expected exceptions are caught, as this will retry under /any/ exception. The return value, if any, from the action_function is returned by retry on success. sleep_secs is the number of seconds to sleep between failures, and backoff defaults to adding DELAY_INCREASE_SECS to the prior value each time. max_attempts is the limit of the number of retries in any case. On failure to complete the action_function without exception, raises RetryError. :param action_function: a function to run that we will retry if it raises any exception :param sleep_secs: how long to wait between tries :param backoff: a function that will expand the sleep duration based on what iteration we are on :param max_attempts: limit to how many times we'll try action_function :param exceptions_to_ignore: exception type or types to ignore; by default, ignores all Exception-derived exceptions :returns: the return value of the test function, the number of time it tried, and how many true results it found :raises RetryError: if the function "never" returned without an exception rrzFailure to retry %s: %sz.retry: pausing %s secs before retrying %s (%s)N)r strrdebugr)r rr!r"r#Zattemptserrrretry:sr')rrr)loggingtimertypingrrrrrrZ MAX_RETRIES getLoggerr r Exceptionr intfloatrr'rrrrs$  *__pycache__/jsonutils.cpython-38.pyc000064400000007544147205126350013455 0ustar00U af @sddlZddlZddlmZmZddlZddlZddlmZddlm Z m Z m Z ddl m Zddl mZddlmZdZd ZeeZe e e fe e e fed d d Ze e e fed ddZe edddZee dddZGdddeZedZ GdddeZ!dS)N) JSONEncoder JSONDecoder) import_module)AnyCallableDict)validate) exceptions)CANONICAL_TIMESTRING_FORMATg?) json_dict schema_dictreturncCsF|dk s t|dk stzt||WdStjk r@YdSXdS)aTest that json_dict dictionary conforms to the swagger schema provided. :param json_dict: A dictionary of values :param schema_dict: A dict representation of swagger schema :returns: True or False :raises AssertionError: if either is None NTF)AssertionError json_validatejson_exceptionsZValidationError)r r rE/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/jsonutils.pyvalidate_swaggers   r)r rcCstj|tdddS)aYReturn a pretty-formatted string containing the values in json_dict. :param json_dict: A dictionary or other object that is serializable as a json string :returns: a (unicode) string representation with a default format of sorted keys and indent of 4 spaces :raises TypeError: if the thing in question cannot be json serialized T),:)cls sort_keys separators)jsondumpsDateTimeEncoder)r rrrpretty_json_str&sr)funcrcCsd|j|jfS)zCreate a string for a python function in a module. Class and object methods not supported currently. :param func: a python function :returns: a string representing this function z%s.%s) __module____name__)rrrr func_to_str0sr!)func_strrcCs*|d}|}td|}t||S)a=Use importlib to turn a string into a function. At the moment, this is limited to module functions and not classes. :param func_str: the string representation of a function, e.g. ``tests.test_workflow.ex_tst_func`` :returns: a python function :raises ImportError: if the function does not exist .)splitpoprjoingetattr)r"Z funcnotationsymbolmodulerrr str_to_func;s r*c@s eZdZdZeedddZdS)rUses our canonical format.)objrcCs(t|tjr|t}n t||}|SN) isinstancedatetimestrftimer rdefault)selfr,retrrrr1Ls   zDateTimeEncoder.defaultN)r r __qualname____doc__rr1rrrrrJsrz:^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}).(\d{6})Z$cs8eZdZdZfddZeeefedddZZS)DateTimeDecoderr+cstj|d|ji|dS)N object_hook)super__init__dict_to_object)r2argskwargs __class__rrr9ZszDateTimeDecoder.__init__)drcCs<|D].\}}t|trt|rtj|t||<q|Sr-)itemsr.strDATETIME_REGEXmatchr/strptimer )r2r?keyvaluerrrr:]szDateTimeDecoder.dict_to_object) r rr4r5r9rrr: __classcell__rrr=rr6Xs r6)"r/rrrloggingre importlibrtypingrrrZ jsonschemarrr rZprimordial.constantsr ZDELAY_INCREASE_SECSZ MAX_RETRIES getLoggerr LOGGERboolrrArr!r*rcompilerBr6rrrrs&     "   __pycache__/settings.cpython-38.pyc000064400000002066147205126350013255 0ustar00U af@sddlZdZdZddZddZeddZedZed Zed Z ed eZ ed Z ed Z edZ edZeddZeddZeddZz ddlTWnek rYnXdS)NcCstSz$Indirection for testing and settings) FILE_USERrrD/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/settings.py get_file_user srcCstSr) FILE_GROUPrrrrget_file_groupsrZK_HOSTSz127.0.0.1:2181GD_ZKRUN_VIRTUALENV_PATHGD_ZKRUN_COMMAND GD_ZKRUN_NAMEGD_ZKRUN_HOSTSGD_ZKRUN_SERVICE_TYPE GD_ZKRUN_PORTGD_ZKRUN_SSL_PORTGD_ZKRUN_LOCATIONSSSL_PRIVATE_KEYz/opt/hfs/ssl/server.keySSL_CERTIFICATEz/opt/hfs/ssl/server.crt SSL_CA_LISTz#/opt/hfs/ssl/client-trust-chain.crt)*)osrrrrgetenvr r r r r rrrrrrrZlocal_settings ImportErrorrrrrs(             __pycache__/sizes.cpython-38.pyc000064400000033520147205126350012551 0ustar00U af 1 @sddlZddlmZmZmZddddddd d d g ZGd d d ZGdddeZGdddeZGdddeZ GdddeZ GdddZ dS)N)ListOptionalTupleBKiBMiBGiBTiBPiBEiBZZiBZYiBc @seZdZdZdTeeeeeeeddddZeeeefddd Z edd d Z edd d Z edddZ edddZ edddZeedddZeedddZeedddZeedddZeedddZeedddZeedd d!Zd"d#Zd$d%Zd&d'Zd(d)Zd*d+Zd,d-Zd.d/Zd0d1Zd2d3Z d4d5Z!d6d7Z"d8d9Z#d:d;Z$dd?Z&d@dAZ'dBdCZ(dDdEZ)dFdGZ*dHdIZ+dJdKZ,dLdMZ-dNdOZ.dPdQZ/dRdSZ0dS)UByteSizez*Encapsulation for tracking sizes in bytes.rN)BytesKiBytesMiBytesGiBytesTiBytesPiBytesEiBytesreturncCsVt|dt|dt|dt|dt|dt|dt||_dS)N)int_bytes)selfr rrrrrrrA/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/sizes.py__init__ s zByteSize.__init__rcCsNg}|j}tD]4}|d@}|r.|d||f|dL}|dkrqDq|pLdgS)Nir )r)r _sizeUnitsinsert)rsresZcsizesizeZsvalrrr _breakdownszByteSize._breakdowncCsddd|DS)N,cSsg|] }d|qS%d%sr.0Zeachrrr *sz$ByteSize.__str__..joinr#rrrr__str__)szByteSize.__str__cCs$d|jjddd|DS)Nz{}({})r$cSs g|]\}}d|pd|qS)z {}ytes={}r)format)r(VUrrrr)/sz%ByteSize.__repr__..)r. __class____name__r+r#r,rrr__repr__,szByteSize.__repr__cCs@|}dt|dkr"|ddn|ddd|ddfS)z-Return the size in simple form (int ceiling).r&r)r#len)rbdrrr simpleSizeStr2szByteSize.simpleSizeStrcCs|}t|dksRt|ddt|dddkr^t|dddkr^d|dS|ddd|ddd}d|}|dr|dd |ddS||ddS) zAReturn the size in floating point form to two significant digits.r4rrr&@z%.2fz.00N)r#r5rindexendswith)rr6ZfvZrsprrr approxSizeStr7s   zByteSize.approxSizeStrcCsddd|DS)z%Return the size in full detail units.r$cSsg|] }d|qSr%rr'rrrr)Esz(ByteSize.fullSizeStr..r*r,rrr fullSizeStrCszByteSize.fullSizeStrcCs|jSNrr,rrrr GszByteSize.BytescCs |jdSNr8r r,rrrrKszByteSize.KiBytescCs |jdSr@)rr,rrrrOszByteSize.MiBytescCs |jdSr@)rr,rrrrSszByteSize.GiBytescCs |jdSr@)rr,rrrrWszByteSize.TiBytescCs |jdSr@)rr,rrrr[szByteSize.PiBytescCs |jdSr@)rr,rrrr_szByteSize.EiBytescCs|jSr>r?r,rrr __trunc__cszByteSize.__trunc__cCs|jSr>r?r,rrr __index__fszByteSize.__index__cCsD|dkr|jdkSz|j|jkWStk r>|j|kYSXdSNrrAttributeErrorrorrr__eq__js  zByteSize.__eq__cCsD|dkr|jdkSz|j|jkWStk r>|j|kYSXdSrDrErGrrr__ne__rs  zByteSize.__ne__cCsD|dkr|jdkSz|j|jkWStk r>|j|kYSXdSrDrErGrrr__gt__zs  zByteSize.__gt__cCsD|dkr|jdkSz|j|jkWStk r>|j|kYSXdSrDrErGrrr__ge__s  zByteSize.__ge__cCs>|dkr dSz|j|jkWStk r8|j|kYSXdS)NFrErGrrr__lt__s zByteSize.__lt__cCsD|dkr|jdkSz|j|jkWStk r>|j|kYSXdSrDrErGrrr__le__s  zByteSize.__le__cCs,t|dr||j|jS||j|SNr)hasattrr1rrGrrr__add__s zByteSize.__add__cCs.t|dr|j|j7_n|j|7_|SrO)rPrrGrrr__iadd__s zByteSize.__iadd__cCs<t|dr$|jtd|j|jdS|jtd|j|dS)NrrrA)rPr1maxrrGrrr__sub__s zByteSize.__sub__cCs4t|dr"|jt|jd8_n|j|8_|S)Nrr)rPrrSrGrrr__isub__s zByteSize.__isub__cCs|j|j|dSNrAr1rrGrrr__mul__szByteSize.__mul__cCs|j|j|dSrVrWrGrrr__rmul__szByteSize.__rmul__cCs|j|9_|Sr>r?rGrrr__imul__szByteSize.__imul__cCs*t|tr|j|jS|t|j|Sr> isinstancer rr1rrGrrr __floordiv__s  zByteSize.__floordiv__cCs ||jt|tr|jn|Sr>)r1rr\r rGrrr__mod__szByteSize.__mod__cCs||||fSr>rrGrrr __divmod__szByteSize.__divmod__cCs.t|tr|jd|jS|t|j|S)Ng?r[rGrrr __truediv__s zByteSize.__truediv__cCs|j|_t|j|_|Sr>rrrGrrr__idiv__s zByteSize.__idiv__cCs ||Sr>)rbrGrrr __itruediv__szByteSize.__itruediv__cCs|j|_t|j|_|Sr>rarGrrr __ifloordiv__s zByteSize.__ifloordiv__cCs|j|;_t|j|_|Sr>rarGrrr__imod__s zByteSize.__imod__cCs t|jSr>boolrr,rrr __nonzero__szByteSize.__nonzero__cCs t|jSr>rfr,rrr__bool__szByteSize.__bool__)rrrrrrr)1r2 __module__ __qualname____doc__rrrrstrr#r-r3r7r<r=propertyr floatrrrrrrrBrCrIrJrKrLrMrNrQrRrTrUrXrYrZr]r^r_r`rbrcrdrerhrirrrrr s~    r c@seZdZdZdS)MemSizez+Encapsulation for tracking amount of memoryNr2rjrkrlrrrrrpsrpc@seZdZdZdS)DiskSizez_invValrZ invalid_valuerrrrszInvalidMemSize.__init__cCsdt|jS)NzInvalidMemSize "%s"rmrur,rrrr-szInvalidMemSize.__str__cCsdt|jS)NzInvalidMemSize(%s)reprrur,rrrr3szInvalidMemSize.__repr__cCs t|tSr>r\rsrGrrrrIszInvalidMemSize.__eq__cCs t|t Sr>rzrGrrrrJszInvalidMemSize.__ne__Nr2rjrkrr-r3rIrJrrrrrss rsc@s4eZdZddZddZddZddZd d Zd S) InvalidDiskSizecCs ||_dSr>rtrvrrrr szInvalidDiskSize.__init__cCsdt|jS)NzInvalidDiskSize "%s"rwr,rrrr- szInvalidDiskSize.__str__cCsdt|jS)NzInvalidDiskSize(%s)rxr,rrrr3szInvalidDiskSize.__repr__cCs t|tSr>r\r|rGrrrrIszInvalidDiskSize.__eq__cCs t|t Sr>r}rGrrrrJszInvalidDiskSize.__ne__Nr{rrrrr|s r|c@seZdZdZdeddddZddZed d d Zee d d d Z ee d ddZ ee d ddZ ed ddZ ed ddZd ee edddZd!ee eedddZd"ee eedddZdS)# ByteSizeszEncapsulation for an aggregation of byte size values. The naturalUnits indicates which units to report sizes in if the size is not indicated or is a close approximation of those units (naturalUnits should be one of _sizeUnits) rN) naturalUnitsrcCs0td|_d|_td|_d|_||_d|_dSrD)r _accum_minSize_maxSize _numSizes_natUnit _sumUnits)rrrrrr"s   zByteSizes.__init__cCsdt|tst|d}|j|7_|jd7_|jdksD||jkrJ||_||jkrZ||_d|_|S)NrAr4)r\r rrrrrrGrrrrR*s   zByteSizes.__iadd__rcCs t|jSr>)rmrr,rrrr-6szByteSizes.__str__cCs|jdkrtdS|jSrD)rr r,rrr minimumSize9szByteSizes.minimumSizecCs|jSr>)rr,rrr maximumSize=szByteSizes.maximumSizecCs|jdkrtdS|j|jSrD)rr rr,rrr averageSizeAszByteSizes.averageSizecCs|ttd}tttdddD]T}td|d}||jkr>q |dt|jkrj|j|dkrj|jSt|S|jS)Nr4rrAd)r5rranger rr:r)rZsuIZbaseSizerrr_calcSummaryUnitsEs  "  zByteSizes._calcSummaryUnitscCs|jdkr||_|jSr>)rrr,rrr summaryUnitsPs  zByteSizes.summaryUnits)ofSizercCs|dkr|jS|S)zrReturn the specified size (or total size) in full detail units. :param: The size to be displayed N)rr=)rrrrrr=Us zByteSizes.fullSizeStrF)r withUnitsrc Csf|dkr|jSt|j|j|j|j|j|j|j |j d| }|rZd|| fSt t |S)agReturns the specified size (or total size) in simple form (int ceiling) based on the summary Units. The withUnits can be used to enable or suppress reporting of units in the output (units are summaryUnits() in either case). :param ofSize: The size to simplify :param withUnits: Whether to include units in the output Nrrrrrr r r r&)rr7mathceilr rrrrrrrrmrrrrvalrrrr7^s  zByteSizes.simpleSizeStrc Csd|dkr|jSt|j|j|j|j|j|j|j|j d| dd}d||r\| ndfS)aGive a string representation of a close approximation of the size. Returns the specified size (or total size) in floating point form to two significant digits, based on the on the summary Units. The withUnits can be used to enable or suppress reporting of units in the output (units are summaryUnits() in either case). :param ofSize: The size to be represented :param withUnits: Whether to include units in the output Nrr8z%.2f%sr) rr<rr rrrrrrrrrrrr<us$  zByteSizes.approxSizeStr)r)N)NF)NF)r2rjrkrlrmrrRr-rnr rrrrrrr=rgr7r<rrrrr~s   r~) rtypingrrrrr rprrrsr|r~rrrrsc__pycache__/timeutils.cpython-38.pyc000064400000003137147205126350013434 0ustar00U af@sdddlmZddlZddlmZddlmZmZeje dddZ d ejeej e e d d d ZdS) )OptionalN)utc)CANONICAL_TIMESTRING_FORMAT+CANONICAL_TIMESTRING_FORMAT_NO_MICROSECONDS)in_timereturncCst|tdddS)zu Return integer seconds since epoch. :param in_time: Datetime :returns: Integer seconds since epoch i)intdatetime total_seconds)rr E/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/timeutils.pydatetime_to_epoch_seconds srT)dttz microsecondsrcCsP|rtnt}|jdkr2|dkr&td|j|d}|jtk rF|t}||S)a Return a string representation of a datetime in ISO8601 format (YYYY-MM-DDTHH:MM:SS.ssssssZ) in the UTC (Z) timezone. :param dt: The datetime object. :param tz: The timezone to assume when coverting a naive datetime to UTC (Required if `dt` is naive). :param microseconds: Whether to include microseconds in the representation. Defaults to `True`. :returns: ISO8601-formatted string representation :raises ValueError: If the datetime is naive, and no tz is provided Nz7`tz` param must be provided if datetime object is naive)tzinfo)rrr ValueErrorreplaceUTC astimezonestrftime)rrrZtformatr r r iso8601_utcs     r)NT)typingrr ZpytzrrZprimordial.constantsrrr rrboolstrrr r r r s   __pycache__/utils.cpython-38.pyc000064400000012132147205126350012550 0ustar00U af' @sNddlZddlmZmZddlmZddlmZmZm Z m Z m Z m Z m Z dZeeZGdddeZe ee edd d Zeeefe eeeefd d d Zeeefe eeeefd ddZdedeeedddZdddeeedddZefedddfe e edfeee e egefe e egefdddZdS)N)partialupdate_wrapper)sleep)AnyDictIterableOptionalTupleTypeCallablec@seZdZdZdS)RetriesExhaustedErrorz9A special type which signals the failure of a retry loop.N)__name__ __module__ __qualname____doc__rrA/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/utils.pyr sr )hostnamereturncCsF|dkr dS|dr&|dd}|dD]}|r0|Sq0|S)zAbbreviate hostname for use on a Windows machine. :param hostname: the hostname :returns: the first non-empty domain in the hostname, excluding "www." Nzwww..)lower startswithsplit)rdomainrrrabbreviate_hostname_for_windowss  r)dict_keysrcsfdd|DS)a Filter a dictionary to contain only a certain set of keys. :param dict_: The original dictionary to be filtered. :param keys: A list, or other iterable, containing the desired dictionary keys. :returns: A dictionary containing only the desired keys. csi|]\}}|kr||qSrr.0kvrrr +szsubdict..itemsrrrr#rsubdict$sr(csfdd|DS)zFilter a dictionary to omit a set of keys. :param dict_: The original dictionary to be filtered. :param keys: An iterable containing the keys to omit. :returns: A dictionary with the desired keys omitted. csi|]\}}|kr||qSrrrr#rrr$5sz subdict_omit..r%r'rr#r subdict_omit.sr)) curr_attempt max_attempts)r*r+rcOs||kSNr)r*r+_args_kwargsrrr _should_retry8sr/)r* sleep_secs)r*r1rcOs|Sr,r)r*r1r-r.rrr _retry_after=sr2.) on_ex_classesr+r1 should_retry retry_aftercs4ptt|dptt|dfdd}|S)aDecorator that adds retry on error functionality to a function. Currently the retry strategy is 'linear' on errors. i.e. this function waits a set period of time before retrying the failed function again. :param on_ex_classes: A tuple of exceptions to retry on. By default, its all exceptions that derive from the 'Exception' class. :param max_attempts: Limit to how many times we'll retry :param sleep_secs: How long to wait between retries. :param should_retry: A predicate which when called will return a boolean saying whether the call should be retried or not. This parameter overrides the max_attempts parameter and gives more control to dynamically choose on if we need to continue retrying a call. :param retry_after: A callable that returns how long to wait between retries. This parameter overrides the sleep_secs parameter and gives more control to dynamically choose the wait time. :returns: This returns a decorator function that actually provides the retry functionality. )r+)r1cs.tddDfdd}t|S)Ncss|]}t|tr|VqdSr,) issubclass Exception)r Zex_clsrrr [s z.retry_this..wrapper..c sd}z ||WSk r}zx|d7}tdt|j|d|i|sfdj}t|||d|i|}td|jt|t|W5d}~XYqXqdS)Nrr0z)Exception (%s) occured while executing %sr*zMax attempts exhausted for {}z:Sleeping %s secs before retrying %s, due to exception (%s))LOGGERerrorstrrformatr debugr)Zpargskwargsr*emsgZs_secs) ex_classesfr5r4rrnew_func]s"   z-retry_this..wrapper..new_func)tupler)rBrCr3r5r4)rArBrwrapperZszretry_this..wrapper)rr/r2)r3r+r1r4r5rFrrEr retry_thisAsrG)logging functoolsrrtimertypingrrrrr r r Z MAX_RETRIES getLoggerrr9r7r r;rr(r)intboolr/r2rGrrrrs6 $ & &  __pycache__/validator.cpython-38.pyc000064400000017307147205126350013406 0ustar00U afL$@sddlZddlZddlmZmZddlmZmZmZmZm Z m Z m Z ddl m Z ddlmZmZmZddlmZe e eeefZe eefZGdddeZd*eeeed d d Zd+eeeed ddZeeZeedddZeedddZeedddZeedddZ eedddZ!e"dZ#eedddZ$d,e ee eee eddd Z%d-eeefeeefeeeefd!d"d#Z&d.e e ee eeefeed$d%d&Z'eed'd(d)Z(dS)/N)datetime timedelta)AnyCallableDictIterableListOptionalUnion)UUID)SchemaUrlMultipleInvalid)CANONICAL_TIMESTRING_FORMATc@seZdZdZdS)BadParameterExceptionz/To be raised when a validation operation fails.N)__name__ __module__ __qualname____doc__rrE/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/validator.pyrsrF) validator param_valuecoerce_returnc Cspt|r`z ||}Wn4tk rH}ztd||f|W5d}~XYnX|dkrX|}n|}|Std|dS)aRun a validation operation. Validate a particular parameter with a particular validator and possibly coerce the value into the validator's return type. :param validator: The validator to be run :param param_value: The value to be validated :param coerce_: Whether to return a type coerced value :raises ValueError: If the parameter could not be validated. z#Parameter %s failed validation (%s)NTz6Cannot use a non-callable as a parameter validator: %s)callable Exceptionr ValueError)rrrmyvaleretrrrvalidates  $r!)param_validatorrrrc Cs|dk rt|tr|D]X}|dkr2|dkr2|Szt|||d}|WStk rl}zW5d}~XYqXqtd||ft|||dS|S)aValidate a parameter. :param param_validator: The validator (or list of validators) to be run :param param_value: The value to be validated :param coerce_: Whether to return a type coerced value :raises ValueError: If the parameter could not be validated. Nrz&No validator in list validated %s (%s)) isinstancerr!rr)r"rrrretval_rrrvalidate_param1s  r')valrc CsFz t|Wn4tk r@}ztd||f|W5d}~XYnX|S)aValidate that a string looks like a URL. url is intended to be used like str or int to be a basic callable that will except on type mismatch or non- coercible value. :param val: The value to be checked :raises ValueError: If the value does not look like a URL. zNot a url: %s (%s)N) URL_SCHEMArr)r(rrrrurlUs  $r*cCs.zt|t}WdStk r(YdSXdS)zValidate that we can parse a datetime from a string. Catch exception and return false if strptime doesn't work. :param val: The value to be checked TFN)rstrptimerr)r(r&rrrparseable_datetimehs  r,cCs&t|tst|r|Std|dS)zValidate that a value represents a datetime. :param val: The value to be checked :raises ValueError: If the value does not represent a datetime. zNot a datetime: %sN)r$rr,rr(rrr is_datetimevsr.cCst|tr|Std|dS)zValidate that a value is a timedelta. :param val: The value to be checked :raises ValueError: If the value is not a timedelta. zNot a timedelta: %sN)r$rrr-rrr is_timedeltas r/cCs&t|tr|ikr|Std|dS)zValidate that a value is a non-empty dictionary. :param val: The value to be checked :raises ValueError: If the value is not a dictionary, or is empty. zNot a non-empty dict: %sN)r$dictrr-rrris_non_empty_dictsr1z^[a-z_][a-z0-9_]*$cCs$t|r|Stdtj|fdS)zValidate that argument is a valid Postgres identifier. :param val: The value to be checked :raises ValueError: If the value is not a valid Postgres identifier. z"Not a valid Postgres name (%s): %sN)POSTGRES_NAME_REGEXmatchrpatternr-rrris_postgres_names  r5)positional_argspositional_arg_valuesrrc Cstg}t|D]b\}}z ||}Wn&tk rF}zd}W5d}~XYnX|dk r`t|||d}n|}||q |S)azValidate a list of positional arguments. If we run out of stated positionals, we simply dump the originals unvalidated (we're saying the validators are optional) :param positional_args: The validators to be run, in order, against the values :param positional_arg_values: The values to be validated :param coerce_: Whether to return type coerced values Nr#) enumerate IndexErrorr'append) r6r7routargsindexZmyvaluerr&rrrrinternal_validate_positionalss   r=) keyword_argskeyword_arg_valuesrrc Csri}|D]`\}}z ||}Wn&tk rF}zd}W5d}~XYnX|dk r`t|||d}n|}|||<q |S)arValidate a dictionary of keyword arguments. If there is no matching validator for a particular keyword, then the original value is passed along in the output unvalidated. :param keyword_args: The validators to be run against the values :param keyword_arg_values: The values to be validated :param coerce_: Whether to return type coerced values Nr#)itemsKeyErrorr') r>r?rZoutdictkeyvaluerr&rrrrinternal_validate_keywordss   rD) positionalskeywordsrrcsfdd}|S)a>Either coerce the arguments in the suggested way or die with error back to the client. :param positionals: A list of validators to be run against positional arguments :param keywords: A dictionary of validators to be run against keyword arguments :param coerce_: Whether to return type coerced values cs tfdd}|S)ZInnerc sz<dk rt|d}n|}dk r6t|d}n|}Wn<tk rx}ztdt|t|f|W5d}~XYnX||S)zActual wrapperNr#zFailed to validate: %s, %s)r=rDrrstr)argskwargsr;Z outkwargsr)rfunctionrFrErrwrappers,z3coerce_or_error..decorator..wrapper) functoolswraps)rJrKrrFrE)rJr decoratorsz"coerce_or_error..decoratorr)rErFrrOrrNrcoerce_or_errors rP) uuid_maybercCs*zt|WdStk r$YdSXdS)z]Validate that a value represents a UUID. :param uuid_maybe: The value to be checked TFN)r r)rQrrris_uuids rR)F)F)F)F)NNF))rLrerrtypingrrrrrr r uuidr Z voluptuousr r rZprimordial.constantsrZ ValidatorTyperGZ DateTimeTyperrboolr!r'r)r*r,r.r/r1compiler2r5r=rDrPrRrrrrsR$   !          __pycache__/zkconfig.cpython-38.pyc000064400000007347147205126350013236 0ustar00U afw@sddlZddlZddlmZddlmZddlmZddlm Z m Z m Z ddl m Z ddlmZddlmZGd d d eZdS) N)deque)Any) KazooClient)ConnectionLoss NoNodeErrorZookeeperError)KazooTimeoutError)Config) retry_thisc@seZdZdZdZedZeddddZ dd Z d d Z e e eejeefd d dZddZddZdeeedddZdS)ZKConfigzFConfig class that loads configuration options from a ZooKeeper clusterz/configz\s*(\S+?)\s*=\s*(\S+)\s*N)hostsreturncCs t|dd|_i|_|dS)NT)r Z read_only)r_client_config load_config)selfr rD/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/zkconfig.py__init__szZKConfig.__init__cCs|j|jS)z This turns this class/object into a context manager for connections to the zookeeper cluster. This starts up the connection and returns the client as the resource being managed. )rstart)rrrr __enter__s zZKConfig.__enter__cCs2z|j|jWntk r,YnXdS)ze This method takes care of releasing the client connection to the zookeeper cluster. N)rstopclose Exception)rZ _exc_typeZ_exc_valZ_exc_tbrrr__exit__&s  zZKConfig.__exit__)Z on_ex_classesc Cs8|*}||D]}|j|||qW5QRXdS)z Load HFS config data including config at the various namespaces and flatten it out into a dict representation. N) enumeraterupdateget_config_at_path)r zk_clientpathrrrr1szZKConfig.load_configccsJt|jg}|rF|}|V||D]}d||}||q(q dS)z Generate all child paths, starting at a particular path. The starting path is also included in this enumeration. z{}/{}N)rROOTpopleftZ get_childrenformatappend)rrZpaths_to_traverserchildZ child_pathrrrr<s  zZKConfig.enumeratec Csvi}|ddd}||\}}|d}|dD]8}|j|} | r8| \} } d|| g} | || <q8|S)a, Get the data (which is interpreted as config uploaded to that path/namespace) at a particular zookeeper path. Parse out the data looking for lines that look like key=value. Generate a dict out of this normalizing the key appropriately. Finally return back this dict. /Nzutf-8 .)splitgetdecode CONFIG_LINEmatchgroupsjoin) rrrconfigZ path_sectionsZzk_valueZ_zk_statZ stored_configliner-kvZ config_keyrrrrKs    zZKConfig.get_config_at_path)keydefaultr cCs|j||S)z Get the config option as a string :param key: config option name :param default: default value if no value exists in the config :return: option value )rr*)rr4r5rrrr*csz ZKConfig.get)N)__name__ __module__ __qualname____doc__r recompiler,strrrrr rrsocketerrorrrrrrrr*rrrrr s    r )r:r= collectionsrtypingrZ kazoo.clientrZkazoo.exceptionsrrrZkazoo.handlers.threadingrZprimordial.configr Zprimordial.utilsr r rrrrs      __pycache__/zookeeper.cpython-38.pyc000064400000034671147205126350013427 0ustar00U afs5@sdZddlZddlZddlmZddlmZmZddlm Z ddl m Z ddl m Z ddlmZdd lmZdd lmZd Zefeed d dZGdddZGdddeZGdddeZGdddeZGdddZGdddeZGdddZdS)zZookeeper functions N)Enum)Any Generator)NamedTemporaryFile)contextmanager) KazooClient)ConnectionLoss)KazooTimeoutError) retry_thisz/etc/sysconfig/zookeeper)zk_config_filereturnc Csvtdd}|srt|tjrft|ddd4}|D]$}|dr6t| dd}q6W5QRX|srt d|S) z Lookup the zookeeper nodes and return them as a comma-separated string :returns: The zookeeper nodes as a comma-separated string ZZK_HOSTSNrzUTF-8)encoding=z"Could not retrieve Zookeeper Hosts) osgetenvaccessR_OKopen readlinesstrip startswitheval partitionZookeeperException)r ZnodesZzkcfglinerE/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/zookeeper.py lookup_hostss  rc@seZdZdZefedddZeee ddfdddZ ee d d d Z ee d d d Zdee e dddZeeddddZdee ddddZddddZedddZdS) Zookeeperz$Represents a connection to Zookeeperr cCs"||_||_t|jd|_dS)z Initialise a Zookeeper connection :param zk_config_file: The path to the zookeeper config file (if not /etc/sysconfig/zookeeper) hostsN)r rzk_hostsrclientselfr rrr__init__,s zZookeeper.__init__Nr ccs&|jz |jVW5|XdS)zz Yield a started zookeeper client in a context manager :yields: A started instance of KazooClient N)r%start stop_sessionr'rrr_start_session_cm5s  zZookeeper._start_session_cm)noder c Cs.|}||dk W5QRSQRXdS)z Check if node exists :param node: Name of zookeeper node :returns: (bool) Whether the node exists N)r-existsr'r.zkrrrr/As zZookeeper.existsc Cs.|}||dW5QRSQRXdS)zq Get the node value :param node: Name of zookeeper node :returns: The node value rN)r-getr0rrrr2Js z Zookeeper.get)r.defaultr c Cs@|.}||dk r2||dW5QRSW5QRX|S)a5 Get a node value if it exists. If it does not exist, return the default value specified :param node: Name of zookeeper node :param default: The default value to return if the node does not exist :returns: The node value or the default value if the node does not exist Nr)r-r/r2)r'r.r3r1rrrget_or_defaultSs $zZookeeper.get_or_default)r.valuer c Cs6|$}|||||W5QRSQRXdS)z~ Set the node value :param node: Name of zookeeper node :param value: Value of zookeeper node N)r-Z ensure_pathset)r'r.r5r1rrrr6_s  z Zookeeper.setF)r. recursiver c Cs4|"}||dk r&|j||dW5QRXdS)z Delete the node, if it exists :param node: Name of zookeeper node :param recursive: Whether to delete the node and all child nodes N)r7)r-r/delete)r'r.r7r1rrrr8is zZookeeper.deletecCs|j|jdS)z+End and close the current zookeeper sessionN)r%stopcloser,rrrr+ss zZookeeper.stop_sessioncCs t|jS)z Lookup the zookeeper nodes and return them as a comma-separated string :returns: The zookeeper nodes as a comma-separated string )rr r,rrrrxszZookeeper.lookup_hosts)N)F)__name__ __module__ __qualname____doc__ZK_CONFIG_FILEstrr(rrrr-boolr/rr2r4bytesr6r8r+rrrrrr )s       r cs@eZdZdZedeedfddZeedddZZ S) ZKFilea Creates a named temporary file with the contents of one or more znodes. Useful for APIs that only accept file paths rather than file-like objects. Warning: Because the file is a temp file there is a possibility of it being deleted by some tempfile cleanup mechanisms (tmpwatch, etc.). In this case it may be a better idea to read the znode into memory and create the temp file from that each time you use it. r!)znodesr csDtj|dt|_|D]}|r|j||q|jdS)z Load the znode contents into a temporary file :param znodes: An expanded list of zookeeper node names :param zk_config_file: The zookeeper config file (if not the default of /etc/sysconfig/zookeeper) r!N)superr(rfilewriter2flush)r'r rDZznode __class__rrr(s zZKFile.__init__r)cCs|jjS)zNGet the filename for the temporary file storing the contents of the zk node(s))rFnamer,rrrrKsz ZKFile.name) r;r<r=r>r?r@r(propertyrK __classcell__rrrIrrCs rCc@s eZdZdS)rNr;r<r=rrrrrsrc@s$eZdZdZdZdZdZdZdZdS)ZKModezH Enum to represent the mode a zk node is currently operating in rN) r;r<r=r>LEADERFOLLOWER STANDALONEERRORUNKNOWNrrrrrOs rOc@seZdZdZedddZddZddZd d Ze d d Z e d dZ e e eejfddeedddZe edddZe edddZe edddZe edddZdS)ZkEnsembleNodez This class represents a single node in a zookeeper cluster and holds to the status/mode that node is operating in. Values are leader, follower, standalone, error or unknown. hostcCs*t|d|_|d\|_|_tj|_dS)z Initialize the class. :param host: The host:port connection for the node whose mode/state this class holds. r":N)r_ZkEnsembleNode__clientsplit_ZkEnsembleNode__host_ZkEnsembleNode__portrOrX_ZkEnsembleNode__mode)r'r[rrrr(s zZkEnsembleNode.__init__cCs|j|jS)z This turns this class/object into a context manager for connections to the zookeeper node. This starts up the connection and returns the client as the resource being managed. )r]r*r,rrr __enter__s zZkEnsembleNode.__enter__cCs2z|j|jWntk r,YnXdS)zb This method takes care of releasing the client connection to the zookeeper node. N)r]r9r: Exception)r'Z _exc_typeZ_exc_valZ_exc_tbrrr__exit__s  zZkEnsembleNode.__exit__cCsd|j|jS)< Python repr implementation for this class. zHost: {}, Mode: {})formatr[moder,rrr__repr__szZkEnsembleNode.__repr__cCs|jSN)r_r,rrrr[szZkEnsembleNode.hostcCs|jSri)r`r,rrrportszZkEnsembleNode.port)Z on_ex_classesF)forcer c Cs|s|jtjk r|jS|f}|jdd}|dkrt|jdd}ttdd|dd}|d d }t||_ntj|_W5QRX|jS) aB This method returns the mode the zk node is currently operating in. If a nodes mode has already been fetched, then this method returns the cached mode/status. To initiate a re-fetch of the status, use the force parameter. :param force: Force re-fetch of the zk node's status/mode sruok)cmdZimokssrvrcSs |dS)NzMode: )r)lrrrz+ZkEnsembleNode.fetch_mode.. rN) rarOrXcommandlistfilterr^upperrW)r'rkZ zk_clientZruokZsrvrZ mode_linemrrr fetch_modes    zZkEnsembleNode.fetch_moder)cCs|jS)z? Property to return the internally cached mode )rar,rrrrgszZkEnsembleNode.modecCs |jtjkS)z\ Python property to check if the node is currently operating as a follower. )rgrOrUr,rrr is_followerszZkEnsembleNode.is_followercCs |jtjkS)zZ Python property to check if the node is currently operating as a leader. )rgrOrTr,rrr is_leaderszZkEnsembleNode.is_leadercCs |jtjkS)za Python property to check if the node is currently operating in standalone mode. )rgrOrVr,rrr is_standalone szZkEnsembleNode.is_standaloneN)F)r;r<r=r>r@r(rbrdrhrLr[rjr rr socketerrorrArOrwrgrxryrzrrrrrYs&     rYc@s eZdZdS)ZkEnsembleStatusNotGatheredNrNrrrrr}sr}c@seZdZdZefedddZeddZeddZ de d d d Z ee d ddZ ee d ddZ ee d ddZddZdS)ZookeeperEnsemblez This class is used to represent a zookeeper ensemble/cluster and test if there is currently a quorum in the cluster. r!cCsf||_t||_|jd|_dd|jD|_tt|jdd|_g|_ g|_ g|_ | dS)z Initialise a Zookeeper connection :param zk_config_file: Path to the zookeeper config file (default /etc/sysconfig/zookeeper) ,cSsg|]}t|dqS)rZ)rY).0hrrr )sz.ZookeeperEnsemble.__init__..rrPN) r rZzk_hosts_conn_strr^r$ensemble_nodesintlenSIZE_FOR_QUORUM_leaders _followers _standalone gather_statusr&rrrr(s zZookeeperEnsemble.__init__cCs t|jS)zS Python property that returns the number of nodes in this cluster. )rrr,rrr ensemble_size1szZookeeperEnsemble.ensemble_sizecCs t|jS)zU Python property that returns the number of followers in the quorum. )rrr,rrrfollowers_size8sz ZookeeperEnsemble.followers_sizeF)rkc Csv|jD](}z||Wqtk r,YqXqttdd|j|_ttdd|j|_ttdd|j|_dS)a Method to gather the status of the nodes in the ensemble. Note, currently if the node has a cached status, then that is what's used. We dont force a re-fetch. :param force: Force re-fetch of the zk node's status/mode cSs|jSri)ryr.rrrrnNroz1ZookeeperEnsemble.gather_status..cSs|jSri)rxrrrrrnOrocSs|jSri)rzrrrrrnProN)rrwrcrsrtrrr)r'rkr.rrrr?s zZookeeperEnsemble.gather_statusr)cCst|jdkS)zZ Python property to check if the cluster is operating in standalone mode. r)rrr,rrris_in_standalone_modeRsz'ZookeeperEnsemble.is_in_standalone_modecCst|jdkS)zx Python property to check if the cluster has a split brain, perhaps as a result of a network partition. rP)rrr,rrrhas_split_brainYsz!ZookeeperEnsemble.has_split_braincCs*|j o(|j o(t|jt|j|jkS)z Python property to check if the cluster currently has quorum. Make sure gather_status has been called before we call this method. )rrrrrrr,rrr has_quorum`s zZookeeperEnsemble.has_quorumcCsddd|jD}|jr&d}d}n8|jrVd|jdj}ddd|jD}nd}d}d d d|jD}d ||j |j|||S) rez, cSsg|] }|jqSrrZ)rnrrrrrsz.ZookeeperEnsemble.__repr__..rV{}rcSsg|] }|jqSrrZ)rfrrrrysz SPLIT BRAINz cSsg|]}d|qS)r)rf)rr.rrrr~szPNodes: {} Ensemble mode? {} Has Quorum? {} Leader: {} Followers: {} Details: {})joinrrrrfrr[r)r'Zzk_nodesZleaderZ followersdetailsrrrrhns$zZookeeperEnsemble.__repr__N)F)r;r<r=r>r?r@r(rLrrrArrrrrhrrrrr~s   r~)r>rr{enumrtypingrrtempfiler contextlibrZ kazoo.clientrZkazoo.exceptionsrZkazoo.handlers.threadingr Zprimordial.utilsr r?r@rr rCrcrrOrYr}r~rrrrs&       W i