# -*- coding: utf-8 -*-

import datetime
import logging
import os
import os.path
from typing import Any, Optional, Tuple

try:
    from pwd import getpwnam
except ImportError:
    pass  # Windows
try:
    from grp import getgrnam
except ImportError:
    pass  # Windows
from shutil import rmtree
import tempfile

try:
    import win32security
    import ntsecuritycon as con
except ImportError:
    pass  # Not-Windows.

from primordial.settings import get_file_group, get_file_user


DELAY_INCREASE_SECS = 1.0
MAX_RETRIES = 3
LOGGER = logging.getLogger(__name__)


def datetime_to_epoch_seconds(in_time: datetime.datetime) -> int:
    """Return integer seconds since epoch.

    A naive datetime is measured against the naive 1970-01-01 epoch (i.e. it is
    treated as already being in UTC).  A timezone-aware datetime is measured
    against 1970-01-01 UTC, so its offset is taken into account.  The fractional
    part is discarded by ``int()``.

    :param in_time: Datetime
    :returns: Integer seconds since epoch
    """
    if in_time.utcoffset() is not None:
        # Aware and naive datetimes cannot be subtracted from each other, so
        # pick an epoch of the matching flavour.
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    else:
        epoch = datetime.datetime(1970, 1, 1)
    return int((in_time - epoch).total_seconds())


def touch(filename: str, times: Optional[Tuple[int, int]] = None) -> None:
    """Set the atime and mtime of a file, creating it if it does not exist.

    .. note:: Linux (MAYBE Windows) ONLY. This does not work on Mac OSX.

    :param filename: The file to touch
    :param times: a two-tuple of (atime, mtime) where these are integer seconds since epoch; see os.utime.
        None means "use the current time".
    :raises OSError: if the file cannot be opened for append or its times cannot be set
    """
    # stackoverflow.com/questions/1158076/implement-touch-using-python
    # Opening in append mode creates a missing file without truncating an existing one.
    with open(filename, 'a'):  # pylint: disable=unspecified-encoding
        os.utime(filename, times)


def rm_path(path: str) -> None:
    """Simple rmtree wrapper

    :param path: path to recursively remove
    :raises: IOError if the path can't be removed
    """
    try:
        rmtree(path)
    except OSError as e:
        raise IOError("Failed to remove %s" % path) from e


def rm_file(filename: str) -> None:
    """Simple remove wrapper

    :param filename: filename to remove
    :raises: IOError if the file can't be removed
    """
    try:
        os.remove(filename)
    except OSError as e:
        raise IOError("Failed to remove %s" % filename) from e


def _write_file(filename: str, data: Any) -> None:
    """A utility helper for the atomic file utils.

    Writes ``data`` to a temporary file created in the same directory as
    ``filename``, verifies the number of bytes on disk, then renames the
    temporary file onto ``filename``.  If any step fails, the temporary file is
    removed (best effort) before the error propagates.

    :param filename: the final destination path
    :param data: the content to write; bytes-like objects are written as-is, anything else is converted with
        ``str()`` and encoded as UTF-8
    :raises: IOError if the data can't be written, the size check fails, or the rename fails
    """
    outfile_handle, tmpfilename = tempfile.mkstemp(dir=os.path.dirname(filename))
    renamed = False
    try:
        # write to a tmpfile then atomically move
        try:
            with os.fdopen(outfile_handle, 'wb') as outfile:
                if isinstance(data, (bytes, bytearray, memoryview)):
                    payload = bytes(data)
                else:
                    payload = str(data).encode('utf8')
                outfile.write(payload)
        except OSError as e:
            raise IOError("Failed to write data to path %s (%s)" % (filename, e)) from e
        except Exception as e:  # pylint: disable=broad-except
            raise IOError("Unexpectedly failed to write data to path %s (%s)" % (filename, e)) from e
        # verify written data len; compare against the encoded payload, since the
        # character count of a str differs from its UTF-8 byte count
        datalen = len(payload)
        stat_struct = os.stat(tmpfilename)
        if stat_struct.st_size != datalen:
            raise IOError("Failed to write correct number of bytes (%s, %s, %s)" %
                          (tmpfilename, stat_struct.st_size, datalen))
        try:
            # last man wins
            os.rename(tmpfilename, filename)
        except OSError as e:
            raise IOError("Failed to write data to path %s (%s)" % (filename, e)) from e
        renamed = True
    finally:
        if not renamed:
            # Don't leave orphaned temp files next to the target; never mask the
            # original error with a cleanup failure.
            try:
                os.remove(tmpfilename)
            except OSError:
                pass


