# -*- coding: utf-8 -*-
"""Small flow-control helpers: repeat-until-true polling and retry-on-exception."""

import logging
from time import sleep
from typing import Any, Callable, Tuple, Type, Union

# Default number of seconds added to the retry delay after each failed attempt.
DELAY_INCREASE_SECS = 1.0
# Default limit on the number of attempts made by ``retry``.
MAX_RETRIES = 3


LOGGER = logging.getLogger(__name__)


class RetryError(Exception):
    """A special type which signals the failure of a retry loop."""


def wait_for_true(test_function: Callable, max_tests: int = 10, sleep_secs: float = 0.5, count_trues: int = 3) -> Any:
    """Call test_function repeatedly until it has returned ``True`` often enough.

    Try a maximum of max_tests times, sleeping sleep_secs in between tests, and
    stop as soon as count_trues results equal to ``True`` have been collected.

    wait_for_true is designed specifically to handle a design principle behind
    AWS or other clustered services: that you might succeed when making a
    request once and then fail very soon after.  For example, when you make a
    REST API call against S3 to see if a bucket exists, the fact that you get a
    "True" response does not guarantee that you will get that same response to
    a request made very soon after.

    In other words, it's for cases in which the goal is to produce a wait time
    for an eventually consistent external service to resolve your request.
    That's why wait_for_true lets you specify a threshold of how many trues you
    want to get before you're satisfied.

    :param test_function: a zero-argument function to run; only a result that
        is the literal ``True`` is counted (other truthy values are not)
    :param max_tests: limit to how many times we'll try test_function; raised
        to count_trues (with a warning) when it is smaller than count_trues
    :param sleep_secs: how long to wait between tests
    :param count_trues: how many ``True`` results we need until we're totally
        true; this is useful with e.g. cluster tests, where we want a quorum of
        true answers before we're happy that the entire cluster is consistent
        (e.g. s3 put)
    :returns: a tuple ``(ret, itercount, trues)``: the last return value of
        test_function, the number of tests that completed before the one that
        satisfied the quota, and how many ``True`` results were seen
    :raises RetryError: if the function "never" returned sufficiently many trues
    """
    itercount = 0
    ret = None
    trues = 0
    if count_trues > max_tests:
        # The quota could never be reached otherwise, so widen the budget.
        LOGGER.warning("count_trues > max_tests, bumping max_tests to count_trues")
        max_tests = count_trues
    while itercount < max_tests:
        ret = test_function()
        if ret is True:
            trues += 1
            if trues >= count_trues:
                # Quota met: leave without counting this test or sleeping.
                break
        itercount += 1
        # Only pause when another test will follow; sleeping after the final
        # test would merely delay the result (or the RetryError) for nothing.
        if itercount < max_tests:
            sleep(sleep_secs)
    if trues < count_trues:
        raise RetryError("wait_for_true never succeeded %s times for function %s" % (count_trues, test_function))
    return ret, itercount, trues


def retry(action_function: Callable,
          sleep_secs: float = 1.0,
          backoff: Callable = lambda x: x + DELAY_INCREASE_SECS,
          max_attempts: int = MAX_RETRIES,
          exceptions_to_ignore: Union[Type[Exception], Tuple[Type[Exception], ...]] = Exception) -> Any:
    """Retry an e.g. network connection until it doesn't throw an ignored exception.

    This is for single-success retry cases (e.g. wow, that TCP connection
    didn't get established because of high latency, let's dial back and try
    again in a bit).

    With the default exceptions_to_ignore, the action_function is expected to
    be of a sort that any expected exceptions are caught, as this will retry
    under /any/ ``Exception``.  Exceptions that do not match
    exceptions_to_ignore propagate to the caller immediately.

    sleep_secs is the number of seconds to sleep after the first failure, and
    backoff computes each following delay from the previous one; it defaults
    to adding DELAY_INCREASE_SECS to the prior value each time.  max_attempts
    is the limit on the number of calls to action_function in any case.

    :param action_function: a zero-argument function to run that we will retry
        if it raises an ignored exception
    :param sleep_secs: how long to wait before the first retry
    :param backoff: a function that takes the previous sleep duration and
        returns the next one
    :param max_attempts: limit to how many times we'll try action_function;
        must be at least 1
    :param exceptions_to_ignore: exception type or tuple of types to retry on;
        by default, all Exception-derived exceptions
    :returns: whatever action_function returned on its first successful call
    :raises RetryError: if every attempt raised an ignored exception (chained
        to the last such exception), or if max_attempts allows no attempt at all
    """
    if max_attempts < 1:
        # Without this guard the loop below would never run and the caller
        # would silently get None although the action was never attempted.
        raise RetryError("Failure to retry %s: max_attempts must be at least 1 (got %s)"
                         % (action_function, max_attempts))
    attempts = 0
    # The loop is left only by a successful return or by raising RetryError.
    while True:
        try:
            return action_function()
        except exceptions_to_ignore as e:  # pylint: disable=W0703
            attempts += 1
            if attempts >= max_attempts:
                raise RetryError("Failure to retry %s: %s" % (action_function, str(e))) from e
            LOGGER.debug("retry: pausing %s secs before retrying %s (%s)", sleep_secs, action_function, str(e))
            sleep(sleep_secs)
            sleep_secs = backoff(sleep_secs)