# -*- coding: utf-8 -*-

"""\
Zookeeper functions
"""

import ast
import logging
import os
import socket
from contextlib import contextmanager
from enum import Enum
from tempfile import NamedTemporaryFile
from typing import Any, Generator, List, Optional, Tuple

from kazoo.client import KazooClient
from kazoo.exceptions import ConnectionLoss, NoNodeError
# This is the error thrown when the KazooClient.start call times out and the connection wasn't established
from kazoo.handlers.threading import KazooTimeoutError

from primordial.utils import retry_this


LOGGER = logging.getLogger(__name__)

ZK_CONFIG_FILE = '/etc/sysconfig/zookeeper'


def _parse_hosts_value(raw: str) -> str:
    """
    Turn the right-hand side of a ``ZK_HOSTS=...`` config line into a host string

    :param raw: The text following the ``=`` sign on the config line
    :returns: The decoded string literal if ``raw`` is a quoted string, otherwise the stripped raw text
    """
    text = raw.strip()
    try:
        # literal_eval only accepts literals, so arbitrary code in the config file is never executed
        parsed = ast.literal_eval(text)
    except (ValueError, SyntaxError):
        # Not a Python literal (e.g. an unquoted shell-style value): use the text as written
        return text
    return parsed if isinstance(parsed, str) else text


def lookup_hosts(zk_config_file: str = ZK_CONFIG_FILE) -> str:
    """
    Lookup the zookeeper nodes and return them as a comma-separated string

    The ``ZK_HOSTS`` environment variable takes precedence. If it is unset or empty, the ``ZK_HOSTS=`` entry of
    the config file is used; when the file contains several such entries the last one wins.

    :param zk_config_file: The path to the zookeeper config file (if not /etc/sysconfig/zookeeper)
    :returns: The zookeeper nodes as a comma-separated string
    :raises ZookeeperException: If no hosts could be found in the environment or the config file
    """
    nodes = os.getenv('ZK_HOSTS')
    if not nodes and os.access(zk_config_file, os.R_OK):
        with open(zk_config_file, 'r', encoding='UTF-8') as zkcfg:
            for line in zkcfg:
                key, sep, value = line.partition('=')
                # Match the exact key so that e.g. ZK_HOSTS_BACKUP=... does not override ZK_HOSTS
                if sep and key.strip() == 'ZK_HOSTS':
                    nodes = _parse_hosts_value(value)
    if not nodes:
        raise ZookeeperException('Could not retrieve Zookeeper Hosts')
    return nodes


class Zookeeper:
    """Represents a connection to Zookeeper"""

    def __init__(self, zk_config_file: str = ZK_CONFIG_FILE):
        """
        Initialise a Zookeeper connection

        :param zk_config_file: The path to the zookeeper config file (if not /etc/sysconfig/zookeeper)
        """
        self.zk_config_file = zk_config_file
        self.zk_hosts = self.lookup_hosts()
        self.client = KazooClient(hosts=self.zk_hosts)

    @contextmanager
    def _start_session_cm(self) -> Generator[KazooClient, None, None]:
        """
        Yield a started zookeeper client in a context manager

        The session is stopped and closed again when the context exits, whether or not the body raised.

        :yields: A started instance of KazooClient
        """
        self.client.start()
        try:
            yield self.client
        finally:
            self.stop_session()

    def exists(self, node: str) -> bool:
        """
        Check if node exists

        :param node: Name of zookeeper node
        :returns: (bool) Whether the node exists
        """
        with self._start_session_cm() as zk:
            return zk.exists(node) is not None

    def get(self, node: str) -> Any:
        """
        Get the node value

        :param node: Name of zookeeper node
        :returns: The node value
        """
        with self._start_session_cm() as zk:
            return zk.get(node)[0]

    def get_or_default(self, node: str, default: Any = None) -> Any:
        """
        Get a node value if it exists. If it does not exist, return the default value specified

        :param node: Name of zookeeper node
        :param default: The default value to return if the node does not exist
        :returns: The node value or the default value if the node does not exist
        """
        with self._start_session_cm() as zk:
            # Read directly instead of checking exists() first: the node could vanish between the two calls
            try:
                return zk.get(node)[0]
            except NoNodeError:
                return default

    def set(self, node: str, value: bytes) -> Any:
        """
        Set the node value, creating the node path first if necessary

        :param node: Name of zookeeper node
        :param value: Value of zookeeper node
        :returns: Whatever the underlying client's ``set`` call returns
        """
        with self._start_session_cm() as zk:
            zk.ensure_path(node)
            return zk.set(node, value)

    def delete(self, node: str, recursive: bool = False) -> None:
        """
        Delete the node, if it exists

        :param node: Name of zookeeper node
        :param recursive: Whether to delete the node and all child nodes
        """
        with self._start_session_cm() as zk:
            # Delete directly instead of checking exists() first: a node that is already gone is not an error
            try:
                zk.delete(node, recursive=recursive)
            except NoNodeError:
                pass

    def stop_session(self) -> None:
        """End and close the current zookeeper session"""
        self.client.stop()
        self.client.close()

    def lookup_hosts(self) -> str:
        """
        Lookup the zookeeper nodes and return them as a comma-separated string

        :returns: The zookeeper nodes as a comma-separated string
        """
        return lookup_hosts(self.zk_config_file)


