"""Windows implementations of the customer-local operating system ops.

Most operations build a PowerShell command (inline or from a script file in
operating_system/powershell/), run it, and wrap the exit code / stdout /
stderr into a result via ``Ops.build_result_dict``.
"""
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import json
import logging
import os
import re

from customer_local_ops import NydusResult, Ops, OpType
from customer_local_ops.exceptions import DecryptError
from customer_local_ops.operating_system import POWERSHELL_PATH
from customer_local_ops.util import b64str, execute
from customer_local_ops.util.execute import runCommand, run_powershell

LOG = logging.getLogger(__name__)


def run_powershell_file(script: str, *args: Any, **kw: Any):
    """Run a PowerShell script located under POWERSHELL_PATH.

    :param script: script file name, relative to POWERSHELL_PATH
    :param args: positional arguments forwarded to execute.run_powershell_file
    :param kw: keyword arguments forwarded to execute.run_powershell_file
    :returns: whatever execute.run_powershell_file returns (callers in this
        module unpack it as ``exit_code, outs, errs``)
    """
    return execute.run_powershell_file(POWERSHELL_PATH / script, *args, **kw)


class Windows(Ops):
    """Operating system ops for Windows servers."""

    DISK_UTILIZATION_PATH = r"C:\\"  # double backslash still required by Python syntax
    op_type = OpType.OPERATING_SYSTEM
    DISTRO = 'Windows'
    QEMU_PACKAGE_NAME = 'virtio-win.iso'
    DEVCON = 'devcon.exe'

    def _get_vm_tag(self):
        """Return this VM's tag.

        The tag is read from the CN of the certificate in the LocalMachine\\My
        store whose issuer matches "Nydus Customer Services".
        """
        command_get_cn = r"""
            $s = Get-childitem -Path Cert:\LocalMachine\My\ |
                Where-Object {$_.Issuer -match "Nydus Customer Services"} |
                Select-Object -ExpandProperty Subject
            $s = $s -replace "(CN=)(.*?),.*",'$2'
            $s
        """
        return run_powershell(command_get_cn, 'get_vm_tag', True)

    def _run_powershell_op(self, script_file: Union[str, Path], op_name: str, **kw: Any) -> NydusResult:
        """Run a PowerShell script and return a Nydus result for the outcome.

        :param script_file: PowerShell script name, no path (must be in operating_system/powershell/)
        :param op_name: name of the Nydus operation
        :param kw: additional keyword arguments to pass to run_powershell_file
        :returns: standard CLO Nydus result with success based on script exit code
                  (see build_result_from_cmd_output)
        """
        # An explicit 'tag' keyword overrides the default tag (the op name).
        tag = kw.pop('tag', op_name)
        return self.build_result_from_cmd_output(
            *run_powershell_file(script_file, tag, **kw), op_name)

    def add_user(self, payload: Dict[str, Any], unused: Any = None) -> NydusResult:
        """Create a user that can access this server over Remote Desktop.

        :param payload: dictionary containing username and encrypted_password
        :param unused: ignored
        :returns: result of the operation
        :raises NotImplementedError: if payload['fail_if_exists'] is truthy
        """
        if payload.get('fail_if_exists', False):
            raise NotImplementedError('fail_if_exists:True not implemented')

        op_name = 'add_user'
        username = payload['username']
        encrypted_password = payload['encrypted_password']
        try:
            password = self.decrypt(encrypted_password)
        except DecryptError as ex:
            return False, self.build_result_dict(ex.outs, ex.errs, op_name)

        # Having trouble decoding a UTF-8-encoded string with code points above 127 (like 両) on the other side.
        # Base64-encoding password to limit character set in transit to [a-zA-Z0-9+/=].
        password_b64 = b64str(password)
        exit_code, outs, errs = run_powershell_file(
            'add_user.ps1', op_name,
            script_file_args=[username, 'Remote Desktop Users'],
            stdin=password_b64,
            quiet=True)
        outs = outs.replace(password_b64, '')  # Strip stdin echo from stdout
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def add_user_to_group(self, username: str, group: str) -> NydusResult:
        """Add a user to a group.

        :param username: name of user to add to the group
        :param group: user will be added to this group
        :returns: result of the operation
        """
        return self._run_powershell_op(
            'add_user_to_group.ps1', 'add_user_to_group',
            script_file_args=[username, group])

    def _user_needs_logout(self, username) -> bool:
        """Check if the specified user needs to log out.

        The check is a case-insensitive substring search of the ``query user``
        output, so it may also match a longer user name containing `username`.

        :param username: name of user to check
        :returns: True if the user name appears in the session list, else False
        """
        get_sessions_command = 'query user'
        _, outs, _ = run_powershell(get_sessions_command, 'get_active_sessions')
        # Lower-case both sides; comparing a lowered name against the raw
        # output would miss sessions listed with upper-case characters.
        return bool(username) and username.lower() in outs.lower()

    def remove_user(self, username):
        """Delete a local user and their profile directory under C:\\Users.

        The user is not removed while they still appear in the session list.

        :param username: name of the user to remove
        :returns: tuple of (success flag, result dict)
        """
        op_name = 'remove_user'
        # Refuse to remove a user who has not logged out yet.
        if self._user_needs_logout(username):
            return False, self.build_result_dict(
                "User is still active.",
                "User needs to perform logout before user can be removed.",
                op_name)
        # NOTE(review): username is interpolated into the command unescaped;
        # presumably it is validated upstream - confirm.
        command = 'net user %s /DELETE ; Remove-Item -Path "C:\\Users\\%s" -Recurse -Force' % (username, username)
        exit_code, outs, errs = run_powershell(command, op_name)
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def change_password(self, payload: Dict[str, Any]) -> NydusResult:
        """Change a user's password.

        :param payload: dictionary containing username and new encrypted_password
        :returns: result of the operation
        """
        op_name = 'change_password'
        username = payload['username']
        encrypted_password = payload['encrypted_password']
        try:
            password = self.decrypt(encrypted_password)
        except DecryptError as ex:
            return False, self.build_result_dict(ex.outs, ex.errs, op_name)

        # Having trouble decoding a UTF-8-encoded string with code points above 127 (like 両) on the other side.
        # Base64-encoding password to limit character set in transit to [a-zA-Z0-9+/=].
        password_b64 = b64str(password)
        exit_code, outs, errs = run_powershell_file(
            'change_password.ps1', op_name,
            script_file_args=[username],
            stdin=password_b64,
            quiet=True)
        outs = outs.replace(password_b64, '')  # Strip stdin echo from stdout
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def configure_mta(self, payload, unused=None):
        """Configure the mail relay; thin op wrapper around do_configure_mta.

        :param payload: dictionary that may contain relay_address
        :param unused: ignored
        :returns: result of do_configure_mta
        """
        # Function is split in order that control panel ops can call underlying os function directly,
        # without incurring op overhead such as formatted retry results
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        #   chain for sequencing.
        #
        op_name = 'configure_mta'
        return self.do_configure_mta(payload, op_name)

    def enable_admin(self, username, unused=None):
        """Add a user to the local administrators group.

        A non-zero exit code whose error text says the membership is already
        as desired is treated as success.

        :param username: user to add to the administrators group
        :param unused: ignored
        :returns: result dict on success; (False, result dict) on failure
        """
        LOG.info("Adding user sudo permissions for %s via administrators group", username)
        exit_code, outs, errs = runCommand(
            ['net', 'localgroup', 'administrators', username, '/add'],
            "add user %s to administrators group" % username)
        op_name = 'enable_admin'
        if exit_code != 0:
            if 'already a member of the group' in errs or 'name is not a member of the group' in errs:
                LOG.info("Group already properly set.")
            else:
                return False, self.build_result_dict(outs, errs, op_name)
        return self.build_result_dict(outs, errs, op_name)

    def disable_admin(self, username, unused=None):
        """Move a user from the administrators group to Remote Desktop Users.

        The user is first added to 'Remote Desktop Users' and then removed
        from 'administrators'. "Already a member" / "not a member" errors are
        tolerated.

        :param username: user whose admin access is removed
        :param unused: ignored
        :returns: result dict (of the last command) on success;
            (False, result dict) on the first failing command
        """
        LOG.info('Removing user sudo permissions for %s '
                 'via administrators group', username)
        op_name = 'disable_admin'
        commands = [
            ('add RDC Access',
             ['net', 'localgroup', 'Remote Desktop Users', username, '/add']),
            ('remove Admin Access',
             ['net', 'localgroup', 'administrators', username, '/delete']),
        ]
        for purpose, cmd in commands:
            exit_code, outs, errs = runCommand(cmd, purpose)
            if exit_code != 0:
                if 'already a member of the group' in errs or 'name is not a member of the group' in errs:
                    LOG.info('Group already properly set.')
                else:
                    return False, self.build_result_dict(outs, errs, op_name)
        return self.build_result_dict(outs, errs, op_name)

    def disable_all_admins(self, unused=None):
        """Demote every administrator that is not on the keep-list.

        Deletes the tempHfsAdmin user, then moves each member of the
        administrators group not named in $usersToKeep into
        'Remote Desktop Users'.

        :param unused: ignored
        :returns: tuple of (success flag, result dict)
        """
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        #   chain for sequencing.
        #
        op_name = 'disable_all_admins'
        command = r"""
        net user tempHfsAdmin /delete
        $usersToKeep=@("admin", "Administrator", "cloudbase-init", "DefaultAccount", "Guest",
            "IME_ADMIN", "IME_USER", "IUSRPLESK_atmail", "IUSRPLESK_horde", "IUSRPLESK_smwebmail",
            "IUSRPLESK_sqladmin", "IUSR_FS_PUBLIC", "IUSR_FS_UNLISTED", "IWAM_FILESHARING",
            "IWAM_plesk(default)", "IWAM_sitepreview", "nydus", "Plesk Administrator", "psaadm")
        Get-CimInstance -ClassName win32_group -Filter "name = 'administrators'" | `
            Get-CimAssociatedInstance -Association win32_groupuser | `
            %{
                if($usersToKeep -notcontains $_.name) {
                    $userName=$_.name
                    "Adding user to 'Remote Desktop Users' group: $userName"
                    net localgroup "Remote Desktop Users" $userName /add
                    "Removing user from 'Administrators' group: $userName"
                    net localgroup "administrators" $userName /delete
                }
            }"""
        exit_code, outs, errs = run_powershell(command, 'Disable All Admins')
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def shutdown_clean(self, unused=None):
        """Not implemented for Windows.

        :raises NotImplementedError: always
        """
        # The `unused` param is an artifact of Archon workflows requiring an I/O
        #   chain for sequencing.
        #
        raise NotImplementedError

    def snapshot_clean(self, payload, unused=None):  # pylint: disable=W0221
        """Clean the server before a snapshot is taken.

        When payload['clean'] is truthy, runs a PowerShell script that removes
        thespian/nydus logs, the 'testaccount' profile, the boot script,
        non-default local users, event logs and cloudbase-init logs, disables
        password complexity, and adds the SetHostNamePlugin to the
        cloudbase-init configuration.

        :param payload: dictionary containing the 'clean' flag
        :param unused: ignored
        :returns: None when payload['clean'] is falsy; otherwise a tuple of
            (success flag, result dict)
        """
        if not payload['clean']:
            return

        op_name = 'snapshot_clean'
        command = r"""
        start-transcript -path "C:\Windows\temp\snapClean.txt"
        $dirs = "C:\\thespian\\director*","C:\\nydus\\log*"
        Foreach ($dir in $dirs) {
            $programFolder=try{((Get-ChildItem $dir) | Sort-Object lastwritetime -desc)[0].fullname}catch{"NA"};
            "Removing Logs: $($programFolder)\*.log"
            if(Test-Path $programFolder){
                remove-item "$($programFolder)\\*.log" -force -ErrorAction SilentlyContinue;
            }
        }
        $name = 'testaccount'
        $acct = New-Object Security.Principal.NTAccount($name)
        $sid = $acct.Translate([Security.Principal.SecurityIdentifier]).Value
        Get-WmiObject Win32_UserProfile -Filter "sid='${sid}'" | ForEach-Object { $_.Delete() }
        "Removing Bootscript"
        $bootScript="C:\\Windows\\Temp\\bootscript.ps1";
        if(Test-Path $bootScript){ remove-item $bootScript -force}
        #clear any non-default users
        $usersToKeep=@("admin", "Administrator", "cloudbase-init", "DefaultAccount", "Guest",
            "IME_ADMIN", "IME_USER", "IUSRPLESK_atmail", "IUSRPLESK_horde", "IUSRPLESK_smwebmail",
            "IUSRPLESK_sqladmin", "IUSR_FS_PUBLIC", "IUSR_FS_UNLISTED", "IWAM_FILESHARING",
            "IWAM_plesk(default)", "IWAM_sitepreview", "nydus", "Plesk Administrator", "psaadm")
        Get-WmiObject -query "select Name from Win32_UserAccount where LocalAccount='True'" |
            ?{$usersToKeep -notcontains $_.name} |
            %{
                $userName=$_.name
                $cn = [ADSI]"WinNT://$($env:Computername)"
                "Removing user: $userName"
                try{
                    $cn.Delete('User',"$userName")
                }catch{
                    $_.Exception.Message
                }
            }
        #clear event logs:
        function Clear-All-Event-Logs ($ComputerName="localhost"){
            $Logs = Get-EventLog -ComputerName $ComputerName -List | ForEach {$_.Log}
            $Logs | ForEach {Clear-EventLog -Comp $ComputerName -Log $_ }
            Get-EventLog -ComputerName $ComputerName -List
        }
        Clear-All-Event-Logs
        #remove old cloud-init logs
        remove-item "C:\Program Files\Cloudbase Solutions\Cloudbase-Init\log\*.log"
        #Disable password policy
        secedit /export /cfg c:\secpol.cfg
        (gc C:\secpol.cfg).replace("PasswordComplexity = 1", "PasswordComplexity = 0") | Out-File C:\secpol.cfg
        secedit /configure /db c:\windows\security\local.sdb /cfg c:\secpol.cfg /areas SECURITYPOLICY
        rm -force c:\secpol.cfg -confirm:$false
        #add hostname setting
        $cloudconfpath="C:\Program Files\Cloudbase Solutions\Cloudbase-Init\conf\cloudbase-init.conf"
        (gc $cloudconfpath) -replace "plugins=cloudbaseinit.plugins.windows.removeqxl.RemoveQXLPlugin",`
        "plugins=cloudbaseinit.plugins.common.sethostname.SetHostNamePlugin,cloudbaseinit.plugins.windows.removeqxl.RemoveQXLPlugin"`
        | sc -path $cloudconfpath
        stop-transcript
        """
        exit_code, outs, errs = run_powershell(command, 'snapShot Cleanup')
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def snapshot_prep(self, payload):
        """Record the VM's internal IP address before a snapshot.

        :param payload: dictionary containing internalAddress
        :returns: result dict on success; (False, result dict) if the file
            could not be written
        """
        # The goal here is to make the VM only restart gdh-vds if it is on the same IP as it was shut down as.
        # When a VM is brought up from a published snapshot, we don't want gdh-vds to start
        # Adding this snapShotIP.txt file alters how serviceRunner.ps1 processes.
        op_name = 'snapshot_prep'
        ip = payload['internalAddress']
        installLoc = r'C:\Windows\Temp'
        try:
            self.write_file(installLoc + r"\snapShotIP.txt", ip)
        except IOError as ex:
            # The file written above is snapShotIP.txt; name it correctly in the log.
            LOG.exception("Error writing snapShotIP.txt for %s: %s", ip, ex)
            return False, self.build_result_dict('', str(ex), op_name)
        success_message = "snapShotIP.txt written for {0}".format(ip)
        LOG.info(success_message)
        return self.build_result_dict(success_message, '', op_name)

    def do_configure_mta(self, payload, op_name, intermediate_result: Optional[Dict[str, Any]] = None):
        """Point the MailEnable SMTP connector at the given relay.

        Writes the "Forward All Outbound Host" and "Forward All Outbound
        Enabled" values under the MailEnable SMTP registry key.

        :param payload: dictionary that may contain relay_address
        :param op_name: operation name used in the result
        :param intermediate_result: not used by this implementation
        :returns: result dict on success (or when no relay is given);
            (False, result dict) when a registry command fails
        """
        relay = payload.get('relay_address')
        LOG.debug("configuring MTA for %s", relay)
        if relay is None:
            return self.build_result_dict(
                'No relay; skipping MTA configuration', '', op_name)

        smtp_registry = r"HKLM\SOFTWARE\Wow6432Node\Mail Enable\Mail Enable\Connectors\SMTP"
        exit_code, outs, errs = runCommand(
            ['reg', 'add', smtp_registry, '/v', "Forward All Outbound Host", '/d', relay, '/f'],
            "set mail relay")
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

        exit_code, outs, errs = runCommand(
            ['reg', 'add', smtp_registry, '/v', "Forward All Outbound Enabled", '/d', '1', '/f'],
            "enable mail relay")
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)
        return self.build_result_dict(outs, errs, op_name)

    def configure_ips(self, vm_address, addresses, gateway, unused=None):
        """Configure network addresses via setNetwork.ps1.

        :param vm_address: passed as the first script argument
        :param addresses: list of addresses appended to the script arguments
        :param gateway: not used by this implementation
        :param unused: ignored
        :returns: result dict on success; (False, result dict) on failure
        """
        op_name = 'configure_ips'
        exit_code, outs, errs = run_powershell_file(
            'setNetwork.ps1', 'configure network',
            script_file_args=[vm_address] + addresses)
        if exit_code != 0:
            LOG.error("failed to configure ips: %s (%s)", outs, errs)
            return False, self.build_result_dict(outs, errs, op_name)
        return self.build_result_dict(outs, errs, op_name)

    def change_hostname(self, payload, unused=None):
        """Rename the computer to the first label of the requested hostname.

        A leading 'www.' is dropped, then the first non-empty dot-separated
        component is used as the new computer name.

        :param payload: dictionary containing hostname
        :param unused: ignored
        :returns: tuple of (success flag, result dict)
        """
        op_name = 'change_hostname'
        # The hostname is spliced into a single-quoted PowerShell literal;
        # doubling embedded single quotes keeps it from terminating the literal.
        hostname = payload['hostname'].replace("'", "''")
        command = r"""
        [string]$hostname='""" + hostname + """';
        if($hostname.tolower().StartsWith('www.')){
            $hostname=$hostname.substring(4)
        }
        if($hostname.IndexOf('.') -gt -1){
            $hostSet=$false;
            $hostname.Split('.') | %{
                if($_.Length -gt 0 -and $hostSet -eq $false){
                    $hostname=$_;
                    $hostSet=$true;
                }
            }
        }
        Rename-Computer -NewName $hostname -Force;
        """
        exit_code, outs, errs = run_powershell(command, 'changeHostname')
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def _get_cpu_utilization(self):
        """Return CPU utilization as reported by cpu_utilization.ps1.

        :returns: dict with key 'cpuUsed' (float, from the script's
            'cpuTimePercent' JSON field)
        """
        _, outs, _ = run_powershell_file(
            'cpu_utilization.ps1', "get_utilization|get_cpu_utilization")
        utilization = json.loads(outs)
        return {'cpuUsed': float(utilization['cpuTimePercent'])}

    def _get_memory_utilization(self):
        """Return memory utilization as reported by memory_utilization.ps1.

        :returns: dict with 'memoryTotal' and 'memoryUsed', computed from the
            script's 'ramTotalMiB' and 'ramFreeMiB' JSON fields
        """
        _, outs, _ = run_powershell_file(
            'memory_utilization.ps1', "get_utilization|get_memory_utilization")
        utilization = json.loads(outs)
        memory_total = int(utilization['ramTotalMiB'])
        memory_free = int(utilization['ramFreeMiB'])
        return {
            'memoryTotal': memory_total,
            'memoryUsed': memory_total - memory_free,
        }

    def install_panopta(self, payload, *args, **kwargs):
        """Install the Fortimonitor agent on the server.

        Writes C:\\FortimonitorAgent.manifest from the payload, downloads the
        installer script into c:\\fortimonitor_temp and runs it.

        :param payload: dictionary with customer_key and template_ids, and
            optionally server_key, fqdn, serverName, disable_server_match
        :returns: tuple of (success flag, result dict)
        """
        customer_key = payload['customer_key']
        template_ids = payload['template_ids']
        LOG.info("Installing Fortimonitor agent - Customer Key: %s, Template IDs: %s", customer_key, template_ids)

        manifest_file_contents = "templates = {template_ids}\nenable_countermeasures = false".format(**payload)
        server_key = payload.get('server_key')
        if server_key:
            manifest_file_contents += "\nserver_key = {server_key}".format(server_key=server_key)
        fqdn = payload.get('fqdn')
        if fqdn:
            manifest_file_contents += "\nfqdn = {fqdn}".format(fqdn=fqdn)
        server_name = payload.get('serverName')
        if server_name:
            manifest_file_contents += "\nserver_name = {serverName}".format(serverName=server_name)
        if payload.get('disable_server_match', False):
            manifest_file_contents += "\ndisable_server_match = true"

        # The script is kept at column 0: a PowerShell here-string requires
        # '@"' to end its line and the closing '"@' to start its line.
        command = r"""$Manifest = @"
{manifest_file_contents}
"@
$Manifest | Out-File -FilePath "C:\FortimonitorAgent.manifest"
mkdir c:\fortimonitor_temp
cd c:\fortimonitor_temp
Invoke-WebRequest https://repo.fortimonitor.com/install/win/fortimonitor_agent_windows.ps1 -OutFile c:\fortimonitor_temp\fortimonitor_agent_windows.ps1  # noqa E501
c:\fortimonitor_temp\fortimonitor_agent_windows.ps1 -customer_key {customer_key}"""\
            .format(manifest_file_contents=manifest_file_contents, customer_key=customer_key)
        exit_code, outs, errs = run_powershell(command, 'Install Panopta')
        return exit_code == 0, self.build_result_dict(outs, errs, 'install_panopta')

    def _delete_agent(self, agent_name: str) -> Tuple[int, str, str]:
        """Uninstall the named monitoring agent if it is installed.

        :param agent_name: "Panopta" selects the Panopta agent; any other
            value uninstalls the FortiMonitor agent
        :returns: (exit_code, outs, errs) of the uninstall command, or
            (0, "<agent_name> agent is not installed", '') when the package
            check produced no output
        """
        cmd = r"""Get-Package -Name '{agent_name} Agent'""".format(agent_name=agent_name)
        _, outs, _ = run_powershell(cmd, "Check {agent_name} Installed".format(agent_name=agent_name))
        if not outs:
            # Report the agent that was actually checked, not always Panopta.
            return 0, '{agent_name} agent is not installed'.format(agent_name=agent_name), ''

        product_name = "Panopta Agent" if agent_name == "Panopta" else "FortiMonitor Agent"
        command = (
            '$app = Get-WmiObject -Class Win32_Product | '
            'Where-Object { $_.Name -match "%s" }\n'
            '$app.Uninstall();' % product_name
        )
        return run_powershell(command, "Delete {agent_name}".format(agent_name=agent_name))

    def _delete_agent_manifest(self, manifest_name: str) -> Tuple[int, str, str]:
        """Delete C:\\<manifest_name>Agent.manifest if it exists.

        :param manifest_name: manifest file name prefix
        :returns: (exit_code, outs, errs) of the delete command, or a
            (0, message, '') tuple when the file is not present
        """
        manifest_path = r"C:\{manifest_name}Agent.manifest".format(manifest_name=manifest_name)
        if os.path.exists(manifest_path):
            # Plain ASCII hyphens for the parameters; typographic dashes are
            # fragile if the command text is ever re-encoded.
            command = r"""Remove-Item -path C:\{manifest_name}Agent.manifest -force""".format(
                manifest_name=manifest_name)
            return run_powershell(command, "Delete {manifest_name} manifest file".format(manifest_name=manifest_name))
        return 0, '{manifest_name} manifest file is not present'.format(manifest_name=manifest_name), ''

    def _delete_temp_directory(self, temp_dir: str) -> Tuple[int, str, str]:
        """Recursively delete C:\\<temp_dir> if it exists.

        :param temp_dir: directory name directly under C:\\
        :returns: (exit_code, outs, errs) of the delete command, or a
            (0, message, '') tuple when the directory is not present
        """
        if os.path.exists(r"C:\{temp_dir}".format(temp_dir=temp_dir)):
            command = r"""Remove-Item -path C:\{temp_dir} -Recurse -force""".format(
                temp_dir=temp_dir)
            return run_powershell(command, "Delete {temp_dir} directory".format(temp_dir=temp_dir))
        return 0, '{temp_dir} directory is not present'.format(temp_dir=temp_dir), ''

    def delete_panopta(self, *args, **kwargs) -> NydusResult:
        """Remove the Panopta and FortiMonitor agents, manifests and temp dirs.

        Both agents are uninstalled first; if either uninstall fails the
        manifest files and temporary directories are left untouched.

        :returns: tuple of (success flag, result dict); outputs of all steps
            are joined with newlines
        """
        op_name = 'delete_panopta'
        results = [self._delete_agent("Panopta"), self._delete_agent("FortiMonitor")]
        # Check each exit code individually; summing them could cancel out.
        if any(code != 0 for code, _, _ in results):
            LOG.error('failed to remove panopta/fm-agent agent')
            return False, self.build_result_dict(
                '\n'.join(out for _, out, _ in results),
                '\n'.join(err for _, _, err in results),
                op_name)

        cleanup_start = len(results)
        try:
            results.append(self._delete_agent_manifest("Panopta"))
            results.append(self._delete_agent_manifest("Fortimonitor"))
            results.append(self._delete_temp_directory("panopta_temp"))
            results.append(self._delete_temp_directory("fortimonitor_temp"))
        except (IOError, OSError) as ex:
            msg = "Failed to unlink Panopta/FortiMonitor manifest file or temporary directory"
            LOG.exception(msg)
            return False, self.build_result_dict('', str(ex), op_name)

        outs = '\n'.join(out for _, out, _ in results)
        errs = '\n'.join(err for _, _, err in results)
        success = all(code == 0 for code, _, _ in results[cleanup_start:])
        return success, self.build_result_dict(outs, errs, op_name)

    def upgrade_panopta(self, *args, **kwargs):
        """Upgrade an installed Panopta agent to the FortiMonitor agent.

        :returns: tuple of (success flag, result dict); when the Panopta
            agent is not installed the op succeeds without doing anything
        """
        op_name = 'upgrade_panopta'
        cmd = r"""Get-Package -Name 'Panopta Agent'"""
        _, outs, _ = run_powershell(cmd, "Check Panopta Installed")
        if outs:
            command = r"""Invoke-WebRequest https://repo.fortimonitor.com/install/win/fm-upgrade.ps1 -OutFile fm-upgrade.ps1  # noqa E501 pylint: disable=line-too-long
.\fm-upgrade.ps1 -autoupdate"""
            exit_code, outs, errs = run_powershell(command, "Upgrade Panopta")
            if exit_code != 0:
                LOG.error('failed to upgrade panopta agent to fortimonitor agent')
                return False, self.build_result_dict(outs, errs, op_name)
        else:
            exit_code, outs, errs = 0, 'Panopta agent is not installed', ''
        # Previously the success paths fell off the end and returned None.
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def get_panopta_server_key(self, *args, **kwargs):
        """Look into the Agent.config xml file and find the ServerKey add component.

        The Panopta agent's config is used when present, otherwise the
        FortiMonitor agent's config.

        :return: tuple of (success flag, dict with the ServerKey value under
            'outs' and 'append_info' set to False)
        :raises ValueError: if the config file does not exist
        """
        agent_config = r"""C:\Program Files (x86)\PanoptaAgent\Agent.config"""
        if not os.path.exists(agent_config):
            agent_config = r"""C:\Program Files (x86)\FortiMonitorAgent\Agent.config"""
        cmd = r"""[System.IO.File]::Exists("{agent_config}")""".format(agent_config=agent_config)
        _, outs, _ = run_powershell(cmd, "Check Config File")
        # Tolerate a trailing newline / whitespace in the PowerShell output.
        if outs.strip() == 'False':
            raise ValueError('Panopta config file not found')

        command = r"""[xml]$config = Get-Content "{agent_config}"
$serverKey = $config.agent.service.add | ? {{$_.key -eq "ServerKey"}} | % {{echo $_.value}}
echo $serverKey
""".format(agent_config=agent_config)
        exit_code, outs, _ = run_powershell(command, "Get Server Key")
        return exit_code == 0, {'outs': outs, 'append_info': False}

    def update_invalid_resolvers(self, valid_resolvers: List[str], invalid_resolvers: List[str],
                                 *args, **kwargs) -> NydusResult:
        """
        If the server has any of the listed invalid resolvers, replace them with the valid resolvers.
        Otherwise, leave the resolvers as-is.

        :param valid_resolvers: A list of valid dns nameservers to be added to the server (if needed)
        :param invalid_resolvers: A list of invalid dns nameservers to be removed from the server
        :return: A Nydus result
        """
        # Resolver values need to be in a comma-separated list for the powershell script args
        # e.g. 10.1.1.1,10.1.2.1
        valid_resolvers_arg = ','.join(valid_resolvers)
        invalid_resolvers_arg = ','.join(invalid_resolvers)
        exit_code, outs, errs = run_powershell_file(
            script="update_invalid_resolvers.ps1",
            tag="update_invalid_resolvers",
            script_file_args=[
                '-validResolvers', valid_resolvers_arg,
                '-invalidResolvers', invalid_resolvers_arg
            ])
        if exit_code != 0 and errs:
            # Extract plain error message from powershell gubbins.
            # If format is not as we expect, leave error message as-is.
            err_msg_array = errs.split('+')[0].split(':')
            if len(err_msg_array) >= 3:
                errs = err_msg_array[2].strip().replace('\r\n', '')
        return exit_code == 0, self.build_result_dict(outs, errs, 'update_invalid_resolvers')

    def enable_winexe(self, *_: Any) -> NydusResult:
        """Configure the system so it can be accessed remotely with winexe.

        We use winexe in our integration tests to inspect VMs.

        :param *_: op chaining arguments
        :returns: operation result
        """
        return self._run_powershell_op('enable_winexe.ps1', 'enable_winexe')

    def open_port(self, port: int, *_: Any) -> NydusResult:
        """Configure the system so the specified port can be accessed remotely.

        :param port: The port number to open
        :param *_: op chaining arguments
        :returns: operation result
        """
        port = int(port)
        return self._run_powershell_op('configure_port.ps1', 'open_port', script_file_args=[port, "open"])

    def close_port(self, port: int, *_: Any) -> NydusResult:
        """Configure the system so the specified port cannot be accessed remotely.

        :param port: The port number to close
        :param *_: op chaining arguments
        :returns: operation result
        """
        port = int(port)
        return self._run_powershell_op('configure_port.ps1', 'close_port', script_file_args=[port, "close"])

    def install_qemu_agent(self, pypi_url: str, *args):
        """Install qemu guest agent on the vm

        :param pypi_url: The url for the pypi server where the package is located
        :returns: operation result
        """
        package_url = pypi_url + "/-/" + self.DISTRO + "/" + self.QEMU_PACKAGE_NAME
        return self._run_powershell_op("install_qemu_agent.ps1", "install_qemu_agent",
                                       script_file_args=[package_url])

    def install_devcon(self, pypi_url: str, *args):
        """Install devcon on the vm

        :param pypi_url: The url for the pypi server where the package is located
        :returns: operation result
        """
        package_url = pypi_url + "/-/" + self.DISTRO + "/" + self.DEVCON
        return self._run_powershell_op("install_devcon.ps1", "install_devcon",
                                       script_file_args=[package_url])

    def get_os_info(self, *args: Any) -> Any:
        """Return the Operating System information using systeminfo.

        :returns: string of the form "NAME=windows\\nVERSION=<year>\\n" when a
            four-digit version is found on the "OS Name:" line; otherwise
            (False, result dict)
        """
        op_name = 'get_os_info'
        command = r'systeminfo | Select-String "OS"'
        exit_code, outs, errs = run_powershell(command, op_name)
        if exit_code == 0 and outs:
            for line in outs.split('\n'):
                if not line.strip().startswith('OS Name:'):
                    continue
                # Using regular expression to get the version/year.
                # Example string: "OS Name: Microsoft Windows Server 2022 Standard"
                version = re.search(r'\b\d{4}\b', line)
                if version is None:
                    # No four-digit year on this line; fall through to the
                    # failure result instead of raising AttributeError.
                    continue
                return 'NAME=windows\nVERSION={version}\n'.format(version=version.group(0))
        return False, self.build_result_dict(outs, errs, op_name)

    def get_file_info(self, paths: str) -> Dict[str, Any]:
        """
        Op for retrieving file info (if file exists), for now for Windows returning only info whether file exists

        :param paths: Comma-separated list of paths of files
        :return: Dict with filenames and files related info (if file present)
        """
        files = [
            {
                "fileName": path,
                "exists": Path(os.path.normpath(path)).exists(),
            }
            for path in (raw.strip() for raw in paths.split(","))
        ]
        return {"files": files}


class Windows2016(Windows):
    """Windows Server 2016; no overrides."""


class Windows2019(Windows):
    """Windows Server 2019; no overrides."""


class Windows2022(Windows):
    """Windows Server 2022; no overrides."""