def win_atomically_write_file(filename: str, data: Any, add_all_access_user: Optional[str] = None) -> None:
    """Safely/atomically write a file on Windows.

    Write a file in such a way that the data is guaranteed to be complete and not intermixed with that of another
    process.  Because the rename performed by _write_file() does not replace an existing file on Windows, an existing
    target file is removed first; note that the exists-check, the removal and the subsequent rename are separate
    steps, so another process can act on the target in between them.

    This is a windows-specific implementation that has Windows locking semantics for open file handles and rather than
    changing the file's ownership, setting add_all_access_user will cause the file to have a read-write ACE added to
    the ACL for the directory/file for the target user.

    Note that in both this function and atomically_write_file, the fact that the file has locking or not at the OS
    level is NOT being relied on in e.g. archon's file utilities for leasing or exclusive access.  Instead, this
    implementation uses tmpfile to guarantee uniqueness of the source of information, and then a simple atomic mv to
    replace the destination.  Windows, unlike Linux, could support a true OS level file locking layer for exclusive
    access, and so a future Windows specific file utility would be feasible for true single-host, global locking for
    use in e.g. archon.

    From https://stackoverflow.com/questions/12168110/setting-folder-permissions-in-windows-using-python

    :param filename: filename to write
    :param data: data to put in file
    :param add_all_access_user: the user if any to add a FILE_ALL_ACCESS ACE for
    :raises pywintypes.error: on failure to modify the file's ACL; pywintypes.error on user not found;
        pywintypes.error on file not found
    :raises IOError: if an existing target can't be removed or the data can't be written
    """
    if os.path.exists(filename):
        # _write_file() will not replace files on Windows when file already exists
        rm_file(filename)
    _write_file(filename, data)
    # if there is an ACE requested, set it
    if add_all_access_user is not None:
        userx, _, _ = win32security.LookupAccountName("", add_all_access_user)
        file_sd = win32security.GetFileSecurity(filename, win32security.DACL_SECURITY_INFORMATION)
        dacl = file_sd.GetSecurityDescriptorDacl()
        dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, userx)
        file_sd.SetSecurityDescriptorDacl(1, dacl, 0)
        win32security.SetFileSecurity(filename, win32security.DACL_SECURITY_INFORMATION, file_sd)


def atomically_write_file(
        filename: str,
        data: Any,
        file_owner: Optional[str] = None,
        file_group: Optional[str] = None) -> None:
    """Safely/atomically write a file.

    Write to a tmpfile then do os.rename(); Note that this is for Unix systems only, as the implicit contract is that
    the destination might exist and will be overwritten.  This contract is violated on Windows. Also even on Unix,
    the file system of the source and dest must be the same; since we're in the same directory for this move, that
    constraint is satisfied.

    We use tempfile to make it so that we don't collide on the source tmp write -- different processes or threads will
    not select the same tmpfile.  Last man in wins for the move -- there's no general way to sync this across
    processes or hosts using a filesystem store.

    :param filename: filename to ultimately write
    :param data: data to put in file
    :param file_owner: If set, will change file to this owner if permission is available; when not set, the value
        returned by get_file_user() is used instead
    :param file_group: If set, will change file to this group if permission is available; when not set, the value
        returned by get_file_group() is used instead
    :raises: IOError on failure; OSError on permission change without appropriate permissions
    """
    _write_file(filename, data)
    # -1 tells os.chown to leave that id unchanged
    file_uid = -1
    file_owner = file_owner or get_file_user()
    if file_owner is not None:
        file_uid = getpwnam(file_owner).pw_uid
    file_group_gid = -1
    file_group = file_group or get_file_group()
    if file_group is not None:
        file_group_gid = getgrnam(file_group).gr_gid
    os.chown(filename, file_uid, file_group_gid)