U afs5@sdZddlZddlZddlmZddlmZmZddlm Z ddl m Z ddl m Z ddlmZdd lmZdd lmZd Zefeed d dZGdddZGdddeZGdddeZGdddeZGdddZGdddeZGdddZdS)zZookeeper functions N)Enum)Any Generator)NamedTemporaryFile)contextmanager) KazooClient)ConnectionLoss)KazooTimeoutError) retry_thisz/etc/sysconfig/zookeeper)zk_config_filereturnc Csvtdd}|srt|tjrft|ddd4}|D]$}|dr6t| dd}q6W5QRX|srt d|S) z Lookup the zookeeper nodes and return them as a comma-separated string :returns: The zookeeper nodes as a comma-separated string ZZK_HOSTSNrzUTF-8)encoding=z"Could not retrieve Zookeeper Hosts) osgetenvaccessR_OKopen readlinesstrip startswitheval partitionZookeeperException)r ZnodesZzkcfglinerE/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/primordial/zookeeper.py lookup_hostss  rc@seZdZdZefedddZeee ddfdddZ ee d d d Z ee d d d Zdee e dddZeeddddZdee ddddZddddZedddZdS) Zookeeperz$Represents a connection to Zookeeperr cCs"||_||_t|jd|_dS)z Initialise a Zookeeper connection :param zk_config_file: The path to the zookeeper config file (if not /etc/sysconfig/zookeeper) hostsN)r rzk_hostsrclientselfr rrr__init__,s zZookeeper.__init__Nr ccs&|jz |jVW5|XdS)zz Yield a started zookeeper client in a context manager :yields: A started instance of KazooClient N)r%start stop_sessionr'rrr_start_session_cm5s  zZookeeper._start_session_cm)noder c Cs.|}||dk W5QRSQRXdS)z Check if node exists :param node: Name of zookeeper node :returns: (bool) Whether the node exists N)r-existsr'r.zkrrrr/As zZookeeper.existsc Cs.|}||dW5QRSQRXdS)zq Get the node value :param node: Name of zookeeper node :returns: The node value rN)r-getr0rrrr2Js z Zookeeper.get)r.defaultr c Cs@|.}||dk r2||dW5QRSW5QRX|S)a5 Get a node value if it exists. If it does not exist, return the default value specified :param node: Name of zookeeper node :param default: The default value to return if the node does not exist :returns: The node value or the default value if the node does not exist Nr)r-r/r2)r'r.r3r1rrrget_or_defaultSs $zZookeeper.get_or_default)r.valuer c Cs6|$}|||||W5QRSQRXdS)z~ Set the node value :param node: Name of zookeeper node :param value: Value of zookeeper node N)r-Z ensure_pathset)r'r.r5r1rrrr6_s  z Zookeeper.setF)r. recursiver c Cs4|"}||dk r&|j||dW5QRXdS)z Delete the node, if it exists :param node: Name of zookeeper node :param recursive: Whether to delete the node and all child nodes N)r7)r-r/delete)r'r.r7r1rrrr8is zZookeeper.deletecCs|j|jdS)z+End and close the current zookeeper sessionN)r%stopcloser,rrrr+ss zZookeeper.stop_sessioncCs t|jS)z Lookup the zookeeper nodes and return them as a comma-separated string :returns: The zookeeper nodes as a comma-separated string )rr r,rrrrxszZookeeper.lookup_hosts)N)F)__name__ __module__ __qualname____doc__ZK_CONFIG_FILEstrr(rrrr-boolr/rr2r4bytesr6r8r+rrrrrr )s       r cs@eZdZdZedeedfddZeedddZZ S) ZKFilea Creates a named temporary file with the contents of one or more znodes. Useful for APIs that only accept file paths rather than file-like objects. Warning: Because the file is a temp file there is a possibility of it being deleted by some tempfile cleanup mechanisms (tmpwatch, etc.). In this case it may be a better idea to read the znode into memory and create the temp file from that each time you use it. r!)znodesr csDtj|dt|_|D]}|r|j||q|jdS)z Load the znode contents into a temporary file :param znodes: An expanded list of zookeeper node names :param zk_config_file: The zookeeper config file (if not the default of /etc/sysconfig/zookeeper) r!N)superr(rfilewriter2flush)r'r rDZznode __class__rrr(s zZKFile.__init__r)cCs|jjS)zNGet the filename for the temporary file storing the contents of the zk node(s))rFnamer,rrrrKsz ZKFile.name) r;r<r=r>r?r@r(propertyrK __classcell__rrrIrrCs rCc@s eZdZdS)rNr;r<r=rrrrrsrc@s$eZdZdZdZdZdZdZdZdS)ZKModezH Enum to represent the mode a zk node is currently operating in rN) r;r<r=r>LEADERFOLLOWER STANDALONEERRORUNKNOWNrrrrrOs rOc@seZdZdZedddZddZddZd d Ze d d Z e d dZ e e eejfddeedddZe edddZe edddZe edddZe edddZdS)ZkEnsembleNodez This class represents a single node in a zookeeper cluster and holds to the status/mode that node is operating in. Values are leader, follower, standalone, error or unknown. hostcCs*t|d|_|d\|_|_tj|_dS)z Initialize the class. :param host: The host:port connection for the node whose mode/state this class holds. r":N)r_ZkEnsembleNode__clientsplit_ZkEnsembleNode__host_ZkEnsembleNode__portrOrX_ZkEnsembleNode__mode)r'r[rrrr(s zZkEnsembleNode.__init__cCs|j|jS)z This turns this class/object into a context manager for connections to the zookeeper node. This starts up the connection and returns the client as the resource being managed. )r]r*r,rrr __enter__s zZkEnsembleNode.__enter__cCs2z|j|jWntk r,YnXdS)zb This method takes care of releasing the client connection to the zookeeper node. N)r]r9r: Exception)r'Z _exc_typeZ_exc_valZ_exc_tbrrr__exit__s  zZkEnsembleNode.__exit__cCsd|j|jS)< Python repr implementation for this class. zHost: {}, Mode: {})formatr[moder,rrr__repr__szZkEnsembleNode.__repr__cCs|jSN)r_r,rrrr[szZkEnsembleNode.hostcCs|jSri)r`r,rrrportszZkEnsembleNode.port)Z on_ex_classesF)forcer c Cs|s|jtjk r|jS|f}|jdd}|dkrt|jdd}ttdd|dd}|d d }t||_ntj|_W5QRX|jS) aB This method returns the mode the zk node is currently operating in. If a nodes mode has already been fetched, then this method returns the cached mode/status. To initiate a re-fetch of the status, use the force parameter. :param force: Force re-fetch of the zk node's status/mode sruok)cmdZimokssrvrcSs |dS)NzMode: )r)lrrrz+ZkEnsembleNode.fetch_mode.. rN) rarOrXcommandlistfilterr^upperrW)r'rkZ zk_clientZruokZsrvrZ mode_linemrrr fetch_modes    zZkEnsembleNode.fetch_moder)cCs|jS)z? Property to return the internally cached mode )rar,rrrrgszZkEnsembleNode.modecCs |jtjkS)z\ Python property to check if the node is currently operating as a follower. )rgrOrUr,rrr is_followerszZkEnsembleNode.is_followercCs |jtjkS)zZ Python property to check if the node is currently operating as a leader. )rgrOrTr,rrr is_leaderszZkEnsembleNode.is_leadercCs |jtjkS)za Python property to check if the node is currently operating in standalone mode. )rgrOrVr,rrr is_standalone szZkEnsembleNode.is_standaloneN)F)r;r<r=r>r@r(rbrdrhrLr[rjr rr socketerrorrArOrwrgrxryrzrrrrrYs&     rYc@s eZdZdS)ZkEnsembleStatusNotGatheredNrNrrrrr}sr}c@seZdZdZefedddZeddZeddZ de d d d Z ee d ddZ ee d ddZ ee d ddZddZdS)ZookeeperEnsemblez This class is used to represent a zookeeper ensemble/cluster and test if there is currently a quorum in the cluster. r!cCsf||_t||_|jd|_dd|jD|_tt|jdd|_g|_ g|_ g|_ | dS)z Initialise a Zookeeper connection :param zk_config_file: Path to the zookeeper config file (default /etc/sysconfig/zookeeper) ,cSsg|]}t|dqS)rZ)rY).0hrrr )sz.ZookeeperEnsemble.__init__..rrPN) r rZzk_hosts_conn_strr^r$ensemble_nodesintlenSIZE_FOR_QUORUM_leaders _followers _standalone gather_statusr&rrrr(s zZookeeperEnsemble.__init__cCs t|jS)zS Python property that returns the number of nodes in this cluster. )rrr,rrr ensemble_size1szZookeeperEnsemble.ensemble_sizecCs t|jS)zU Python property that returns the number of followers in the quorum. )rrr,rrrfollowers_size8sz ZookeeperEnsemble.followers_sizeF)rkc Csv|jD](}z||Wqtk r,YqXqttdd|j|_ttdd|j|_ttdd|j|_dS)a Method to gather the status of the nodes in the ensemble. Note, currently if the node has a cached status, then that is what's used. We dont force a re-fetch. :param force: Force re-fetch of the zk node's status/mode cSs|jSri)ryr.rrrrnNroz1ZookeeperEnsemble.gather_status..cSs|jSri)rxrrrrrnOrocSs|jSri)rzrrrrrnProN)rrwrcrsrtrrr)r'rkr.rrrr?s zZookeeperEnsemble.gather_statusr)cCst|jdkS)zZ Python property to check if the cluster is operating in standalone mode. r)rrr,rrris_in_standalone_modeRsz'ZookeeperEnsemble.is_in_standalone_modecCst|jdkS)zx Python property to check if the cluster has a split brain, perhaps as a result of a network partition. rP)rrr,rrrhas_split_brainYsz!ZookeeperEnsemble.has_split_braincCs*|j o(|j o(t|jt|j|jkS)z Python property to check if the cluster currently has quorum. Make sure gather_status has been called before we call this method. )rrrrrrr,rrr has_quorum`s zZookeeperEnsemble.has_quorumcCsddd|jD}|jr&d}d}n8|jrVd|jdj}ddd|jD}nd}d}d d d|jD}d ||j |j|||S) rez, cSsg|] }|jqSrrZ)rnrrrrrsz.ZookeeperEnsemble.__repr__..rV{}rcSsg|] }|jqSrrZ)rfrrrrysz SPLIT BRAINz cSsg|]}d|qS)r)rf)rr.rrrr~szPNodes: {} Ensemble mode? {} Has Quorum? {} Leader: {} Followers: {} Details: {})joinrrrrfrr[r)r'Zzk_nodesZleaderZ followersdetailsrrrrhns$zZookeeperEnsemble.__repr__N)F)r;r<r=r>r?r@r(rLrrrArrrrrhrrrrr~s   r~)r>rr{enumrtypingrrtempfiler contextlibrZ kazoo.clientrZkazoo.exceptionsrZkazoo.handlers.threadingr Zprimordial.utilsr r?r@rr rCrcrrOrYr}r~rrrrs&       W i