human/ipc/ipc_mem.py

63 lines
2.1 KiB
Python
Raw Normal View History

2024-12-04 16:47:17 +00:00
#encoding = utf8
import logging
import os
from ctypes import *
# Directory containing this module. Made absolute so the DLL path below does
# not depend on the current working directory when __file__ is relative.
current = os.path.dirname(os.path.abspath(__file__))
# Full path of the native IPC library, located next to this module.
dynamic_path = os.path.join(current, 'ipc.dll')
class IPCMem:
    """Thin ctypes wrapper around the IPC functions exported by ``ipc.dll``.

    The library is loaded from ``dynamic_path`` and initialized with a sender
    and a receiver name. The other methods forward to the library's
    ``listen``, ``send``, ``reConnect`` and ``setReaderCallback`` exports.
    Every public method returns ``False`` without calling into the library
    when initialization did not succeed.
    """

    # Class-level defaults keep __del__ and the public methods safe on an
    # instance whose __init__ raised before assigning these attributes (for
    # example when loading the DLL fails).
    __ipc_obj = None
    __init = False

    def __init__(self, sender, receiver):
        """Load the DLL and call its ``initialize(sender, receiver)`` export.

        Args:
            sender: Text passed as the first argument of ``initialize``;
                UTF-8 encoded before the call.
            receiver: Text passed as the second argument; encoded likewise.

        Raises:
            OSError: Propagated from ``WinDLL`` when the library cannot be
                loaded.
        """
        self.__ipc_obj = WinDLL(dynamic_path)
        print(self.__ipc_obj)

        initialize = self.__ipc_obj.initialize
        initialize.argtypes = [c_char_p, c_char_p]
        initialize.restype = c_bool

        # Encode once and reuse for both the log line and the native call.
        sender_bytes = sender.encode('utf-8')
        receiver_bytes = receiver.encode('utf-8')
        print('IPCUtil init', sender_bytes, receiver_bytes)
        self.__init = initialize(sender_bytes, receiver_bytes)
        print('IPCUtil init', self.__init)

    def __del__(self):
        """Call the library's ``uninitialize`` once, if it was initialized."""
        print('IPCUtil __del__')
        if self.__ipc_obj is None or not self.__init:
            return
        # Clear the flag before the call so that an explicit __del__()
        # followed by garbage collection cannot uninitialize twice.
        self.__init = False
        self.__ipc_obj.uninitialize()

    def _send_raw(self, buffer, size):
        """Configure the ``send`` prototype and forward ``buffer``/``size``."""
        send = self.__ipc_obj.send
        send.argtypes = [c_char_p, c_uint]
        send.restype = c_bool
        return send(buffer, size)

    def listen(self):
        """Call the library's ``listen`` export.

        Returns:
            The boolean result of ``listen``, or ``False`` when the library
            was not initialized.
        """
        if not self.__init:
            return False
        self.__ipc_obj.listen.restype = c_bool
        return self.__ipc_obj.listen()

    def send_text(self, data):
        """Send ``data`` as UTF-8 text, reconnecting when the send fails.

        Args:
            data: Text to send (``str``).

        Returns:
            ``False`` when the library was not initialized, otherwise
            ``True``.
        """
        if not self.__init:
            return False
        payload = data.encode('utf-8')
        # One byte more than the payload is reported; presumably this covers
        # the trailing NUL terminator -- TODO confirm against the DLL.
        if not self._send_raw(payload, len(payload) + 1):
            self.__ipc_obj.reConnect()
        # NOTE(review): True is returned even when the send failed and only a
        # reconnect was attempted; kept because callers may rely on it.
        return True

    def send_binary(self, data, size):
        """Send ``size`` bytes of ``data`` through the library's ``send``.

        Args:
            data: Object accepted by ``ctypes.cast(..., c_char_p)``, such as
                ``bytes`` or a ctypes pointer/array.
            size: Number of bytes reported to the library. It is not checked
                against the actual length of ``data``.

        Returns:
            The boolean result of ``send``, or ``False`` when the library was
            not initialized.
        """
        if not self.__init:
            return False
        return self._send_raw(cast(data, c_char_p), size)

    def set_reader_callback(self, callback):
        """Register ``callback`` through the library's ``setReaderCallback``.

        Args:
            callback: Python callable wrapped as a C function taking
                ``(c_char_p, c_uint)`` and returning nothing.

        Returns:
            The boolean result of ``setReaderCallback``, or ``False`` when the
            library was not initialized.
        """
        if not self.__init:
            return False
        # NOTE(review): CFUNCTYPE produces a cdecl callback while the library
        # is loaded with WinDLL (stdcall) -- confirm the convention the DLL
        # uses when invoking the callback.
        CALLBACK_TYPE = CFUNCTYPE(None, c_char_p, c_uint)
        # Keep a reference on the instance so the C thunk is not garbage
        # collected while the library may still call it.
        self.c_callback = CALLBACK_TYPE(callback)
        register = self.__ipc_obj.setReaderCallback
        register.argtypes = [CALLBACK_TYPE]
        register.restype = c_bool
        return register(self.c_callback)