diff --git a/human_render/CMakeLists.txt b/human_render/CMakeLists.txt index 94b1d44..a432ee3 100644 --- a/human_render/CMakeLists.txt +++ b/human_render/CMakeLists.txt @@ -24,13 +24,16 @@ SET( INCLUDE_DIRECTORIES( ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/Ipc ${Thirdparty} - ${Thirdparty}/libcef + ${Thirdparty}/libcef + ${Thirdparty}/libipc ) link_directories( ${Thirdparty}/libcef/lib/Debug + ${Thirdparty}/libipc/lib ) diff --git a/human_render/Constant.h b/human_render/Constant.h new file mode 100644 index 0000000..fe2059a --- /dev/null +++ b/human_render/Constant.h @@ -0,0 +1,44 @@ +#pragma once + +#include + +using int8 = char; +using uint8 = unsigned char; +using int16 = short; +using uint16 = unsigned short; +using int32 = int; +using uint32 = unsigned int; +using ulong32 = unsigned long; +using int64 = long long; +using uint64 = unsigned long long; + +using AtomicRefCount = std::atomic; +// +//#ifndef MAKEWORD +//#define MAKEWORD(a, b) ((uint16)(((uint8)(((uint64)(a)) & 0xff)) | ((uint16)((uint8)(((uint64)(b)) & 0xff))) << 8)) +//#endif // !MAKEWORD +// +//#ifndef MAKELONG +//#define MAKELONG(a, b) ((long)(((uint16)(((uint64)(a)) & 0xffff)) | ((unsigned long)((uint16)(((uint64)(b)) & 0xffff))) << 16)) +//#endif // !MAKELONG +// +//#ifndef LOWORD +//#define LOWORD(l) ((uint16)(((uint64)(l)) & 0xffff)) +//#endif // !LOWORD +// +//#ifndef HIWORD +//#define HIWORD(l) ((uint16)((((uint64)(l)) >> 16) & 0xffff)) +//#endif // !HIWORD +// +//#ifndef LOBYTE +//#define LOBYTE(w) ((uint8)(((uint64)(w)) & 0xff)) +//#endif // !LOBYTE +// +//#ifndef HIBYTE +//#define HIBYTE(w) ((uint8)((((uint64)(w)) >> 8) & 0xff)) +//#endif // !HIBYTE + + +#define NON_COPYABLE(class_name) \ + class_name(const class_name&) = delete; \ + class_name& operator=(const class_name&) = delete; diff --git a/human_render/Ipc/Ipc.cpp b/human_render/Ipc/Ipc.cpp new file mode 100644 index 0000000..08ebd35 --- /dev/null +++ b/human_render/Ipc/Ipc.cpp @@ -0,0 +1,128 @@ +#include "Ipc/Ipc.h" + +#include +#include +#include + +#include "Ipc/ReadCallback.h" + +std::shared_ptr Ipc::create(const char* reader_name, const char* writer_name) { + struct Creator : public Ipc { + Creator(const char* reader_name, const char* writer_name) : Ipc(reader_name, writer_name) {} + ~Creator() override = default; + }; + return std::make_shared(reader_name, writer_name); +} + +Ipc::Ipc(const char* reader_name, const char* writer_name) noexcept + : reader_name_(reader_name) + , writer_name_(writer_name) { + sender_ = std::make_unique(toUtf8(reader_name_).c_str(), ipc::sender); + fprintf(stderr, "%s, %s\n", reader_name, writer_name); +} + +Ipc::~Ipc() { + stop(); +} + +bool Ipc::listen() { + if (!reciver_stop_.load(std::memory_order_acquire)) { + return false; + } + + reciver_stop_.store(false); + + std::weak_ptr wThis = shared_from_this(); + reciver_thread_ = std::move(std::thread(std::bind(&Ipc::doReciver, this, wThis))); + return true; +} + +void Ipc::stop() { + if (reciver_stop_.load(std::memory_order_acquire)) { + return; + } + + reciver_stop_.store(true, std::memory_order_release); + + sender_->disconnect(); + receiver_->disconnect(); + + if (reciver_thread_.joinable()) { + reciver_thread_.join(); + } +} + +bool Ipc::send(const char* data, unsigned int size) { + if (!sender_) { + return false; + } + + return sender_->send(data, size); +} + + +bool Ipc::isConnected() { + /* if (receiver_) { + return receiver_->is_connected(); + }*/ + return false; +} + + +void Ipc::reConnect() { + sender_->reconnect(ipc::sender); + if (receiver_) { + receiver_->disconnect(); + receiver_->reconnect(ipc::receiver); + } +} + +void Ipc::registReadCallback(const std::shared_ptr& reader) { + std::lock_guard lock(mutex_); + + auto itor = std::find(reader_callbacks_.begin(), reader_callbacks_.end(), reader); + if (reader_callbacks_.end() != itor) { + return; + } + reader_callbacks_.emplace_back(reader); +} + +void Ipc::unregistReadCallback(const std::shared_ptr& reader) { + std::lock_guard lock(mutex_); + + auto itor = std::find(reader_callbacks_.begin(), reader_callbacks_.end(), reader); + if (reader_callbacks_.end() != itor) { + return; + } + reader_callbacks_.erase(itor); +} + +void Ipc::doReciver(std::weak_ptr wThis) { + std::shared_ptr self(wThis.lock()); + if (!self) { + return; + } + + receiver_ = std::make_unique(writer_name_.c_str(), ipc::receiver); + while (!reciver_stop_.load(std::memory_order_acquire)) { + ipc::buff_t buffer = receiver_->recv(5); + if (buffer.empty()) { + continue; + } + const char* data = buffer.get(); + onReciveer(data, static_cast(buffer.size())); + } +} + +void Ipc::onReciveer(const char* buffer, unsigned int size) { + std::lock_guard lock(mutex_); + for (auto& read : reader_callbacks_) { + read->onRead(buffer, size); + } +} + +std::string Ipc::toUtf8(const std::string& str) { + std::wstring_convert> converter; + std::wstring wide_str = converter.from_bytes(str); + return converter.to_bytes(wide_str); +} diff --git a/human_render/Ipc/Ipc.h b/human_render/Ipc/Ipc.h new file mode 100644 index 0000000..c6d7084 --- /dev/null +++ b/human_render/Ipc/Ipc.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +class IReaderCallback; + +class Ipc : public std::enable_shared_from_this{ +public: + static std::shared_ptr create(const char* reader_name, const char* writer_name); +public: + virtual ~Ipc(); + + bool listen(); + void stop(); + bool send(const char* data, unsigned int size); + bool isConnected(); + void reConnect(); + + void registReadCallback(const std::shared_ptr& reader); + void unregistReadCallback(const std::shared_ptr& reader); + +protected: + Ipc(const char* reader_name, const char* writer_name) noexcept; + +private: + void doReciver(std::weak_ptr wThis); + + void onReciveer(const char* buffer, unsigned int size); + + std::string toUtf8(const std::string& str); + +private: + std::string reader_name_; + std::string writer_name_; + std::unique_ptr sender_; + std::unique_ptr receiver_; + std::thread reciver_thread_; + std::atomic_bool reciver_stop_{true}; + + std::mutex mutex_; + using ReaderCallbackList = std::vector>; + ReaderCallbackList reader_callbacks_; +}; + diff --git a/human_render/Ipc/IpcMoudle.cpp b/human_render/Ipc/IpcMoudle.cpp new file mode 100644 index 0000000..368848f --- /dev/null +++ b/human_render/Ipc/IpcMoudle.cpp @@ -0,0 +1,113 @@ +#include "Ipc/IpcMoudle.h" + +#include + +#define STB_IMAGE_WRITE_IMPLEMENTATION +#include + +//#include "Core/Core.h" +#include "Ipc/ipclib.h" +//#include "ImageBuffer.h" +//#include "VHI/VHI.h" + +//IpcMoudle* Singleton::instance_ = nullptr; + +void OnParseImageData(const char* data, size_t size) { + if (size < 13) { // Minimum size check: 1 byte identifier + 4 bytes width + 4 bytes height + 4 bytes bit depth + std::cerr << "Invalid data size" << std::endl; + return; + } + + // Extract identifier + char identifier = data[0]; + const char* buffer = data + 1; + /* if (0x01 == identifier) { + IpcMoudle::Get()->PushImage(buffer, size - 1); + } else if (0x02 == identifier) { + IpcMoudle::Get()->PushVoice(buffer, size - 1); + } + + INFOLOG("identifier {}", static_cast(identifier));*/ +} + +bool IpcMoudle::Initialize() { + //if (!initialize("human_render", "human_product")) { + // ERRORLOG("ipc initialize failed"); + // return false; + //} + + //setReaderCallback([](const char* data, unsigned int size) { + // //INFOLOG("ipc recive data:{}", size); + // OnParseImageData(data, size); + // } + //); + + //if (!listen()) { + // ERRORLOG("ipc listen failed"); + // return false; + //} + + ////auto callback = [](const char* data, unsigned int size) { + //// //INFOLOG("ipc recive data:{}", size); + //// OnParseImageData(data, size); + ////}; + ////zmqMoudle_ = std::make_unique(callback); + ////zmqMoudle_->Start(); + //lastHeartbeatTime_ = std::chrono::steady_clock::now(); + return true; +} + +void IpcMoudle::Uninitialize() { + //zmqMoudle_->Stop(); + uninitialize(); +} + +bool IpcMoudle::Send(const char* data, unsigned int size) { + return send(data, size); + return false; +} + +void IpcMoudle::OnFrame() { + auto now = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(now - lastHeartbeatTime_); + + if (duration.count() >= 2) { + /* constexpr char heartbeat[] = "heartbeat"; + if (!send("heartbeat", sizeof(heartbeat) / sizeof(heartbeat[0]))) { + ERRORLOG("send heartbeat failed"); + reConnect(); + }*/ + lastHeartbeatTime_ = now; + } +} + +void IpcMoudle::PushImage(const char* data, uint32 size) { + // Extract width + int width; + std::memcpy(&width, data, sizeof(int)); + + // Extract height + int height; + std::memcpy(&height, data + 4, sizeof(int)); + + // Extract bit depth + int bit_depth; + std::memcpy(&bit_depth, data + 8, sizeof(int)); + + // Extract image bytes + const char* img_bytes = data + 12; + size_t img_size = size - 12; + //std::vector img_data(img_size); + //memcpy(img_data.data(), img_bytes, img_size); + + //ImageBuffer::Get()->PushImage(ImageBuffer::IBType::Human, std::move(img_data), width, height, bit_depth); +} + +void IpcMoudle::PushVoice(const char* data, uint32 size) { + /* IAudioRender* audioRender = VHI::Get()->GetAudioRender(); + if (nullptr == audioRender) { + return; + } + + audioRender->Write(data, size);*/ +} diff --git a/human_render/Ipc/IpcMoudle.h b/human_render/Ipc/IpcMoudle.h new file mode 100644 index 0000000..a3708a9 --- /dev/null +++ b/human_render/Ipc/IpcMoudle.h @@ -0,0 +1,32 @@ +#pragma once + +#include + +//#include "Core/Singleton.h" +//#include "Core/Constant.h" + +//#include "Ipc/ZmqMoudle.h" +#include "Constant.h" + +class IpcMoudle {// : public Singleton { + //NON_COPYABLE(IpcMoudle) + +public: + IpcMoudle() = default; + virtual ~IpcMoudle() = default; + bool Initialize(); + void Uninitialize(); + + bool Send(const char* data, uint32 size); + + void OnFrame(); + + void PushImage(const char* data, uint32 size); + void PushVoice(const char* data, uint32 size); + +private: + std::chrono::time_point lastHeartbeatTime_; + + //std::unique_ptr zmqMoudle_; +}; + diff --git a/human_render/Ipc/ReadCallback.h b/human_render/Ipc/ReadCallback.h new file mode 100644 index 0000000..0240d20 --- /dev/null +++ b/human_render/Ipc/ReadCallback.h @@ -0,0 +1,9 @@ +#pragma once + +class IReaderCallback { +public: + virtual ~IReaderCallback() = default; + + // read thread callback + virtual void onRead(const char* buffer, unsigned int size) = 0; +}; \ No newline at end of file diff --git a/human_render/Ipc/ZmqMoudle.cpp b/human_render/Ipc/ZmqMoudle.cpp new file mode 100644 index 0000000..06e530e --- /dev/null +++ b/human_render/Ipc/ZmqMoudle.cpp @@ -0,0 +1,62 @@ +#include "Ipc/ZmqMoudle.h" + +#if 0 +#include +#include + +#include + + ZmqMoudle::ZmqMoudle(ZmqMoudleCallback callback) + : callback_(callback){ + + } + + void ZmqMoudle::Start() { + if (work_) { + return; + } + + shouldExit_.store(false); + auto run = [this]() { + zmq::context_t context(1); + zmq::socket_t subscriber(context, ZMQ_SUB); + subscriber.connect("tcp://127.0.0.1:55661"); + subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); + + bool isFirst = true; + while (!shouldExit_.load()) { + zmq::message_t message; + if (subscriber.recv(message, zmq::recv_flags::none)) { + if (isFirst) { + auto start = std::chrono::high_resolution_clock::now(); + + // 打印自 Epoch 以来的时间(以秒为单位) + auto duration = std::chrono::duration(start.time_since_epoch()); + int64_t time = duration.count(); + time = 0; + isFirst = true; + } + const char* data = static_cast(message.data()); + if (nullptr != callback_) { + callback_(data, message.size()); + } + } else { + ::_sleep(1); + } + } + }; + + work_.reset(new std::thread(run)); + } + + void ZmqMoudle::Stop() { + if (!work_) { + return; + } + + shouldExit_.store(true); + if (work_->joinable()) { + work_->join(); + } + } +#endif diff --git a/human_render/Ipc/ZmqMoudle.h b/human_render/Ipc/ZmqMoudle.h new file mode 100644 index 0000000..f64c5e2 --- /dev/null +++ b/human_render/Ipc/ZmqMoudle.h @@ -0,0 +1,22 @@ +#pragma once +#if 0 +#include +#include +#include +#include + +using ZmqMoudleCallback = std::function; +class ZmqMoudle { +public: + ZmqMoudle(ZmqMoudleCallback callback); + + void Start(); + void Stop(); + +private: + ZmqMoudleCallback callback_; + std::unique_ptr work_; + void* context_{ nullptr }; + std::atomic shouldExit_; +}; +#endif \ No newline at end of file diff --git a/human_render/Ipc/config.h b/human_render/Ipc/config.h new file mode 100644 index 0000000..16d3f59 --- /dev/null +++ b/human_render/Ipc/config.h @@ -0,0 +1,20 @@ +#pragma once + +#ifdef _IPC_EXPORT_ + +#ifdef WIN32 + +#define IPC_EXPORT __declspec( dllexport ) + +#endif // WIN32 + +#else + +#ifdef WIN32 + +#define IPC_EXPORT __declspec( dllimport ) + +#endif // WIN32 + +#endif // _IPC_EXPORT_ + diff --git a/human_render/Ipc/ipclib.cpp b/human_render/Ipc/ipclib.cpp new file mode 100644 index 0000000..6f0928e --- /dev/null +++ b/human_render/Ipc/ipclib.cpp @@ -0,0 +1,80 @@ +#include "Ipc/ipclib.h" + +#include +#include + +#include "Ipc.h" +#include "ReadCallback.h" + +class ReaderCallback : public IReaderCallback { +public: + ReaderCallback() = default; + ~ReaderCallback() override = default; + void onRead(const char* buffer, unsigned int size) override { + //printf("%s", buffer); + if (nullptr != m_readerCallbackFunc) { + m_readerCallbackFunc(buffer, size); + } + } + + void setCallback(ReaderCallbackFunc func) { + m_readerCallbackFunc = func; + } + +private: + ReaderCallback(const ReaderCallback&) = delete; + ReaderCallback operator= (const ReaderCallback&) = delete; + +private: + ReaderCallbackFunc m_readerCallbackFunc{nullptr}; +}; + +std::shared_ptr g_ipc; +std::shared_ptr g_readCallback; + +bool __stdcall initialize(const char* sender_name, const char* receiver_name) { + assert(!g_ipc); + g_ipc = Ipc::create(sender_name, receiver_name); + return true; +} + + void __stdcall uninitialize() { + assert(g_ipc); + g_ipc->stop(); + if (g_readCallback) { + g_ipc->unregistReadCallback(g_readCallback); + g_readCallback.reset(); + } + g_ipc.reset(); +} + +bool __stdcall listen() { + assert(g_ipc); + return g_ipc->listen(); + } + + bool __stdcall send(const char* data, unsigned int size) { + assert(g_ipc); + return g_ipc->send(data, size); + } + + bool __stdcall setReaderCallback(ReaderCallbackFunc callback) { + assert(g_ipc); + if (!g_readCallback) { + g_readCallback = std::make_shared(); + g_ipc->registReadCallback(g_readCallback); + } + g_readCallback->setCallback(callback); + return true; + } + + + void __stdcall reConnect() { + assert(g_ipc); + return g_ipc->reConnect(); + } + + bool __stdcall isConnect() { + assert(g_ipc); + return g_ipc->isConnected(); + } diff --git a/human_render/Ipc/ipclib.h b/human_render/Ipc/ipclib.h new file mode 100644 index 0000000..cb4f149 --- /dev/null +++ b/human_render/Ipc/ipclib.h @@ -0,0 +1,25 @@ +#pragma once + +#include "Ipc/config.h" + + +#ifdef __cplusplus +extern "C" { +#endif // __cplusplus + + typedef void(__stdcall *ReaderCallbackFunc)(const char* data, unsigned int size); + + bool __stdcall initialize(const char* sender_name, const char* receiver_name); + void __stdcall uninitialize(); + + bool __stdcall listen(); + bool __stdcall send(const char* data, unsigned int size); + bool __stdcall setReaderCallback(ReaderCallbackFunc callback); + + void __stdcall reConnect(); + + bool __stdcall isConnect(); + +#ifdef __cplusplus +} +#endif // __cplusplus diff --git a/human_render/Ipc/libipc/buffer.cpp b/human_render/Ipc/libipc/buffer.cpp new file mode 100644 index 0000000..0ac0fa7 --- /dev/null +++ b/human_render/Ipc/libipc/buffer.cpp @@ -0,0 +1,87 @@ +#include "libipc/buffer.h" +#include "libipc/utility/pimpl.h" + +#include + +namespace ipc { + +bool operator==(buffer const & b1, buffer const & b2) { + return (b1.size() == b2.size()) && (std::memcmp(b1.data(), b2.data(), b1.size()) == 0); +} + +bool operator!=(buffer const & b1, buffer const & b2) { + return !(b1 == b2); +} + +class buffer::buffer_ : public pimpl { +public: + void* p_; + std::size_t s_; + void* a_; + buffer::destructor_t d_; + + buffer_(void* p, std::size_t s, buffer::destructor_t d, void* a) + : p_(p), s_(s), a_(a), d_(d) { + } + + ~buffer_() { + if (d_ == nullptr) return; + d_((a_ == nullptr) ? p_ : a_, s_); + } +}; + +buffer::buffer() + : buffer(nullptr, 0, nullptr, nullptr) { +} + +buffer::buffer(void* p, std::size_t s, destructor_t d) + : p_(p_->make(p, s, d, nullptr)) { +} + +buffer::buffer(void* p, std::size_t s, destructor_t d, void* additional) + : p_(p_->make(p, s, d, additional)) { +} + +buffer::buffer(void* p, std::size_t s) + : buffer(p, s, nullptr) { +} + +buffer::buffer(char const & c) + : buffer(const_cast(&c), 1) { +} + +buffer::buffer(buffer&& rhs) + : buffer() { + swap(rhs); +} + +buffer::~buffer() { + p_->clear(); +} + +void buffer::swap(buffer& rhs) { + std::swap(p_, rhs.p_); +} + +buffer& buffer::operator=(buffer rhs) { + swap(rhs); + return *this; +} + +bool buffer::empty() const noexcept { + return (impl(p_)->p_ == nullptr) || (impl(p_)->s_ == 0); +} + +void* buffer::data() noexcept { + return impl(p_)->p_; +} + +void const * buffer::data() const noexcept { + return impl(p_)->p_; +} + +std::size_t buffer::size() const noexcept { + return impl(p_)->s_; +} + +} // namespace ipc diff --git a/human_render/Ipc/libipc/buffer.h b/human_render/Ipc/libipc/buffer.h new file mode 100644 index 0000000..5362114 --- /dev/null +++ b/human_render/Ipc/libipc/buffer.h @@ -0,0 +1,67 @@ +#pragma once + +#include +#include +#include +#include + +#include "libipc/def.h" + +namespace ipc { + +class buffer { +public: + using destructor_t = void (*)(void*, std::size_t); + + buffer(); + + buffer(void* p, std::size_t s, destructor_t d); + buffer(void* p, std::size_t s, destructor_t d, void* additional); + buffer(void* p, std::size_t s); + + template + explicit buffer(byte_t const (& data)[N]) + : buffer(data, sizeof(data)) { + } + explicit buffer(char const & c); + + buffer(buffer&& rhs); + ~buffer(); + + void swap(buffer& rhs); + buffer& operator=(buffer rhs); + + bool empty() const noexcept; + + void * data() noexcept; + void const * data() const noexcept; + + template + T get() const { return T(data()); } + + std::size_t size() const noexcept; + + std::tuple to_tuple() { + return std::make_tuple(data(), size()); + } + + std::tuple to_tuple() const { + return std::make_tuple(data(), size()); + } + + std::vector to_vector() const { + return { + get(), + get() + size() + }; + } + + friend bool operator==(buffer const & b1, buffer const & b2); + friend bool operator!=(buffer const & b1, buffer const & b2); + +private: + class buffer_; + buffer_* p_; +}; + +} // namespace ipc diff --git a/human_render/Ipc/libipc/circ/elem_array.h b/human_render/Ipc/libipc/circ/elem_array.h new file mode 100644 index 0000000..0759613 --- /dev/null +++ b/human_render/Ipc/libipc/circ/elem_array.h @@ -0,0 +1,141 @@ +#pragma once + +#include // std::atomic +#include +#include +#include + +#include "libipc/def.h" +#include "libipc/rw_lock.h" + +#include "libipc/circ/elem_def.h" +#include "libipc/platform/detail.h" + +namespace ipc { +namespace circ { + +template +class elem_array : public ipc::circ::conn_head { +public: + using base_t = ipc::circ::conn_head; + using policy_t = Policy; + using cursor_t = decltype(std::declval().cursor()); + using elem_t = typename policy_t::template elem_t; + + enum : std::size_t { + head_size = sizeof(base_t) + sizeof(policy_t), + data_size = DataSize, + elem_max = (std::numeric_limits>::max)() + 1, // default is 255 + 1 + elem_size = sizeof(elem_t), + block_size = elem_size * elem_max + }; + +private: + policy_t head_; + elem_t block_[elem_max] {}; + + /** + * \remarks 'warning C4348: redefinition of default parameter' with MSVC. + * \see + * - https://stackoverflow.com/questions/12656239/redefinition-of-default-template-parameter + * - https://developercommunity.visualstudio.com/content/problem/425978/incorrect-c4348-warning-in-nested-template-declara.html + */ + template ::is_multi_producer*/> + struct sender_checker; + + template + struct sender_checker { + constexpr static bool connect() noexcept { + // always return true + return true; + } + constexpr static void disconnect() noexcept {} + }; + + template + struct sender_checker { + bool connect() noexcept { + return !flag_.test_and_set(std::memory_order_acq_rel); + } + void disconnect() noexcept { + flag_.clear(); + } + + private: + // in shm, it should be 0 whether it's initialized or not. + std::atomic_flag flag_ = ATOMIC_FLAG_INIT; + }; + + template ::is_multi_consumer*/> + struct receiver_checker; + + template + struct receiver_checker { + constexpr static cc_t connect(base_t &conn) noexcept { + return conn.connect(); + } + constexpr static cc_t disconnect(base_t &conn, cc_t cc_id) noexcept { + return conn.disconnect(cc_id); + } + }; + + template + struct receiver_checker : protected sender_checker { + cc_t connect(base_t &conn) noexcept { + return sender_checker::connect() ? conn.connect() : 0; + } + cc_t disconnect(base_t &conn, cc_t cc_id) noexcept { + sender_checker::disconnect(); + return conn.disconnect(cc_id); + } + }; + + sender_checker ::is_multi_producer> s_ckr_; + receiver_checker::is_multi_consumer> r_ckr_; + + // make these be private + using base_t::connect; + using base_t::disconnect; + +public: + bool connect_sender() noexcept { + return s_ckr_.connect(); + } + + void disconnect_sender() noexcept { + return s_ckr_.disconnect(); + } + + cc_t connect_receiver() noexcept { + return r_ckr_.connect(*this); + } + + cc_t disconnect_receiver(cc_t cc_id) noexcept { + return r_ckr_.disconnect(*this, cc_id); + } + + cursor_t cursor() const noexcept { + return head_.cursor(); + } + + template + bool push(Q* que, F&& f) { + return head_.push(que, std::forward(f), block_); + } + + template + bool force_push(Q* que, F&& f) { + return head_.force_push(que, std::forward(f), block_); + } + + template + bool pop(Q* que, cursor_t* cur, F&& f, R&& out) { + if (cur == nullptr) return false; + return head_.pop(que, *cur, std::forward(f), std::forward(out), block_); + } +}; + +} // namespace circ +} // namespace ipc diff --git a/human_render/Ipc/libipc/circ/elem_def.h b/human_render/Ipc/libipc/circ/elem_def.h new file mode 100644 index 0000000..4fd5797 --- /dev/null +++ b/human_render/Ipc/libipc/circ/elem_def.h @@ -0,0 +1,109 @@ +#pragma once + +#include +#include +#include +#include + +#include "libipc/def.h" +#include "libipc/rw_lock.h" + +#include "libipc/platform/detail.h" + +namespace ipc { +namespace circ { + +using u1_t = ipc::uint_t<8>; +using u2_t = ipc::uint_t<32>; + +/** only supports max 32 connections in broadcast mode */ +using cc_t = u2_t; + +constexpr u1_t index_of(u2_t c) noexcept { + return static_cast(c); +} + +class conn_head_base { +protected: + std::atomic cc_{0}; // connections + ipc::spin_lock lc_; + std::atomic constructed_{false}; + +public: + void init() { + /* DCLP */ + if (!constructed_.load(std::memory_order_acquire)) { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_); + if (!constructed_.load(std::memory_order_relaxed)) { + ::new (this) conn_head_base; + constructed_.store(true, std::memory_order_release); + } + } + } + + conn_head_base() = default; + conn_head_base(conn_head_base const &) = delete; + conn_head_base &operator=(conn_head_base const &) = delete; + + cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept { + return this->cc_.load(order); + } +}; + +template ::is_broadcast> +class conn_head; + +template +class conn_head : public conn_head_base { +public: + cc_t connect() noexcept { + for (unsigned k = 0;; ipc::yield(k)) { + cc_t curr = this->cc_.load(std::memory_order_acquire); + cc_t next = curr | (curr + 1); // find the first 0, and set it to 1. + if (next == 0) { + // connection-slot is full. + return 0; + } + if (this->cc_.compare_exchange_weak(curr, next, std::memory_order_release)) { + return next ^ curr; // return connected id + } + } + } + + cc_t disconnect(cc_t cc_id) noexcept { + return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id; + } + + std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { + cc_t cur = this->cc_.load(order); + cc_t cnt; // accumulates the total bits set in cc + for (cnt = 0; cur; ++cnt) cur &= cur - 1; + return cnt; + } +}; + +template +class conn_head : public conn_head_base { +public: + cc_t connect() noexcept { + return this->cc_.fetch_add(1, std::memory_order_relaxed) + 1; + } + + cc_t disconnect(cc_t cc_id) noexcept { + if (cc_id == ~static_cast(0u)) { + // clear all connections + this->cc_.store(0, std::memory_order_relaxed); + return 0u; + } + else { + return this->cc_.fetch_sub(1, std::memory_order_relaxed) - 1; + } + } + + std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { + return this->connections(order); + } +}; + +} // namespace circ +} // namespace ipc diff --git a/human_render/Ipc/libipc/condition.h b/human_render/Ipc/libipc/condition.h new file mode 100644 index 0000000..2191f2d --- /dev/null +++ b/human_render/Ipc/libipc/condition.h @@ -0,0 +1,38 @@ +#pragma once + +#include // std::uint64_t + +#include "libipc/def.h" +#include "libipc/mutex.h" + +namespace ipc { +namespace sync { + +class condition { + condition(condition const &) = delete; + condition &operator=(condition const &) = delete; + +public: + condition(); + explicit condition(char const *name); + ~condition(); + + void const *native() const noexcept; + void *native() noexcept; + + bool valid() const noexcept; + + bool open(char const *name) noexcept; + void close() noexcept; + + bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept; + bool notify(ipc::sync::mutex &mtx) noexcept; + bool broadcast(ipc::sync::mutex &mtx) noexcept; + +private: + class condition_; + condition_* p_; +}; + +} // namespace sync +} // namespace ipc diff --git a/human_render/Ipc/libipc/def.h b/human_render/Ipc/libipc/def.h new file mode 100644 index 0000000..45cd780 --- /dev/null +++ b/human_render/Ipc/libipc/def.h @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include // std::numeric_limits +#include +#include + +namespace ipc { + +// types + +using byte_t = std::uint8_t; + +template +struct uint; + +template <> struct uint<8 > { using type = std::uint8_t ; }; +template <> struct uint<16> { using type = std::uint16_t; }; +template <> struct uint<32> { using type = std::uint32_t; }; +template <> struct uint<64> { using type = std::uint64_t; }; + +template +using uint_t = typename uint::type; + +// constants + +enum : std::uint32_t { + invalid_value = (std::numeric_limits::max)(), + default_timeout = 100, // ms +}; + +enum : std::size_t { + data_length = 64, + large_msg_limit = data_length, + large_msg_align = 1024, + large_msg_cache = 32, +}; + +enum class relat { // multiplicity of the relationship + single, + multi +}; + +enum class trans { // transmission + unicast, + broadcast +}; + +// producer-consumer policy flag + +template +struct wr {}; + +template +struct relat_trait; + +template +struct relat_trait> { + constexpr static bool is_multi_producer = (Rp == relat::multi); + constexpr static bool is_multi_consumer = (Rc == relat::multi); + constexpr static bool is_broadcast = (Ts == trans::broadcast); +}; + +template