# -*- coding: utf-8 -*-
# pylint: disable=W0212
"""Value types for byte sizes (memory, disk) and an aggregator over them."""

import math
from typing import List, Optional, Tuple  # pylint: disable=W0611

# Binary unit suffixes, smallest first; each unit is 1024 times the previous.
_sizeUnits = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']


class ByteSize:
    """Encapsulation for tracking sizes in bytes.

    The size is stored as a single integer byte count (``_bytes``).  The
    constructor accepts a mix of binary units which are summed together;
    each argument is truncated with ``int()`` before being scaled.
    """

    def __init__(self, Bytes: int = 0, KiBytes: int = 0, MiBytes: int = 0,
                 GiBytes: int = 0, TiBytes: int = 0, PiBytes: int = 0,
                 EiBytes: int = 0) -> None:
        # Horner-style accumulation from the largest unit down: every step
        # scales what has been gathered so far by 1024 and adds the next unit.
        total = 0
        for count in (EiBytes, PiBytes, TiBytes, GiBytes, MiBytes, KiBytes, Bytes):
            total = total * 1024 + int(count)
        self._bytes = total

    def _breakdown(self) -> List[Tuple[int, str]]:
        """Split the size into ``(count, unit)`` pairs, largest unit first.

        Units whose count is zero are omitted.  A size of zero yields
        ``[(0, '')]`` so callers can always index element 0.  Anything too
        large for the biggest known unit is reported as a count above 1023
        of that unit rather than being dropped.
        """
        parts = []  # type: List[Tuple[int, str]]
        remaining = self._bytes
        topIdx = len(_sizeUnits) - 1
        for idx, unit in enumerate(_sizeUnits):
            if idx == topIdx:
                # Last unit: absorb everything that is left.
                count = remaining
                remaining = 0
            else:
                count = remaining & 1023
                remaining >>= 10  # the # of bits in 1024
            if count:
                parts.insert(0, (count, unit))
            if remaining == 0:
                break
        return parts or [(0, '')]

    def __str__(self) -> str:
        return ','.join('%d%s' % each for each in self._breakdown())

    def __repr__(self) -> str:
        # 'KiB' -> 'KiBytes=..', 'B' -> 'Bytes=..'; the zero size has an
        # empty unit and is shown as 'Bytes=0'.
        return '{}({})'.format(
            self.__class__.__name__,
            ','.join('{}ytes={}'.format(unit or 'B', count)
                     for count, unit in self._breakdown())
        )

    def simpleSizeStr(self) -> str:
        """Return the size in simple form (int ceiling).

        Only the largest unit is reported; if any smaller units are present
        the count is bumped by one.
        """
        bd = self._breakdown()
        count, unit = bd[0]
        if len(bd) > 1:
            count += 1
        return '%d%s' % (count, unit)

    def approxSizeStr(self) -> str:
        """Return the size in floating point form with two decimal places.

        The value is expressed in the largest unit present.  The fraction is
        taken from the immediately smaller unit only; when that unit is
        absent (or nothing smaller is present at all) the result is the
        plain integer count.  A fraction that rounds to ``.00`` is dropped.
        """
        bd = self._breakdown()
        if (len(bd) == 1 or
                _sizeUnits.index(bd[1][1]) != _sizeUnits.index(bd[0][1]) - 1):
            return '%d%s' % bd[0]
        fv = ((bd[0][0] * 1024.0) + bd[1][0]) / 1024.0
        rsp = '%.2f' % fv
        if rsp.endswith('.00'):
            return rsp[:-3] + bd[0][1]
        return rsp + bd[0][1]

    def fullSizeStr(self) -> str:
        """Return the size in full detail units."""
        return ','.join('%d%s' % each for each in self._breakdown())

    @property
    def Bytes(self) -> int:
        return self._bytes

    @property
    def KiBytes(self) -> float:
        return self.Bytes / 1024.0

    @property
    def MiBytes(self) -> float:
        return self.KiBytes / 1024.0

    @property
    def GiBytes(self) -> float:
        return self.MiBytes / 1024.0

    @property
    def TiBytes(self) -> float:
        return self.GiBytes / 1024.0

    @property
    def PiBytes(self) -> float:
        return self.TiBytes / 1024.0

    @property
    def EiBytes(self) -> float:
        return self.PiBytes / 1024.0

    def __trunc__(self):
        return self._bytes

    def __index__(self):
        return self._bytes

    # Comparisons
    #
    # The other operand may be another size (anything with a ``_bytes``
    # attribute), a plain number, or None.  None has dedicated handling in
    # each operator below.

    def __eq__(self, o):
        if o is None:
            return self._bytes == 0
        return self._bytes == getattr(o, '_bytes', o)

    def __ne__(self, o):
        if o is None:
            return self._bytes != 0
        return self._bytes != getattr(o, '_bytes', o)

    def __gt__(self, o):
        if o is None:
            return self._bytes > 0
        return self._bytes > getattr(o, '_bytes', o)

    def __ge__(self, o):
        if o is None:
            return self._bytes >= 0
        return self._bytes >= getattr(o, '_bytes', o)

    def __lt__(self, o):
        if o is None:
            return False
        return self._bytes < getattr(o, '_bytes', o)

    def __le__(self, o):
        if o is None:
            return self._bytes == 0
        return self._bytes <= getattr(o, '_bytes', o)

    # Arithmetic operations

    # Add is always two sizes, result is a size
    def __add__(self, o):
        return self.__class__(self._bytes + getattr(o, '_bytes', o))

    def __iadd__(self, o):
        self._bytes += getattr(o, '_bytes', o)
        return self

    # Subtract is always two sizes, result is a size (never below zero)
    def __sub__(self, o):
        return self.__class__(Bytes=max(0, self._bytes - getattr(o, '_bytes', o)))

    def __isub__(self, o):
        # Clamp the result at zero, exactly like __sub__ does.
        self._bytes = max(0, self._bytes - getattr(o, '_bytes', o))
        return self

    # Can only multiply by another integer, result is always a size
    def __mul__(self, o):
        return self.__class__(Bytes=self._bytes * o)

    def __rmul__(self, o):
        return self.__class__(Bytes=self._bytes * o)

    def __imul__(self, o):
        self._bytes *= o
        return self

    # Can Div by another size or an integer, result is the opposite of the divisor
    def __floordiv__(self, o):
        if isinstance(o, ByteSize):
            return self._bytes // o._bytes
        return self.__class__(int(self._bytes // o))

    # Mod is by size or integer, result is always a size
    def __mod__(self, o):
        divisor = o._bytes if isinstance(o, ByteSize) else o
        return self.__class__(self._bytes % divisor)

    def __divmod__(self, o):
        return self // o, self % o

    def __truediv__(self, o):
        if isinstance(o, ByteSize):
            return self._bytes * 1.0 / o._bytes
        return self.__class__(int(self._bytes / o))

    # No rdiv operators for sizes

    # In-place division only supports a numeric divisor since the result
    # should be a size.
    def __idiv__(self, o):
        self._bytes = int(self._bytes / o)
        return self

    def __itruediv__(self, o):
        return self.__idiv__(o)

    def __ifloordiv__(self, o):
        self._bytes = int(self._bytes // o)
        return self

    def __imod__(self, o):
        self._bytes = int(self._bytes % o)
        return self

    # Boolean comparisons
    def __nonzero__(self):
        return bool(self._bytes)  # for Python2

    def __bool__(self):
        return bool(self._bytes)  # for Python3


class MemSize(ByteSize):
    """Encapsulation for tracking amount of memory"""


class DiskSize(ByteSize):
    """Encapsulation for tracking size of persistent storage (disk)"""


class InvalidMemSize(MemSize):
    """Placeholder for a memory size that could not be interpreted.

    Only the offending value is stored; ``_bytes`` is never set.
    """

    def __init__(self, invalid_value):  # pylint: disable=W0231
        self._invVal = invalid_value

    def __str__(self):
        return 'InvalidMemSize "%s"' % str(self._invVal)

    def __repr__(self):
        return 'InvalidMemSize(%s)' % repr(self._invVal)

    # let property accessors throw exception on no self._bytes member

    def __eq__(self, o):
        return isinstance(o, InvalidMemSize)

    def __ne__(self, o):
        return not isinstance(o, InvalidMemSize)


class InvalidDiskSize(DiskSize):
    """Placeholder for a disk size that could not be interpreted.

    Only the offending value is stored; ``_bytes`` is never set.
    """

    def __init__(self, invalid_value):  # pylint: disable=W0231
        self._invVal = invalid_value

    def __str__(self):
        return 'InvalidDiskSize "%s"' % str(self._invVal)

    def __repr__(self):
        return 'InvalidDiskSize(%s)' % repr(self._invVal)

    # let property accessors throw exception on no self._bytes member

    def __eq__(self, o):
        return isinstance(o, InvalidDiskSize)

    def __ne__(self, o):
        return not isinstance(o, InvalidDiskSize)


class ByteSizes:
    """Encapsulation for an aggregation of byte size values.

    The naturalUnits indicates which units to report sizes in if the
    size is not indicated or is a close approximation of those units
    (naturalUnits should be one of _sizeUnits)
    """

    def __init__(self, naturalUnits: str = 'B') -> None:
        self._accum = ByteSize(0)
        self._minSize = None  # type: Optional[ByteSize]
        self._maxSize = ByteSize(0)
        self._numSizes = 0
        self._natUnit = naturalUnits
        # Cached result of _calcSummaryUnits(); reset whenever a size is added.
        self._sumUnits = None  # type: Optional[str]

    def __iadd__(self, o):
        """Add one size (a ByteSize or a raw byte count) to the aggregate."""
        if not isinstance(o, ByteSize):
            o = ByteSize(Bytes=o)
        self._accum += o
        self._numSizes += 1
        if self._minSize is None or o < self._minSize:
            self._minSize = o
        if o > self._maxSize:
            self._maxSize = o
        self._sumUnits = None
        return self

    def __str__(self) -> str:
        return str(self._accum)

    @property
    def minimumSize(self) -> ByteSize:
        return ByteSize(0) if self._minSize is None else self._minSize

    @property
    def maximumSize(self) -> ByteSize:
        return self._maxSize

    @property
    def averageSize(self) -> ByteSize:
        if self._numSizes == 0:
            return ByteSize(0)
        return self._accum / self._numSizes

    def _calcSummaryUnits(self) -> str:
        """Pick the unit used to summarise the accumulated total.

        Walks the units from largest to smallest and stops at the first one
        that is not larger than the total.  Falls back to the natural unit
        when the total is smaller than every unit (i.e. zero).
        """
        for suI in range(len(_sizeUnits) - 1, -1, -1):
            baseSize = ByteSize(Bytes=1024 ** suI)
            if baseSize > self._accum:
                continue
            # NOTE(review): when the fitting unit sits one step below the
            # natural unit, the natural unit is preferred if the remainder
            # modulo the fitting unit is at least 100 bytes -- confirm that
            # a remainder (rather than a quotient) is what was intended.
            if suI + 1 == _sizeUnits.index(self._natUnit) and self._accum % baseSize >= 100:
                return self._natUnit
            return _sizeUnits[suI]
        return self._natUnit

    def summaryUnits(self) -> str:
        """Return the summary unit, computing and caching it on first use."""
        if self._sumUnits is None:
            self._sumUnits = self._calcSummaryUnits()
        return self._sumUnits

    @staticmethod
    def _inUnits(ofSize: ByteSize, units: str):
        """Express ofSize as a number of the given units.

        '' and 'B' give the integer byte count; every other entry of
        _sizeUnits gives a float.  Unknown units raise KeyError.
        """
        if units in ('', 'B'):
            return ofSize.Bytes
        try:
            exponent = _sizeUnits.index(units)
        except ValueError:
            raise KeyError(units) from None
        return ofSize.Bytes / float(1024 ** exponent)

    def fullSizeStr(self, ofSize: Optional[ByteSize] = None) -> str:
        """Return the specified size (or total size) in full detail units.

        :param ofSize: The size to be displayed
        """
        if ofSize is None:
            return self._accum.fullSizeStr()
        return ofSize.fullSizeStr()

    def simpleSizeStr(self, ofSize: Optional[ByteSize] = None, withUnits: bool = False) -> str:
        """Return the specified size in simple form (int ceiling).

        The value is expressed in summaryUnits().  The withUnits flag
        enables or suppresses the unit suffix.  When ofSize is omitted the
        accumulated total is reported through ByteSize.simpleSizeStr(),
        which always carries its own unit and ignores withUnits.

        :param ofSize: The size to simplify
        :param withUnits: Whether to include units in the output
        """
        if ofSize is None:
            return self._accum.simpleSizeStr()
        units = self.summaryUnits()
        val = math.ceil(self._inUnits(ofSize, units))
        if withUnits:
            return '%d%s' % (val, units)
        return str(int(val))

    def approxSizeStr(self, ofSize: Optional[ByteSize] = None, withUnits: bool = False) -> str:
        """Give a string representation of a close approximation of the size.

        The value is expressed in summaryUnits() as a floating point number
        with two decimal places.  The withUnits flag enables or suppresses
        the unit suffix.  When ofSize is omitted the accumulated total is
        reported through ByteSize.approxSizeStr(), which always carries its
        own unit and ignores withUnits.

        :param ofSize: The size to be represented
        :param withUnits: Whether to include units in the output
        """
        if ofSize is None:
            return self._accum.approxSizeStr()
        units = self.summaryUnits()
        # Truncate to a multiple of 1/1024 of the unit before formatting.
        val = int(self._inUnits(ofSize, units) * 1024.0) / 1024.0
        return '%.2f%s' % (val, units if withUnits else '')