#encoding = utf8 import os import time from ctypes import * current = os.path.dirname(__file__) dynamic_path = os.path.join(current, 'ipc.dll') class IPCUtil: def __init__(self, sender, receiver): self.__ipc_obj = WinDLL(dynamic_path) print(self.__ipc_obj) if self.__ipc_obj is not None: self.__ipc_obj.initialize.argtypes = [c_char_p, c_char_p] self.__ipc_obj.initialize.restype = c_bool print('IPCUtil init', sender.encode('utf-8'), receiver.encode('utf-8')) self.__init = self.__ipc_obj.initialize(sender.encode('utf-8'), receiver.encode('utf-8')) print('IPCUtil init', self.__init) def __del__(self): print('IPCUtil __del__') if self.__ipc_obj is None: return if self.__init: self.__ipc_obj.uninitialize() def listen(self): if not self.__init: return False self.__ipc_obj.listen.restype = c_bool return self.__ipc_obj.listen() def send_text(self, data): if not self.__init: return False self.__ipc_obj.send.argtypes = [c_char_p, c_uint] self.__ipc_obj.send.restype = c_bool send_data = data.encode('utf-8') send_len = len(send_data) + 1 if not self.__ipc_obj.trySend(send_data, send_len): self.__ipc_obj.reConnect() return True def send_binary(self, data, size): if not self.__init: return False self.__ipc_obj.send.argtypes = [c_char_p, c_uint] self.__ipc_obj.send.restype = c_bool data_ptr = cast(data, c_char_p) return self.__ipc_obj.trySend(data_ptr, size) def set_reader_callback(self, callback): if not self.__init: return False CALLBACK_TYPE = CFUNCTYPE(None, c_char_p, c_uint) self.c_callback = CALLBACK_TYPE(callback) # Store the callback to prevent garbage collection self.__ipc_obj.setReaderCallback.argtypes = [CALLBACK_TYPE] self.__ipc_obj.setReaderCallback.restype = c_bool return self.__ipc_obj.setReaderCallback(self.c_callback) # def ipc_log_callback(log, size): # print(f'log={log.decode("utf-8")}, len={size}') # # # util = IPCUtil('ipc_sender', 'ipc_sender') # util.set_reader_callback(ipc_log_callback) # print(util.listen()) # print(util.send_text('hello')) # time.sleep(200)