From ade26e5393499c162c8137673660ba381e65bca4 Mon Sep 17 00:00:00 2001 From: Jun Komoda <45822440+junkmd@users.noreply.github.com> Date: Sun, 15 Dec 2024 11:55:34 +0000 Subject: [PATCH 1/3] Add new `on` to the workflow. --- .github/workflows/poc.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/poc.yml b/.github/workflows/poc.yml index ad27480..702dc11 100644 --- a/.github/workflows/poc.yml +++ b/.github/workflows/poc.yml @@ -2,6 +2,8 @@ name: PoC on: workflow_dispatch: + pull_request: + branches: [main] jobs: build: From 98570e493735db1d9d5802bef79deb7d13628f5c Mon Sep 17 00:00:00 2001 From: Jun Komoda <45822440+junkmd@users.noreply.github.com> Date: Sun, 15 Dec 2024 11:56:38 +0000 Subject: [PATCH 2/3] Copy from https://github.com/python/cpython/blob/0ac40acec045c4ce780cf7d887fcbe4c661e82b7/Lib/test/test_ctypes/test_win32_com_foreign_func.py --- src/win32_com_foreign_func.py | 284 ++++++++++++++++++++++++++++++++++ 1 file changed, 284 insertions(+) create mode 100644 src/win32_com_foreign_func.py diff --git a/src/win32_com_foreign_func.py b/src/win32_com_foreign_func.py new file mode 100644 index 0000000..1ed0f3f --- /dev/null +++ b/src/win32_com_foreign_func.py @@ -0,0 +1,284 @@ +import ctypes +import gc +import sys +import unittest +from ctypes import POINTER, byref, c_void_p +from ctypes.wintypes import BYTE, DWORD, WORD + +if sys.platform != "win32": + raise unittest.SkipTest("Windows-specific test") + + +from ctypes import HRESULT, COMError, CopyComPointer + +COINIT_APARTMENTTHREADED = 0x2 +CLSCTX_SERVER = 5 +S_OK = 0 +OUT = 2 +TRUE = 1 +E_NOINTERFACE = -2147467262 + + +class GUID(ctypes.Structure): + # https://learn.microsoft.com/en-us/windows/win32/api/guiddef/ns-guiddef-guid + _fields_ = [ + ("Data1", DWORD), + ("Data2", WORD), + ("Data3", WORD), + ("Data4", BYTE * 8), + ] + + +def create_proto_com_method(name, index, restype, *argtypes): + proto = ctypes.WINFUNCTYPE(restype, *argtypes) + + def make_method(*args): + foreign_func = proto(index, name, *args) + + def call(self, *args, **kwargs): + return foreign_func(self, *args, **kwargs) + + return call + + return make_method + + +def create_guid(name): + guid = GUID() + # https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-clsidfromstring + ole32.CLSIDFromString(name, byref(guid)) + return guid + + +def is_equal_guid(guid1, guid2): + # https://learn.microsoft.com/en-us/windows/win32/api/objbase/nf-objbase-isequalguid + return ole32.IsEqualGUID(byref(guid1), byref(guid2)) + + +ole32 = ctypes.oledll.ole32 + +IID_IUnknown = create_guid("{00000000-0000-0000-C000-000000000046}") +IID_IStream = create_guid("{0000000C-0000-0000-C000-000000000046}") +IID_IPersist = create_guid("{0000010C-0000-0000-C000-000000000046}") +CLSID_ShellLink = create_guid("{00021401-0000-0000-C000-000000000046}") + +# https://learn.microsoft.com/en-us/windows/win32/api/unknwn/nf-unknwn-iunknown-queryinterface(refiid_void) +proto_query_interface = create_proto_com_method( + "QueryInterface", 0, HRESULT, POINTER(GUID), POINTER(c_void_p) +) +# https://learn.microsoft.com/en-us/windows/win32/api/unknwn/nf-unknwn-iunknown-addref +proto_add_ref = create_proto_com_method("AddRef", 1, ctypes.c_long) +# https://learn.microsoft.com/en-us/windows/win32/api/unknwn/nf-unknwn-iunknown-release +proto_release = create_proto_com_method("Release", 2, ctypes.c_long) +# https://learn.microsoft.com/en-us/windows/win32/api/objidl/nf-objidl-ipersist-getclassid +proto_get_class_id = create_proto_com_method( + "GetClassID", 3, HRESULT, POINTER(GUID) +) + + +def create_shelllink_persist(typ): + ppst = typ() + # https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-cocreateinstance + ole32.CoCreateInstance( + byref(CLSID_ShellLink), + None, + CLSCTX_SERVER, + byref(IID_IPersist), + byref(ppst), + ) + return ppst + + +class ForeignFunctionsThatWillCallComMethodsTests(unittest.TestCase): + def setUp(self): + # https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-coinitializeex + ole32.CoInitializeEx(None, COINIT_APARTMENTTHREADED) + + def tearDown(self): + # https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-couninitialize + ole32.CoUninitialize() + gc.collect() + + def test_without_paramflags_and_iid(self): + class IUnknown(c_void_p): + QueryInterface = proto_query_interface() + AddRef = proto_add_ref() + Release = proto_release() + + class IPersist(IUnknown): + GetClassID = proto_get_class_id() + + ppst = create_shelllink_persist(IPersist) + + clsid = GUID() + hr_getclsid = ppst.GetClassID(byref(clsid)) + self.assertEqual(S_OK, hr_getclsid) + self.assertEqual(TRUE, is_equal_guid(CLSID_ShellLink, clsid)) + + self.assertEqual(2, ppst.AddRef()) + self.assertEqual(3, ppst.AddRef()) + + punk = IUnknown() + hr_qi = ppst.QueryInterface(IID_IUnknown, punk) + self.assertEqual(S_OK, hr_qi) + self.assertEqual(3, punk.Release()) + + with self.assertRaises(OSError) as e: + punk.QueryInterface(IID_IStream, IUnknown()) + self.assertEqual(E_NOINTERFACE, e.exception.winerror) + + self.assertEqual(2, ppst.Release()) + self.assertEqual(1, ppst.Release()) + self.assertEqual(0, ppst.Release()) + + def test_with_paramflags_and_without_iid(self): + class IUnknown(c_void_p): + QueryInterface = proto_query_interface(None) + AddRef = proto_add_ref() + Release = proto_release() + + class IPersist(IUnknown): + GetClassID = proto_get_class_id(((OUT, "pClassID"),)) + + ppst = create_shelllink_persist(IPersist) + + clsid = ppst.GetClassID() + self.assertEqual(TRUE, is_equal_guid(CLSID_ShellLink, clsid)) + + punk = IUnknown() + hr_qi = ppst.QueryInterface(IID_IUnknown, punk) + self.assertEqual(S_OK, hr_qi) + self.assertEqual(1, punk.Release()) + + with self.assertRaises(OSError) as e: + ppst.QueryInterface(IID_IStream, IUnknown()) + self.assertEqual(E_NOINTERFACE, e.exception.winerror) + + self.assertEqual(0, ppst.Release()) + + def test_with_paramflags_and_iid(self): + class IUnknown(c_void_p): + QueryInterface = proto_query_interface(None, IID_IUnknown) + AddRef = proto_add_ref() + Release = proto_release() + + class IPersist(IUnknown): + GetClassID = proto_get_class_id(((OUT, "pClassID"),), IID_IPersist) + + ppst = create_shelllink_persist(IPersist) + + clsid = ppst.GetClassID() + self.assertEqual(TRUE, is_equal_guid(CLSID_ShellLink, clsid)) + + punk = IUnknown() + hr_qi = ppst.QueryInterface(IID_IUnknown, punk) + self.assertEqual(S_OK, hr_qi) + self.assertEqual(1, punk.Release()) + + with self.assertRaises(COMError) as e: + ppst.QueryInterface(IID_IStream, IUnknown()) + self.assertEqual(E_NOINTERFACE, e.exception.hresult) + + self.assertEqual(0, ppst.Release()) + + +class CopyComPointerTests(unittest.TestCase): + def setUp(self): + ole32.CoInitializeEx(None, COINIT_APARTMENTTHREADED) + + class IUnknown(c_void_p): + QueryInterface = proto_query_interface(None, IID_IUnknown) + AddRef = proto_add_ref() + Release = proto_release() + + class IPersist(IUnknown): + GetClassID = proto_get_class_id(((OUT, "pClassID"),), IID_IPersist) + + self.IUnknown = IUnknown + self.IPersist = IPersist + + def tearDown(self): + ole32.CoUninitialize() + gc.collect() + + def test_both_are_null(self): + src = self.IPersist() + dst = self.IPersist() + + hr = CopyComPointer(src, byref(dst)) + + self.assertEqual(S_OK, hr) + + self.assertIsNone(src.value) + self.assertIsNone(dst.value) + + def test_src_is_nonnull_and_dest_is_null(self): + # The reference count of the COM pointer created by `CoCreateInstance` + # is initially 1. + src = create_shelllink_persist(self.IPersist) + dst = self.IPersist() + + # `CopyComPointer` calls `AddRef` explicitly in the C implementation. + # The refcount of `src` is incremented from 1 to 2 here. + hr = CopyComPointer(src, byref(dst)) + + self.assertEqual(S_OK, hr) + self.assertEqual(src.value, dst.value) + + # This indicates that the refcount was 2 before the `Release` call. + self.assertEqual(1, src.Release()) + + clsid = dst.GetClassID() + self.assertEqual(TRUE, is_equal_guid(CLSID_ShellLink, clsid)) + + self.assertEqual(0, dst.Release()) + + def test_src_is_null_and_dest_is_nonnull(self): + src = self.IPersist() + dst_orig = create_shelllink_persist(self.IPersist) + dst = self.IPersist() + CopyComPointer(dst_orig, byref(dst)) + self.assertEqual(1, dst_orig.Release()) + + clsid = dst.GetClassID() + self.assertEqual(TRUE, is_equal_guid(CLSID_ShellLink, clsid)) + + # This does NOT affects the refcount of `dst_orig`. + hr = CopyComPointer(src, byref(dst)) + + self.assertEqual(S_OK, hr) + self.assertIsNone(dst.value) + + with self.assertRaises(ValueError): + dst.GetClassID() # NULL COM pointer access + + # This indicates that the refcount was 1 before the `Release` call. + self.assertEqual(0, dst_orig.Release()) + + def test_both_are_nonnull(self): + src = create_shelllink_persist(self.IPersist) + dst_orig = create_shelllink_persist(self.IPersist) + dst = self.IPersist() + CopyComPointer(dst_orig, byref(dst)) + self.assertEqual(1, dst_orig.Release()) + + self.assertEqual(dst.value, dst_orig.value) + self.assertNotEqual(src.value, dst.value) + + hr = CopyComPointer(src, byref(dst)) + + self.assertEqual(S_OK, hr) + self.assertEqual(src.value, dst.value) + self.assertNotEqual(dst.value, dst_orig.value) + + self.assertEqual(1, src.Release()) + + clsid = dst.GetClassID() + self.assertEqual(TRUE, is_equal_guid(CLSID_ShellLink, clsid)) + + self.assertEqual(0, dst.Release()) + self.assertEqual(0, dst_orig.Release()) + + +if __name__ == "__main__": + unittest.main() From a3bf97713767e511d0258ef82ce5f0a156236137 Mon Sep 17 00:00:00 2001 From: Jun Komoda <45822440+junkmd@users.noreply.github.com> Date: Sun, 15 Dec 2024 13:07:12 +0000 Subject: [PATCH 3/3] Add "Build IDL" poc workflow. --- .github/workflows/poc.yml | 24 ++++ src/TestCtypesComServer.idl | 30 +++++ src/__init__.py | 0 src/test.py | 217 ++++++++++++++++++++++++++++++++++++ 4 files changed, 271 insertions(+) create mode 100644 src/TestCtypesComServer.idl create mode 100644 src/__init__.py create mode 100644 src/test.py diff --git a/.github/workflows/poc.yml b/.github/workflows/poc.yml index 702dc11..a3209b8 100644 --- a/.github/workflows/poc.yml +++ b/.github/workflows/poc.yml @@ -12,3 +12,27 @@ jobs: steps: - name: Check out repository uses: actions/checkout@v3 + + - name: Set up MSVC + uses: ilammy/msvc-dev-cmd@v1 + + - name: Compile IDL + run: midl /out src src\TestCtypesComServer.idl + + - name: Download and Extract CPython Zip + run: | + $url = "https://github.com/python/cpython/archive/refs/heads/main.zip" + $output = "cpython.zip" + Invoke-WebRequest -Uri $url -OutFile $output + Expand-Archive -Path $output -DestinationPath . + Rename-Item -Path "cpython-main" -NewName "cpython" + + - name: Build CPython + working-directory: ./cpython + run: PCbuild\build.bat -p x64 -c Release + + - name: Unittest + run: | + .\cpython\PCbuild\amd64\python.exe -m venv .venv + .\.venv\Scripts\activate + python -m src.test diff --git a/src/TestCtypesComServer.idl b/src/TestCtypesComServer.idl new file mode 100644 index 0000000..d343bc4 --- /dev/null +++ b/src/TestCtypesComServer.idl @@ -0,0 +1,30 @@ +import "oaidl.idl"; +import "ocidl.idl"; + +[ + object, + uuid(479c32ae-7505-4182-8cf7-f39b7f1a8de4), + helpstring("ITestCtypesComServer interface") +] +interface ITestCtypesComServer : IUnknown { + [id(1), helpstring("Adds one to the provided integer")] + HRESULT AddOne([in] int value, [out, retval] int* result); +}; + +[ + uuid(4b914909-66e1-47c2-98c1-7a1bd41ea23f), + version(1.0), + helpstring("TestCtypesComServer 1.0 Type library") +] +library TestCtypesComServerLib +{ + importlib("stdole2.tlb"); + + [ + uuid(c0a45aa7-4423-4263-9492-4bd6e446823f), + helpstring("TestCtypesComServer class object") + ] + coclass TestCtypesComServer { + [default] interface ITestCtypesComServer; + }; +}; diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/test.py b/src/test.py new file mode 100644 index 0000000..c585d1e --- /dev/null +++ b/src/test.py @@ -0,0 +1,217 @@ +import _ctypes +import ctypes +import gc +import os +import pprint +import unittest +import winreg +from unittest import mock + +from .win32_com_foreign_func import ( + COINIT_APARTMENTTHREADED, + E_NOINTERFACE, + GUID, + HRESULT, + POINTER, + S_OK, + TRUE, + COMError, + CopyComPointer, + IID_IUnknown, + byref, + c_void_p, + create_guid, + create_proto_com_method, + is_equal_guid, + ole32, + proto_add_ref, + proto_query_interface, + proto_release, +) + +oleaut32 = ctypes.oledll.oleaut32 +HKCR = winreg.HKEY_CLASSES_ROOT + +IN = 1 +OUT = 2 +RETVAL = 8 + +COINIT_MULTITHREADED = 0x0 + +CLSCTX_INPROC_SERVER = 1 +CLSCTX_LOCAL_SERVER = 4 + +REGKIND_DEFAULT = 0 +REGKIND_REGISTER = 1 +REGKIND_NONE = 2 + + +IID_ITestCtypesComServer = create_guid( + "{479C32AE-7505-4182-8CF7-F39B7F1A8DE4}" +) +CLSID_TestCtypesComServer = create_guid( + "{C0A45AA7-4423-4263-9492-4BD6E446823F}" +) +LIBID_TestCtypesComServerLib = create_guid( + "{4B914909-66E1-47C2-98C1-7A1BD41EA23F}" +) +PROGID_TestCtypesComServer = "TestCtypesComServerLib.TestCtypesComServer.1" +proto_add_one = create_proto_com_method( + "AddOne", 3, HRESULT, ctypes.c_int, POINTER(ctypes.c_int) +) + + +def str_from_guid(guid): + p = ctypes.c_wchar_p() + ole32.StringFromCLSID(byref(guid), byref(p)) + result = p.value + ctypes.windll.ole32.CoTaskMemFree(p) + return result + + +class IUnknown(c_void_p): + QueryInterface = proto_query_interface(None, IID_IUnknown) + AddRef = proto_add_ref(None, IID_IUnknown) + Release = proto_release(None, IID_IUnknown) + + +class ITestCtypesComServer(IUnknown): + AddOne = proto_add_one( + ((IN, "value"), (OUT | RETVAL, "result")), IID_ITestCtypesComServer + ) + + +class TestCtypesComServer: + def AddOne(self, value): + return value + 1 + + +DIR_NAME = os.path.dirname(__file__) +TLB_FULLPATH = os.path.join(DIR_NAME, "TestCtypesComServer.tlb") +clsid_sub = rf"CLSID\{str_from_guid(CLSID_TestCtypesComServer)}" +inproc_srv_sub = rf"{clsid_sub}\InprocServer32" +full_classname = f"{__name__}.{TestCtypesComServer.__name__}" + + +def get_inproc_sever_registry_entries(): + return sorted( + [ + (HKCR, clsid_sub, "", ""), + (HKCR, rf"{clsid_sub}\ProgID", "", PROGID_TestCtypesComServer), + ( + HKCR, + rf"{PROGID_TestCtypesComServer}\CLSID", + "", + str_from_guid(CLSID_TestCtypesComServer), + ), + (HKCR, inproc_srv_sub, "", _ctypes.__file__), + (HKCR, inproc_srv_sub, "PythonClass", full_classname), + (HKCR, inproc_srv_sub, "PythonPath", DIR_NAME), + (HKCR, inproc_srv_sub, "ThreadingModel", "Both"), + ( + HKCR, + rf"{clsid_sub}\Typelib", + "", + str_from_guid(LIBID_TestCtypesComServerLib), + ), + ] + ) + + +def register_inproc_server(): + for hkey, subkey, name, value in get_inproc_sever_registry_entries(): + k = winreg.CreateKey(hkey, subkey) + winreg.SetValueEx(k, name, None, winreg.REG_SZ, str(value)) + ptl = IUnknown() + oleaut32.LoadTypeLibEx( + ctypes.c_wchar_p(TLB_FULLPATH), REGKIND_REGISTER, byref(ptl) + ) + + +def create_instance(clsctx): + psvr = ITestCtypesComServer() + # https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-cocreateinstance + ole32.CoCreateInstance( + byref(CLSID_TestCtypesComServer), + None, + clsctx, + byref(IID_ITestCtypesComServer), + byref(psvr), + ) + return psvr + + +# class IClassFactory(comtypes.IUnknown): +# _iid_ = comtypes.GUID("{00000001-0000-0000-C000-000000000046}") +# _methods_ = [ +# comtypes.STDMETHOD( +# comtypes.HRESULT, +# "CreateInstance", +# [ +# ctypes.POINTER(comtypes.IUnknown), +# ctypes.POINTER(comtypes.GUID), +# ctypes.POINTER(ctypes.c_void_p), +# ], +# ), +# comtypes.STDMETHOD(comtypes.HRESULT, "LockServer", [ctypes.c_int]), +# ] + + +CLASS_E_CLASSNOTAVAILABLE = -2147221231 +IID_IClassFactory = create_guid("{00000001-0000-0000-C000-000000000046}") +proto_create_instance = create_proto_com_method( + "CreateInstance", + 3, + HRESULT, + POINTER(IUnknown), + POINTER(GUID), + POINTER(c_void_p), +) +proto_lock_server = create_proto_com_method( + "LockServer", 4, HRESULT, ctypes.c_int +) + + +class IClassFactory(IUnknown): + CreateInstance = proto_create_instance(None, IID_IClassFactory) + LockServer = proto_lock_server(None, IID_IClassFactory) + + +class TestInprocServer(unittest.TestCase): + def setUp(self): + # https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-coinitializeex + ole32.CoInitializeEx(None, COINIT_MULTITHREADED) + register_inproc_server() + + def tearDown(self): + # https://learn.microsoft.com/en-us/windows/win32/api/combaseapi/nf-combaseapi-couninitialize + ole32.CoUninitialize() + gc.collect() + + def test_1(self): + create_instance(CLSCTX_INPROC_SERVER) + + def test_2(self): + call_args_list = [] + + def DllGetClassObject(rclsid, riid, ppv): + call_args_list.append( + ( + str_from_guid(GUID.from_address(rclsid)), + str_from_guid(GUID.from_address(riid)), + ppv, + ) + ) + return S_OK + + try: + with mock.patch.object( + ctypes, "DllGetClassObject", DllGetClassObject + ): + create_instance(CLSCTX_INPROC_SERVER) + except Exception as e: + self.fail(f"FAIL:\n{e}\n{pprint.pformat(call_args_list)}") + + +if __name__ == "__main__": + unittest.main()