diff --git a/bin/CraftOS/OsUtilsBase.py b/bin/CraftOS/OsUtilsBase.py index dae84fc3c..ec215441b 100644 --- a/bin/CraftOS/OsUtilsBase.py +++ b/bin/CraftOS/OsUtilsBase.py @@ -1,64 +1,68 @@ import abc import os import platform import sys from CraftOS.OsDetection import OsDetection class OsUtilsBase(OsDetection, metaclass=abc.ABCMeta): @abc.abstractstaticmethod def rm(path, force=False): """ Removes a file""" pass @abc.abstractstaticmethod def rmDir(path, force=False): """ Removes a file""" pass @abc.abstractstaticmethod def getFileAttributes(path): """ Returns the attributes""" pass @abc.abstractstaticmethod def removeReadOnlyAttribute(path): """ Removes the readonly flag""" pass def setConsoleTitle(title): """ Set the console title """ return True @staticmethod def supportsSymlinks() -> bool: return True @staticmethod def toWindowsPath(path : str) -> str: path = os.path.normpath(path) return path.replace("/", "\\") @staticmethod def toUnixPath(path : str) -> str: path = os.path.normpath(path) return path.replace("\\", "/") @staticmethod def toMSysPath(path): path = OsUtilsBase.toUnixPath(path) drive, path = os.path.splitdrive(path) if drive: return f"/{drive[0].lower()}{path}" return path @staticmethod def toNativePath(path : str) -> str: """Return a native path""" pass @staticmethod def enableAnsiColors(): pass + + @staticmethod + def killProcess(name : str="*", prefix : str=None) -> bool: + pass diff --git a/bin/CraftOS/unix/osutils.py b/bin/CraftOS/unix/osutils.py index ca5b39dac..26814814a 100644 --- a/bin/CraftOS/unix/osutils.py +++ b/bin/CraftOS/unix/osutils.py @@ -1,48 +1,53 @@ import os import shutil import sys import CraftOS.OsUtilsBase from CraftCore import CraftCore class OsUtils(CraftOS.OsUtilsBase.OsUtilsBase): @staticmethod def rm(path, force=False): CraftCore.log.debug("deleting file %s" % path) try: os.remove(path) return True except OSError as e: CraftCore.log.warning("could not delete file %s: %s" % (path, e)) return False @staticmethod def rmDir(path, force=False): CraftCore.log.debug("deleting directory %s" % path) try: shutil.rmtree(path) return True except OSError: return OsUtils.rm(path, force) return False @staticmethod def isLink(path): return os.path.islink(path) @staticmethod def removeReadOnlyAttribute(path): return False @staticmethod def setConsoleTitle(title): sys.stdout.buffer.write(b"\x1b]0;") sys.stdout.buffer.write(bytes(title, "UTF-8")) sys.stdout.buffer.write(b"\x07") sys.stdout.flush() return True @staticmethod def toNativePath(path : str) -> str: return OsUtils.toUnixPath(path) + + @staticmethod + def killProcess(name : str="*", prefix : str=None) -> bool: + CraftCore.log.warning("killProcess is not implemented") + return True diff --git a/bin/CraftOS/win/osutils.py b/bin/CraftOS/win/osutils.py index 78acbba0b..166d4b71b 100644 --- a/bin/CraftOS/win/osutils.py +++ b/bin/CraftOS/win/osutils.py @@ -1,64 +1,74 @@ import tempfile import ctypes import os import platform +import subprocess import CraftOS.OsUtilsBase from CraftCore import CraftCore class FileAttributes(): # https://msdn.microsoft.com/en-us/library/windows/desktop/gg258117(v=vs.85).aspx FILE_ATTRIBUTE_READONLY = 0x1 FILE_ATTRIBUTE_REPARSE_POINT = 0x400 class OsUtils(CraftOS.OsUtilsBase.OsUtilsBase): @staticmethod def rm(path, force=False): CraftCore.log.debug("deleting file %s" % path) if force: OsUtils.removeReadOnlyAttribute(path) return ctypes.windll.kernel32.DeleteFileW(path) != 0 @staticmethod def rmDir(path, force=False): CraftCore.log.debug("deleting directory %s" % path) if force: OsUtils.removeReadOnlyAttribute(path) return ctypes.windll.kernel32.RemoveDirectoryW(path) != 0 @staticmethod def isLink(path): return os.path.islink(path) \ | OsUtils.getFileAttributes(path) & FileAttributes.FILE_ATTRIBUTE_REPARSE_POINT # Detect a Junction @staticmethod def getFileAttributes(path): return ctypes.windll.kernel32.GetFileAttributesW(path) @staticmethod def removeReadOnlyAttribute(path): attributes = OsUtils.getFileAttributes(path) return ctypes.windll.kernel32.SetFileAttributesW(path, attributes & ~ FileAttributes.FILE_ATTRIBUTE_READONLY) != 0 @staticmethod def setConsoleTitle(title): return ctypes.windll.kernel32.SetConsoleTitleW(title) != 0 @staticmethod def supportsSymlinks(): with tempfile.TemporaryDirectory() as tmp: testFile = os.path.join(tmp, "CRAFT_LINK_TEST") return CraftCore.cache.getCommandOutput(f"cmd", f"/C mklink {testFile} {__file__}", testName="CRAFT_LINK_TEST")[0] == 0 @staticmethod def toNativePath(path : str) -> str: return OsUtils.toWindowsPath(path) @staticmethod def enableAnsiColors(): # tell Windows 10 that we do ansi ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), 7) + + @staticmethod + def killProcess(name : str="*", prefix : str=None) -> bool: + if not prefix: + prefix = CraftCore.standardDirs.craftRoot() + out = subprocess.run(f"powershell -NoProfile -ExecutionPolicy ByPass -Command \"& {{" + + f"Get-Process '{name}' | Where-Object {{$_.Path -like '{prefix}*'}} | Stop-Process}}\"", shell=True, stderr=subprocess.STDOUT) + CraftCore.log.debug(f"killProcess {out.args}: {out.stdout} {out.returncode}") + return out.returncode == 0 diff --git a/bin/test/test_OsUtils.py b/bin/test/test_OsUtils.py index cc495c2f5..c5b53a210 100755 --- a/bin/test/test_OsUtils.py +++ b/bin/test/test_OsUtils.py @@ -1,20 +1,54 @@ +import subprocess +import os +import sys +import signal import tempfile import unittest import CraftTestBase +import utils from CraftOS.osutils import OsUtils +from CraftCore import CraftCore class OsUtilsTest(CraftTestBase.CraftTestBase): def test_rm(self): _, fileName = tempfile.mkstemp() OsUtils.rm(fileName) def test_rmDir(self): dirName = tempfile.mkdtemp() OsUtils.rmDir(dirName) + def test_killProcess(self): + # TODO: find a better test app than the cmd windows + with tempfile.TemporaryDirectory() as tmp1: + with tempfile.TemporaryDirectory() as tmp2: + test1 = os.path.join(tmp1, "craft_test.exe") + test2 = os.path.join(tmp2, "craft_test.exe") + cmd = CraftCore.cache.findApplication("cmd") + self.assertEqual(utils.copyFile(cmd, test1, linkOnly=False), True) + self.assertEqual(utils.copyFile(cmd, test2, linkOnly=False), True) + process = subprocess.Popen([test1,"/K"], startupinfo=subprocess.CREATE_NEW_PROCESS_GROUP) + process2 = subprocess.Popen([test2,"/K"], startupinfo=subprocess.CREATE_NEW_PROCESS_GROUP) + try: + self.assertEqual(process.poll(), None) + self.assertEqual(process2.poll(), None) + self.assertEqual(OsUtils.killProcess("craft_test", tmp2), True) + self.assertEqual(process.poll(), None) + #ensure that process 2 was killed + self.assertNotEquals(process2.poll(), None) + except subprocess.SubprocessError as e: + CraftCore.log.warning(e) + finally: + process.kill() + process2.kill() + process.wait() + process2.wait() + + + if __name__ == '__main__': unittest.main()