129 lines
3.0 KiB
C++
129 lines
3.0 KiB
C++
|
#include "Ipc/Ipc.h"

#include <codecvt>
#include <functional>
#include <locale>
#include <stdexcept>

#include "Ipc/ReadCallback.h"
|
||
|
|
||
|
std::shared_ptr<Ipc> Ipc::create(const char* reader_name, const char* writer_name) {
|
||
|
struct Creator : public Ipc {
|
||
|
Creator(const char* reader_name, const char* writer_name) : Ipc(reader_name, writer_name) {}
|
||
|
~Creator() override = default;
|
||
|
};
|
||
|
return std::make_shared<Creator>(reader_name, writer_name);
|
||
|
}
|
||
|
|
||
|
// Stores both channel names and opens the sending channel.
//
// reader_name: name used for the sender channel (after toUtf8()).
// writer_name: name used later by doReciver() for the receiving channel.
//
// A null name is treated as an empty string; constructing a std::string from a
// null pointer, or passing one to "%s", is undefined behaviour.
//
// NOTE: the constructor is declared noexcept, so any exception thrown while
// building the strings or the channel terminates the process.
Ipc::Ipc(const char* reader_name, const char* writer_name) noexcept
    : reader_name_(reader_name != nullptr ? reader_name : "")
    , writer_name_(writer_name != nullptr ? writer_name : "") {
    sender_ = std::make_unique<ipc::channel>(toUtf8(reader_name_).c_str(), ipc::sender);
    // Log the stored copies rather than the raw pointers so a null argument is safe.
    fprintf(stderr, "%s, %s\n", reader_name_.c_str(), writer_name_.c_str());
}
|
||
|
|
||
|
// Shuts the instance down through stop(), which returns immediately when the
// stop flag is already set.
Ipc::~Ipc() {
    this->stop();
}
|
||
|
|
||
|
bool Ipc::listen() {
|
||
|
if (!reciver_stop_.load(std::memory_order_acquire)) {
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
reciver_stop_.store(false);
|
||
|
|
||
|
std::weak_ptr<Ipc> wThis = shared_from_this();
|
||
|
reciver_thread_ = std::move(std::thread(std::bind(&Ipc::doReciver, this, wThis)));
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
void Ipc::stop() {
|
||
|
if (reciver_stop_.load(std::memory_order_acquire)) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
reciver_stop_.store(true, std::memory_order_release);
|
||
|
|
||
|
sender_->disconnect();
|
||
|
receiver_->disconnect();
|
||
|
|
||
|
if (reciver_thread_.joinable()) {
|
||
|
reciver_thread_.join();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
bool Ipc::send(const char* data, unsigned int size) {
|
||
|
if (!sender_) {
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
return sender_->send(data, size);
|
||
|
}
|
||
|
|
||
|
|
||
|
// Connection query. Currently always reports "not connected": the check that
// consulted the receiver channel is disabled (kept below for reference).
bool Ipc::isConnected() {
    // Disabled implementation:
    //   if (receiver_) {
    //       return receiver_->is_connected();
    //   }
    const bool connected = false;
    return connected;
}
|
||
|
|
||
|
|
||
|
void Ipc::reConnect() {
|
||
|
sender_->reconnect(ipc::sender);
|
||
|
if (receiver_) {
|
||
|
receiver_->disconnect();
|
||
|
receiver_->reconnect(ipc::receiver);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void Ipc::registReadCallback(const std::shared_ptr<IReaderCallback>& reader) {
|
||
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
||
|
auto itor = std::find(reader_callbacks_.begin(), reader_callbacks_.end(), reader);
|
||
|
if (reader_callbacks_.end() != itor) {
|
||
|
return;
|
||
|
}
|
||
|
reader_callbacks_.emplace_back(reader);
|
||
|
}
|
||
|
|
||
|
void Ipc::unregistReadCallback(const std::shared_ptr<IReaderCallback>& reader) {
|
||
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
||
|
auto itor = std::find(reader_callbacks_.begin(), reader_callbacks_.end(), reader);
|
||
|
if (reader_callbacks_.end() != itor) {
|
||
|
return;
|
||
|
}
|
||
|
reader_callbacks_.erase(itor);
|
||
|
}
|
||
|
|
||
|
void Ipc::doReciver(std::weak_ptr<Ipc> wThis) {
|
||
|
std::shared_ptr<Ipc> self(wThis.lock());
|
||
|
if (!self) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
receiver_ = std::make_unique<ipc::channel>(writer_name_.c_str(), ipc::receiver);
|
||
|
while (!reciver_stop_.load(std::memory_order_acquire)) {
|
||
|
ipc::buff_t buffer = receiver_->recv(5);
|
||
|
if (buffer.empty()) {
|
||
|
continue;
|
||
|
}
|
||
|
const char* data = buffer.get<const char*>();
|
||
|
onReciveer(data, static_cast<unsigned int>(buffer.size()));
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void Ipc::onReciveer(const char* buffer, unsigned int size) {
|
||
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
for (auto& read : reader_callbacks_) {
|
||
|
read->onRead(buffer, size);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Round-trips `str` through a wide string using a UTF-8 codecvt facet and
// returns the re-encoded bytes.
//
// from_bytes() throws std::range_error when the input cannot be converted
// (for example, bytes that are not valid UTF-8). The constructor calls this
// function and is noexcept, so letting the exception escape would terminate
// the process; on failure the input is returned unchanged instead.
//
// NOTE: std::wstring_convert and std::codecvt_utf8 are deprecated since C++17.
std::string Ipc::toUtf8(const std::string& str) {
    try {
        std::wstring_convert<std::codecvt_utf8<wchar_t>> converter;
        const std::wstring wide_str = converter.from_bytes(str);
        return converter.to_bytes(wide_str);
    } catch (const std::range_error&) {
        return str;
    }
}
|