class ZKFile(Zookeeper):
    """
    Creates a named temporary file with the contents of one or more znodes. Useful for APIs that only
    accept file paths rather than file-like objects.

    Warning: Because the file is a temp file there is a possibility of it being deleted by some tempfile
    cleanup mechanisms (tmpwatch, etc.). In this case it may be a better idea to read the znode into memory
    and create the temp file from that each time you use it.
    """

    def __init__(self, *znodes: str, zk_config_file: str = ZK_CONFIG_FILE):
        """
        Load the znode contents into a temporary file

        :param znodes: An expanded list of zookeeper node names
        :param zk_config_file: The zookeeper config file (if not the default of /etc/sysconfig/zookeeper)
        """
        super().__init__(zk_config_file=zk_config_file)
        self.file = NamedTemporaryFile()  # pylint: disable=consider-using-with
        try:
            for znode in znodes:
                if znode:  # looks silly, but None is a possibility
                    self.file.write(self.get(znode))
            self.file.flush()
        except Exception:
            # Do not leave a half-written temp file behind when a znode could not be read
            self.file.close()
            raise

    @property
    def name(self) -> str:
        """Get the filename for the temporary file storing the contents of the zk node(s)"""
        return self.file.name


class ZookeeperException(Exception):
    """Raised when the zookeeper hosts cannot be determined"""


class ZKMode(Enum):
    """
    Enum to represent the mode a zk node is currently operating in
    """
    LEADER = 1
    FOLLOWER = 2
    STANDALONE = 3
    ERROR = 4
    UNKNOWN = 5


def _split_host_port(host: str) -> Tuple[str, Optional[str]]:
    """
    Split a ``host:port`` connection string into its host and port parts

    The split happens on the last colon so that a bracketed IPv6 host such as ``[::1]:2181`` keeps its colons.

    :param host: The connection string for a single node
    :returns: A ``(host, port)`` tuple; ``port`` is None when the string carries no numeric port suffix
    """
    name, sep, port = host.rpartition(':')
    if not sep or not port.isdigit():
        return host, None
    return name, port


