#include "./tox_ipc_server.hpp"

#include "./zpp_rpc_cldev.hpp"

#include <chrono>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <mutex>
#include <span>
#include <thread>

// Stores references to the Tox interface and the mutex guarding it.
// No IPC resources are created here; see connect().
ToxIPCServer::ToxIPCServer(ToxI& t, std::mutex& t_m) : _t(t), _t_m(t_m) {
}

// Closes the rpc and events shared memory regions and their semaphores.
ToxIPCServer::~ToxIPCServer(void) {
	ipc_mem_close(&_mem_rpc);
	ipc_sem_close(&_sem_rpc_c);
	ipc_sem_close(&_sem_rpc_s);

	ipc_mem_close(&_mem_events);
	ipc_sem_close(&_sem_events_lock);
}

// Creates the shared memory regions and semaphores for the rpc and the
// events channels.
// Returns false (after logging to std::cerr) on the first resource that
// could not be created, true if everything was set up.
bool ToxIPCServer::connect(void) {
	{ // rpc
		ipc_mem_init(&_mem_rpc, "solana_toxI_rpc.shm", RPC_MEM_SIZE); // 2k prob enough for now

		// server creates
		if (ipc_mem_create(&_mem_rpc) != 0) {
			std::cerr << "rpc shared mem already exists or failed\n";
			return false;
		}

		// client write lock (server waits on this)
		ipc_sem_init(&_sem_rpc_c, "solana_toxI_rpc_client.lock");
		if (ipc_sem_create(&_sem_rpc_c, 0) != 0) {
			std::cerr << "failed to create sem_rpc_c\n";
			return false;
		}

		// server write lock (client waits on this)
		ipc_sem_init(&_sem_rpc_s, "solana_toxI_rpc_server.lock");
		if (ipc_sem_create(&_sem_rpc_s, 0) != 0) {
			std::cerr << "failed to create sem_rpc_s\n";
			return false;
		}

		// if no call is in progress, both sems are 0
	}

	{ // events
		ipc_mem_init(&_mem_events, "solana_toxEventsI_events.shm", EVENTS_MEM_SIZE);

		// server creates
		if (ipc_mem_create(&_mem_events) != 0) {
			std::cerr << "events shared mem already exists or failed\n";
			return false;
		}

		// TODO: do this with the lock?
		ipc_mem_access(&_mem_events)[0] = EVENTS_READ; // reset

		// mutex (created with an initial count of 1, i.e. unlocked)
		ipc_sem_init(&_sem_events_lock, "solana_toxEventsI_events.lock");
		if (ipc_sem_create(&_sem_events_lock, 1) != 0) {
			std::cerr << "failed to create sem_events_lock\n";
			return false;
		}
	}

	return true;
}

using namespace std::chrono_literals;

// Server loop: waits for a client call, serves it from/to the rpc shared
// memory while holding the Tox mutex, then signals the client.
// Exits once _quit is set (checked before and after waiting).
void ToxIPCServer::run(void) {
	while (!_quit) {
		// wait for client call
		ipc_sem_decrement(&_sem_rpc_c);
		if (_quit) {
			// woken up by stop(), not by a client
			break;
		}

		// request and response share the same memory region
		zpp::bits::in in{std::span{ipc_mem_access(&_mem_rpc), RPC_MEM_SIZE}};
		zpp::bits::out out{std::span{ipc_mem_access(&_mem_rpc), RPC_MEM_SIZE}};

		ToxI_rpc::server se{in, out, _t}; // ephemeral, should be cheap

		{
			std::lock_guard lg{_t_m};
			se.serve().or_throw();
		}

		// signal client
		ipc_sem_increment(&_sem_rpc_s);
	}
}

// Spawns the server thread running run().
void ToxIPCServer::start(void) {
	_thread = std::thread([this](){
		this->run();
	});
}

// Requests the server thread to quit and waits for it to finish.
// Safe to call when no thread is running (never started or already
// stopped): joining a non-joinable std::thread would throw.
void ToxIPCServer::stop(void) {
	_quit = true;

	if (_thread.joinable()) {
		ipc_sem_increment(&_sem_rpc_c); // hack, wake up server thread
		_thread.join();
	}
}

// Serializes the given events into the temporary buffer for the next pub().
// If the serialized size does not fit into the events memory (minus the
// leading status byte), an error is logged and the buffer is left untouched.
void ToxIPCServer::updateEvents(const Tox_Events* events) {
	if (uint32_t size = tox_events_bytes_size(events); size <= EVENTS_MEM_SIZE-1) {
		_tmp_events_buf.resize(size);
		tox_events_get_bytes(events, _tmp_events_buf.data());
	} else {
		std::cerr << "events data bigger than space (" << size << " > " << EVENTS_MEM_SIZE-1 << ")\n";
	}
}

// Publishes the buffered events into the events shared memory.
// The first byte of the region is a status flag (EVENTS_WRITTEN / EVENTS_READ),
// the serialized events follow at offset 1.
// Waits up to ~30ms for the previous events to be read; after that the old
// events are overwritten.
void ToxIPCServer::pub(void) {
	if (_tmp_events_buf.empty()) {
		return; // no events, no op
	}

	// tracks whether we leave the loop still holding the events mutex
	bool lock_held = false;

	// if no one is reading the events, this loop sleeps ~30ms
	for (size_t i = 0; i < 10; i++) {
		// grab mutex
		ipc_sem_decrement(&_sem_events_lock);

		// look at first byte if it got read
		const uint8_t zbyte = ipc_mem_access(&_mem_events)[0];
		if (zbyte != EVENTS_WRITTEN) {
			// previous events were consumed, keep the mutex and write
			lock_held = true;
			break;
		}

		// if not, release mutex and short sleep
		ipc_sem_increment(&_sem_events_lock);
		std::this_thread::sleep_for(3ms);
	}

	if (!lock_held) {
		// The reader never caught up and the last iteration released the
		// mutex. Re-acquire it so the overwrite below happens under the lock
		// and the final increment does not push the semaphore count above 1.
		ipc_sem_decrement(&_sem_events_lock);
	}

	// after loop we write events (potentially overwriting)

	// memcpy new events, set first byte
	std::memcpy(ipc_mem_access(&_mem_events)+1, _tmp_events_buf.data(), _tmp_events_buf.size());
	ipc_mem_access(&_mem_events)[0] = EVENTS_WRITTEN;

	_tmp_events_buf.clear(); // TODO: just mark read, and keep around

	// release mutex
	ipc_sem_increment(&_sem_events_lock);
}