HumanRender/human_render/Ipc/Ipc.cpp

129 lines
3.0 KiB
C++
Raw Permalink Normal View History

2024-12-22 15:24:02 +00:00
#include "Ipc/Ipc.h"
#include <functional>
#include <locale>
#include <codecvt>
#include "Ipc/ReadCallback.h"
std::shared_ptr<Ipc> Ipc::create(const char* reader_name, const char* writer_name) {
struct Creator : public Ipc {
Creator(const char* reader_name, const char* writer_name) : Ipc(reader_name, writer_name) {}
~Creator() override = default;
};
return std::make_shared<Creator>(reader_name, writer_name);
}
Ipc::Ipc(const char* reader_name, const char* writer_name) noexcept
: reader_name_(reader_name)
, writer_name_(writer_name) {
sender_ = std::make_unique<ipc::channel>(toUtf8(reader_name_).c_str(), ipc::sender);
fprintf(stderr, "%s, %s\n", reader_name, writer_name);
}
Ipc::~Ipc() {
stop();
}
bool Ipc::listen() {
if (!reciver_stop_.load(std::memory_order_acquire)) {
return false;
}
reciver_stop_.store(false);
std::weak_ptr<Ipc> wThis = shared_from_this();
reciver_thread_ = std::move(std::thread(std::bind(&Ipc::doReciver, this, wThis)));
return true;
}
void Ipc::stop() {
if (reciver_stop_.load(std::memory_order_acquire)) {
return;
}
reciver_stop_.store(true, std::memory_order_release);
sender_->disconnect();
receiver_->disconnect();
if (reciver_thread_.joinable()) {
reciver_thread_.join();
}
}
bool Ipc::send(const char* data, unsigned int size) {
if (!sender_) {
return false;
}
return sender_->send(data, size);
}
bool Ipc::isConnected() {
/* if (receiver_) {
return receiver_->is_connected();
}*/
return false;
}
void Ipc::reConnect() {
sender_->reconnect(ipc::sender);
if (receiver_) {
receiver_->disconnect();
receiver_->reconnect(ipc::receiver);
}
}
void Ipc::registReadCallback(const std::shared_ptr<IReaderCallback>& reader) {
std::lock_guard<std::mutex> lock(mutex_);
auto itor = std::find(reader_callbacks_.begin(), reader_callbacks_.end(), reader);
if (reader_callbacks_.end() != itor) {
return;
}
reader_callbacks_.emplace_back(reader);
}
void Ipc::unregistReadCallback(const std::shared_ptr<IReaderCallback>& reader) {
std::lock_guard<std::mutex> lock(mutex_);
auto itor = std::find(reader_callbacks_.begin(), reader_callbacks_.end(), reader);
if (reader_callbacks_.end() != itor) {
return;
}
reader_callbacks_.erase(itor);
}
void Ipc::doReciver(std::weak_ptr<Ipc> wThis) {
std::shared_ptr<Ipc> self(wThis.lock());
if (!self) {
return;
}
receiver_ = std::make_unique<ipc::channel>(writer_name_.c_str(), ipc::receiver);
while (!reciver_stop_.load(std::memory_order_acquire)) {
ipc::buff_t buffer = receiver_->recv(5);
if (buffer.empty()) {
continue;
}
const char* data = buffer.get<const char*>();
onReciveer(data, static_cast<unsigned int>(buffer.size()));
}
}
void Ipc::onReciveer(const char* buffer, unsigned int size) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& read : reader_callbacks_) {
read->onRead(buffer, size);
}
}
std::string Ipc::toUtf8(const std::string& str) {
std::wstring_convert<std::codecvt_utf8<wchar_t>> converter;
std::wstring wide_str = converter.from_bytes(str);
return converter.to_bytes(wide_str);
}