class ZkEnsembleNode:
    """
    This class represents a single node in a zookeeper cluster and holds the status/mode that node is
    operating in. Values are leader, follower, standalone, error or unknown.
    """

    def __init__(self, host: str):
        """
        Initialize the class.

        :param host: The host:port connection for the node whose mode/state this class holds.
        """
        self.__client = KazooClient(hosts=host)
        self.__host, self.__port = _split_host_port(host)
        self.__mode = ZKMode.UNKNOWN

    def __enter__(self):
        """
        This turns this class/object into a context manager for connections to the zookeeper node.
        This starts up the connection and returns the client as the resource being managed.
        """
        # If start throws an error it will get propagated up the stack
        # The start method can throw a KazooTimeoutError error
        self.__client.start()
        return self.__client

    def __exit__(self, _exc_type, _exc_val, _exc_tb):
        """
        This method takes care of releasing the client connection to the zookeeper node.
        """
        try:
            self.__client.stop()
            self.__client.close()
        except Exception:  # pylint: disable=W0703
            # Closing is best effort: record the problem but never let it mask the caller's own outcome
            LOGGER.debug('Ignoring error while closing connection to %s', self.__host, exc_info=True)

    def __repr__(self):
        """
        Python repr implementation for this class.
        """
        return 'Host: {}, Mode: {}'.format(self.host, self.mode)

    @property
    def host(self) -> str:
        """
        The host part of the connection string this node was created with.
        """
        return self.__host

    @property
    def port(self) -> Optional[str]:
        """
        The port part of the connection string this node was created with, or None if it had none.
        """
        return self.__port

    @staticmethod
    def _parse_mode(srvr_output: str) -> ZKMode:
        """
        Extract the operating mode from the output of the ``srvr`` four letter command.

        :param srvr_output: The text returned by the ``srvr`` command
        :returns: The mode named on the first ``Mode: `` line, or ZKMode.UNKNOWN if there is no such line or
            the mode named there is not a member of ZKMode
        """
        prefix = 'Mode: '
        for line in srvr_output.split('\n'):
            if line.startswith(prefix):
                name = line[len(prefix):].strip().upper()
                return ZKMode.__members__.get(name, ZKMode.UNKNOWN)
        return ZKMode.UNKNOWN

    @retry_this(on_ex_classes=(ConnectionLoss, KazooTimeoutError, socket.error))
    def fetch_mode(self, force: bool = False) -> ZKMode:
        """
        This method returns the mode the zk node is currently operating in.
        If a node's mode has already been fetched, then this method returns the cached mode/status.
        To initiate a re-fetch of the status, use the force parameter.

        :param force: Force re-fetch of the zk node's status/mode
        :returns: The mode the node is operating in
        """
        if not force and self.__mode is not ZKMode.UNKNOWN:
            return self.__mode

        # Forget any cached value first so a failed re-fetch cannot leave a stale mode behind
        self.__mode = ZKMode.UNKNOWN

        with self as zk_client:
            # If there was an exception while executing the 4 letter command ruok, let it propagate up the stack
            ruok = zk_client.command(cmd=b"ruok")
            if ruok == "imok":
                # If there was an exception while executing the 4 letter command srvr, let it propagate up the stack
                srvr = zk_client.command(cmd=b"srvr")
                self.__mode = self._parse_mode(srvr)
            else:
                self.__mode = ZKMode.ERROR
        return self.__mode

    @property
    def mode(self) -> ZKMode:
        """
        Property to return the internally cached mode
        """
        return self.__mode

    @property
    def is_follower(self) -> bool:
        """
        Python property to check if the node is currently operating as a follower.
        """
        return self.mode == ZKMode.FOLLOWER

    @property
    def is_leader(self) -> bool:
        """
        Python property to check if the node is currently operating as a leader.
        """
        return self.mode == ZKMode.LEADER

    @property
    def is_standalone(self) -> bool:
        """
        Python property to check if the node is currently operating in standalone mode.
        """
        return self.mode == ZKMode.STANDALONE


class ZkEnsembleStatusNotGathered(Exception):
    """Exception type for an ensemble whose status has not been gathered; not raised within this module"""


class ZookeeperEnsemble:
    """
    This class is used to represent a zookeeper ensemble/cluster and test if there is currently a quorum
    in the cluster.
    """

    def __init__(self, zk_config_file: str = ZK_CONFIG_FILE):
        """
        Initialise a Zookeeper connection

        :param zk_config_file: Path to the zookeeper config file (default /etc/sysconfig/zookeeper)
        """
        self.zk_config_file = zk_config_file
        # This looks like p3dlhfzk01.cloud.phx3.gdg:2181,p3dlhfzk02.cloud.phx3.gdg:2181,p3dlhfzk03.cloud.phx3.gdg:2181
        self.zk_hosts_conn_str = lookup_hosts(zk_config_file)
        # This looks like ['p3dlhfzk01.cloud.phx3.gdg:2181', 'p3dlhfzk02.cloud.phx3.gdg:2181',
        #                  'p3dlhfzk03.cloud.phx3.gdg:2181']
        # Surrounding whitespace and empty entries (e.g. from a trailing comma) are dropped
        self.zk_hosts = [h.strip() for h in self.zk_hosts_conn_str.split(",") if h.strip()]
        self.ensemble_nodes = [ZkEnsembleNode(host=h) for h in self.zk_hosts]
        # A quorum is a strict majority of the configured nodes
        self.SIZE_FOR_QUORUM = len(self.zk_hosts) // 2 + 1
        self._leaders = []  # type: List[ZkEnsembleNode]
        self._followers = []  # type: List[ZkEnsembleNode]
        self._standalone = []  # type: List[ZkEnsembleNode]
        self.gather_status()

    @property
    def ensemble_size(self) -> int:
        """
        Python property that returns the number of nodes in this cluster.
        """
        return len(self.ensemble_nodes)

    @property
    def followers_size(self) -> int:
        """
        Python property that returns the number of followers in the quorum.
        """
        return len(self._followers)

    def gather_status(self, force: bool = False) -> None:
        """
        Method to gather the status of the nodes in the ensemble.
        Note, by default a node's cached status is used if it has one; pass force to re-fetch.

        :param force: Force re-fetch of the zk node's status/mode
        """
        for node in self.ensemble_nodes:
            try:
                node.fetch_mode(force)
            except Exception:  # pylint: disable=W0703
                # One unreachable node must not stop the survey of the others; record it and carry on
                LOGGER.debug('Could not fetch mode of zookeeper node %s', node.host, exc_info=True)

        self._leaders = [node for node in self.ensemble_nodes if node.is_leader]
        self._followers = [node for node in self.ensemble_nodes if node.is_follower]
        self._standalone = [node for node in self.ensemble_nodes if node.is_standalone]

    @property
    def is_in_standalone_mode(self) -> bool:
        """
        Python property to check if the cluster is operating in standalone mode.
        """
        return len(self._standalone) > 0

    @property
    def has_split_brain(self) -> bool:
        """
        Python property to check if the cluster has a split brain, perhaps as a result of a network partition.
        """
        return len(self._leaders) > 1

    @property
    def has_quorum(self) -> bool:
        """
        Python property to check if the cluster currently has quorum.
        Make sure gather_status has been called before we call this method.
        """
        # The logic for checking if an ensemble has quorum is:
        # 1. Is not running in standalone mode
        # 2. There is no split brain i.e. more than one leader
        # 3. The leader plus follower count is at least the number required for quorum i.e. n // 2 + 1
        return not self.is_in_standalone_mode and \
            not self.has_split_brain \
            and (len(self._leaders) + len(self._followers) >= self.SIZE_FOR_QUORUM)

    def __repr__(self):
        """
        Python repr implementation for this class.
        """
        zk_nodes = ", ".join([n.host for n in self.ensemble_nodes])
        if self.is_in_standalone_mode:
            leader = "STANDALONE"
            followers = "STANDALONE"
        elif self.has_quorum:
            # Quorum can be reached by followers alone when the leader could not be queried
            leader = "{}".format(self._leaders[0].host) if self._leaders else "UNKNOWN"
            followers = ", ".join([f.host for f in self._followers])
        elif self.has_split_brain:
            leader = "SPLIT BRAIN"
            followers = "SPLIT BRAIN"
        else:
            leader = "NO QUORUM"
            followers = "NO QUORUM"

        details = "\t\n".join(['{}'.format(node) for node in self.ensemble_nodes])

        return 'Nodes: {}\nEnsemble mode? {}\nHas Quorum? {}\nLeader: {}\nFollowers: {}\n\nDetails:\n{}'\
            .format(
                zk_nodes,
                not self.is_in_standalone_mode,
                self.has_quorum,
                leader,
                followers,
                details)