From 3475f0751f645f4cb27ed282e4188b2741f2f08e Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sat, 5 Oct 2024 11:17:44 +0200 Subject: [PATCH] prep for toxav multithreading --- src/tox_av.cpp | 23 +++ src/tox_av.hpp | 17 +++ src/tox_av_voip_model.cpp | 288 +++++++++++++++++++++++--------------- src/tox_av_voip_model.hpp | 38 +++++ 4 files changed, 257 insertions(+), 109 deletions(-) diff --git a/src/tox_av.cpp b/src/tox_av.cpp index 4e6c044b..be29ad44 100644 --- a/src/tox_av.cpp +++ b/src/tox_av.cpp @@ -80,6 +80,17 @@ uint32_t ToxAVI::toxavIterationInterval(void) const { void ToxAVI::toxavIterate(void) { toxav_iterate(_tox_av); + + dispatch( + ToxAV_Event::iterate_audio, + Events::IterateAudio{ + } + ); + dispatch( + ToxAV_Event::iterate_video, + Events::IterateVideo{ + } + ); } uint32_t ToxAVI::toxavAudioIterationInterval(void) const { @@ -88,6 +99,12 @@ uint32_t ToxAVI::toxavAudioIterationInterval(void) const { void ToxAVI::toxavAudioIterate(void) { toxav_audio_iterate(_tox_av); + + dispatch( + ToxAV_Event::iterate_audio, + Events::IterateAudio{ + } + ); } uint32_t ToxAVI::toxavVideoIterationInterval(void) const { @@ -96,6 +113,12 @@ uint32_t ToxAVI::toxavVideoIterationInterval(void) const { void ToxAVI::toxavVideoIterate(void) { toxav_video_iterate(_tox_av); + + dispatch( + ToxAV_Event::iterate_video, + Events::IterateVideo{ + } + ); } Toxav_Err_Call ToxAVI::toxavCall(uint32_t friend_number, uint32_t audio_bit_rate, uint32_t video_bit_rate) { diff --git a/src/tox_av.hpp b/src/tox_av.hpp index 7771118d..34746a92 100644 --- a/src/tox_av.hpp +++ b/src/tox_av.hpp @@ -55,6 +55,16 @@ namespace /*toxav*/ Events { int32_t vstride; }; + // event fired on a/av thread every iterate + struct IterateAudio { + //float time_delta; + }; + + // event fired on v/av thread every iterate + struct IterateVideo { + //float time_delta; + }; + } // Event enum class ToxAV_Event : uint32_t { @@ -65,6 +75,9 @@ enum class ToxAV_Event : uint32_t { friend_audio_frame, friend_video_frame, + iterate_audio, + iterate_video, + MAX }; @@ -79,11 +92,15 @@ struct ToxAVEventI { virtual bool onEvent(const Events::FriendVideoBitrate&) { return false; } virtual bool onEvent(const Events::FriendAudioFrame&) { return false; } virtual bool onEvent(const Events::FriendVideoFrame&) { return false; } + virtual bool onEvent(const Events::IterateAudio&) { return false; } + virtual bool onEvent(const Events::IterateVideo&) { return false; } }; using ToxAVEventProviderI = EventProviderI; // TODO: seperate out implementation from interface struct ToxAVI : public ToxAVEventProviderI { + // tox and toxav internally are mutex protected + // BUT only if "experimental_thread_safety" is enabled Tox* _tox = nullptr; ToxAV* _tox_av = nullptr; diff --git a/src/tox_av_voip_model.cpp b/src/tox_av_voip_model.cpp index fea097f3..e1f74dfe 100644 --- a/src/tox_av_voip_model.cpp +++ b/src/tox_av_voip_model.cpp @@ -185,7 +185,8 @@ void ToxAVVoIPModel::addAudioSink(ObjectHandle session, uint32_t friend_number) ObjectHandle outgoing_audio {_os.registry(), _os.registry().create()}; auto new_asink = std::make_unique(_av, friend_number); - outgoing_audio.emplace(new_asink.get()); + auto* new_asink_ptr = new_asink.get(); + outgoing_audio.emplace(new_asink_ptr); outgoing_audio.emplace>(std::move(new_asink)); outgoing_audio.emplace(Components::StreamSink::create("ToxAV Friend Call Outgoing Audio")); @@ -201,6 +202,9 @@ void ToxAVVoIPModel::addAudioSink(ObjectHandle session, uint32_t friend_number) // TODO: tie session to stream _os.throwEventConstruct(outgoing_audio); + + std::lock_guard lg{_audio_sinks_mutex}; + _audio_sinks.push_back(new_asink_ptr); } void ToxAVVoIPModel::addVideoSource(ObjectHandle session, uint32_t friend_number) { @@ -234,7 +238,8 @@ void ToxAVVoIPModel::addVideoSink(ObjectHandle session, uint32_t friend_number) ObjectHandle outgoing_video {_os.registry(), _os.registry().create()}; auto new_vsink = std::make_unique(_av, friend_number); - outgoing_video.emplace(new_vsink.get()); + auto* new_vsink_ptr = new_vsink.get(); + outgoing_video.emplace(new_vsink_ptr); outgoing_video.emplace>(std::move(new_vsink)); outgoing_video.emplace(Components::StreamSink::create("ToxAV Friend Call Outgoing Video")); @@ -250,6 +255,9 @@ void ToxAVVoIPModel::addVideoSink(ObjectHandle session, uint32_t friend_number) // TODO: tie session to stream _os.throwEventConstruct(outgoing_video); + + std::lock_guard lg{_video_sinks_mutex}; + _video_sinks.push_back(new_vsink_ptr); } void ToxAVVoIPModel::destroySession(ObjectHandle session) { @@ -282,6 +290,20 @@ void ToxAVVoIPModel::destroySession(ObjectHandle session) { _video_sources.erase(it_vsrc); } } + if (session.all_of()) { + std::lock_guard lg{_audio_sinks_mutex}; + auto it = std::find(_audio_sinks.cbegin(), _audio_sinks.cend(), session.get()); + if (it != _audio_sinks.cend()) { + _audio_sinks.erase(it); + } + } + if (session.all_of()) { + std::lock_guard lg{_video_sinks_mutex}; + auto it = std::find(_video_sinks.cbegin(), _video_sinks.cend(), session.get()); + if (it != _video_sinks.cend()) { + _video_sinks.erase(it); + } + } // destory sources if (auto* ss = session.try_get(); ss != nullptr) { @@ -306,34 +328,10 @@ void ToxAVVoIPModel::destroySession(ObjectHandle session) { _os.registry().destroy(session); } -ToxAVVoIPModel::ToxAVVoIPModel(ObjectStore2& os, ToxAVI& av, Contact3Registry& cr, ToxContactModel2& tcm) : - _os(os), _av(av), _cr(cr), _tcm(tcm) -{ - _av.subscribe(this, ToxAV_Event::friend_call); - _av.subscribe(this, ToxAV_Event::friend_call_state); - _av.subscribe(this, ToxAV_Event::friend_audio_bitrate); - _av.subscribe(this, ToxAV_Event::friend_video_bitrate); - _av.subscribe(this, ToxAV_Event::friend_audio_frame); - _av.subscribe(this, ToxAV_Event::friend_video_frame); - - // attach to all tox friend contacts - - for (const auto& [cv, _] : _cr.view().each()) { - _cr.emplace(cv, this); - } - // TODO: events -} - -ToxAVVoIPModel::~ToxAVVoIPModel(void) { - for (const auto& [ov, voipmodel] : _os.registry().view().each()) { - if (voipmodel == this) { - destroySession(_os.objectHandle(ov)); - } - } -} - -void ToxAVVoIPModel::tick(void) { - for (const auto& [oc, asink] : _os.registry().view().each()) { +void ToxAVVoIPModel::audio_thread_tick(void) { + //for (const auto& [oc, asink] : _os.registry().view().each()) { + std::lock_guard lg{_audio_sinks_mutex}; + for (const auto& asink : _audio_sinks) { if (!asink->_writer) { continue; } @@ -366,8 +364,12 @@ void ToxAVVoIPModel::tick(void) { } } } +} - for (const auto& [oc, vsink] : _os.registry().view().each()) { +void ToxAVVoIPModel::video_thread_tick(void) { + //for (const auto& [oc, vsink] : _os.registry().view().each()) { + std::lock_guard lg{_video_sinks_mutex}; + for (const auto& vsink : _video_sinks) { if (!vsink->_writer) { continue; } @@ -401,6 +403,134 @@ void ToxAVVoIPModel::tick(void) { } } +void ToxAVVoIPModel::handleEvent(const Events::FriendCall& e) { + // new incoming call, create voip session, ready to be accepted + // (or rejected...) + + const auto session_contact = _tcm.getContactFriend(e.friend_number); + if (!_cr.valid(session_contact)) { + return; + } + + ObjectHandle new_session {_os.registry(), _os.registry().create()}; + + new_session.emplace(this); + new_session.emplace(); // ?? + new_session.emplace(session_contact); // in 1on1 its always the same contact, might leave blank + new_session.emplace(session_contact); + new_session.emplace().state = Components::VoIP::SessionState::State::RINGING; + new_session.emplace(e.audio_enabled, e.video_enabled); + + _os.throwEventConstruct(new_session); +} + +void ToxAVVoIPModel::handleEvent(const Events::FriendCallState& e) { + const auto session_contact = _tcm.getContactFriend(e.friend_number); + if (!_cr.valid(session_contact)) { + return; + } + + ToxAVFriendCallState s{e.state}; + + // find session(s?) + // TODO: keep lookup table + for (const auto& [ov, voipmodel] : _os.registry().view().each()) { + if (voipmodel == this) { + auto o = _os.objectHandle(ov); + + if (!o.all_of()) { + continue; + } + if (session_contact != o.get().c) { + continue; + } + + if (s.is_error() || s.is_finished()) { + // destroy call + destroySession(o); + } else { + // remote accepted our call, or av send/recv conditions changed? + o.get().state = Components::VoIP::SessionState::State::CONNECTED; // set to in call ?? + + if (s.is_accepting_a() && !o.all_of()) { + addAudioSink(o, e.friend_number); + } else if (!s.is_accepting_a() && o.all_of()) { + // remove asink? + } + + // video + if (s.is_accepting_v() && !o.all_of()) { + addVideoSink(o, e.friend_number); + } else if (!s.is_accepting_v() && o.all_of()) { + // remove vsink? + } + + // add/update sources + // audio + if (s.is_sending_a() && !o.all_of()) { + addAudioSource(o, e.friend_number); + } else if (!s.is_sending_a() && o.all_of()) { + // remove asrc? + } + + // video + if (s.is_sending_v() && !o.all_of()) { + addVideoSource(o, e.friend_number); + } else if (!s.is_sending_v() && o.all_of()) { + // remove vsrc? + } + } + } + } +} + +ToxAVVoIPModel::ToxAVVoIPModel(ObjectStore2& os, ToxAVI& av, Contact3Registry& cr, ToxContactModel2& tcm) : + _os(os), _av(av), _cr(cr), _tcm(tcm) +{ + _av.subscribe(this, ToxAV_Event::friend_call); + _av.subscribe(this, ToxAV_Event::friend_call_state); + _av.subscribe(this, ToxAV_Event::friend_audio_bitrate); + _av.subscribe(this, ToxAV_Event::friend_video_bitrate); + _av.subscribe(this, ToxAV_Event::friend_audio_frame); + _av.subscribe(this, ToxAV_Event::friend_video_frame); + _av.subscribe(this, ToxAV_Event::iterate_audio); + _av.subscribe(this, ToxAV_Event::iterate_video); + + // attach to all tox friend contacts + + for (const auto& [cv, _] : _cr.view().each()) { + _cr.emplace(cv, this); + } + // TODO: events +} + +ToxAVVoIPModel::~ToxAVVoIPModel(void) { + for (const auto& [ov, voipmodel] : _os.registry().view().each()) { + if (voipmodel == this) { + destroySession(_os.objectHandle(ov)); + } + } +} + +void ToxAVVoIPModel::tick(void) { + std::lock_guard lg{_e_queue_mutex}; + while (!_e_queue.empty()) { + const auto& e_var = _e_queue.front(); + + if (std::holds_alternative(e_var)) { + const auto& e = std::get(e_var); + handleEvent(e); + } else if (std::holds_alternative(e_var)) { + const auto& e = std::get(e_var); + handleEvent(e); + } else { + assert(false && "unk event"); + } + + _e_queue.pop_front(); + } +} + ObjectHandle ToxAVVoIPModel::enter(const Contact3 c, const Components::VoIP::DefaultConfig& defaults) { if (!_cr.all_of(c)) { return {}; @@ -529,94 +659,24 @@ bool ToxAVVoIPModel::leave(ObjectHandle session) { } bool ToxAVVoIPModel::onEvent(const Events::FriendCall& e) { - // new incoming call, create voip session, ready to be accepted - // (or rejected...) - - const auto session_contact = _tcm.getContactFriend(e.friend_number); - if (!_cr.valid(session_contact)) { - return false; - } - - ObjectHandle new_session {_os.registry(), _os.registry().create()}; - - new_session.emplace(this); - new_session.emplace(); // ?? - new_session.emplace(session_contact); // in 1on1 its always the same contact, might leave blank - new_session.emplace(session_contact); - new_session.emplace().state = Components::VoIP::SessionState::State::RINGING; - new_session.emplace(e.audio_enabled, e.video_enabled); - - _os.throwEventConstruct(new_session); - return true; + std::lock_guard lg{_e_queue_mutex}; + _e_queue.push_back(e); + return true; // false? } bool ToxAVVoIPModel::onEvent(const Events::FriendCallState& e) { - const auto session_contact = _tcm.getContactFriend(e.friend_number); - if (!_cr.valid(session_contact)) { - return false; - } - - ToxAVFriendCallState s{e.state}; - - // find session(s?) - // TODO: keep lookup table - for (const auto& [ov, voipmodel] : _os.registry().view().each()) { - if (voipmodel == this) { - auto o = _os.objectHandle(ov); - - if (!o.all_of()) { - continue; - } - if (session_contact != o.get().c) { - continue; - } - - if (s.is_error() || s.is_finished()) { - // destroy call - destroySession(o); - } else { - // remote accepted our call, or av send/recv conditions changed? - o.get().state = Components::VoIP::SessionState::State::CONNECTED; // set to in call ?? - - if (s.is_accepting_a() && !o.all_of()) { - addAudioSink(o, e.friend_number); - } else if (!s.is_accepting_a() && o.all_of()) { - // remove asink? - } - - // video - if (s.is_accepting_v() && !o.all_of()) { - addVideoSink(o, e.friend_number); - } else if (!s.is_accepting_v() && o.all_of()) { - // remove vsink? - } - - // add/update sources - // audio - if (s.is_sending_a() && !o.all_of()) { - addAudioSource(o, e.friend_number); - } else if (!s.is_sending_a() && o.all_of()) { - // remove asrc? - } - - // video - if (s.is_sending_v() && !o.all_of()) { - addVideoSource(o, e.friend_number); - } else if (!s.is_sending_v() && o.all_of()) { - // remove vsrc? - } - } - } - } - - return true; + std::lock_guard lg{_e_queue_mutex}; + _e_queue.push_back(e); + return true; // false? } bool ToxAVVoIPModel::onEvent(const Events::FriendAudioBitrate&) { + // TODO: use this info return false; } bool ToxAVVoIPModel::onEvent(const Events::FriendVideoBitrate&) { + // TODO: use this info return false; } @@ -708,6 +768,16 @@ bool ToxAVVoIPModel::onEvent(const Events::FriendVideoFrame& e) { }); SDL_DestroySurface(new_surf); + return true; +} + +bool ToxAVVoIPModel::onEvent(const Events::IterateAudio&) { + audio_thread_tick(); + return false; +} + +bool ToxAVVoIPModel::onEvent(const Events::IterateVideo&) { + video_thread_tick(); return false; } diff --git a/src/tox_av_voip_model.hpp b/src/tox_av_voip_model.hpp index 277588a5..fd024a38 100644 --- a/src/tox_av_voip_model.hpp +++ b/src/tox_av_voip_model.hpp @@ -7,6 +7,13 @@ #include "./tox_av.hpp" #include +#include +#include +#include + +// fwd +struct ToxAVCallAudioSink; +struct ToxAVCallVideoSink; class ToxAVVoIPModel : protected ToxAVEventI, public VoIPModelI { ObjectStore2& _os; @@ -14,6 +21,26 @@ class ToxAVVoIPModel : protected ToxAVEventI, public VoIPModelI { Contact3Registry& _cr; ToxContactModel2& _tcm; + uint64_t _pad0; + // these events need to be worked on the main thread instead + // TODO: replac ewith lockless queue + std::deque< + std::variant< + Events::FriendCall, + Events::FriendCallState + // bitrates + >> _e_queue; + std::mutex _e_queue_mutex; + uint64_t _pad1; + + std::vector _audio_sinks; + std::mutex _audio_sinks_mutex; + uint64_t _pad2; + + std::vector _video_sinks; + std::mutex _video_sinks_mutex; + uint64_t _pad3; + // for faster lookup std::unordered_map _audio_sources; std::unordered_map _video_sources; @@ -26,10 +53,19 @@ class ToxAVVoIPModel : protected ToxAVEventI, public VoIPModelI { void destroySession(ObjectHandle session); + // TODO: this needs to move to the toxav thread + // we could use "events" as pre/post audio/video iterate... + void audio_thread_tick(void); + void video_thread_tick(void); + + void handleEvent(const Events::FriendCall&); + void handleEvent(const Events::FriendCallState&); + public: ToxAVVoIPModel(ObjectStore2& os, ToxAVI& av, Contact3Registry& cr, ToxContactModel2& tcm); ~ToxAVVoIPModel(void); + // handle events coming from toxav thread(s) void tick(void); public: // voip model @@ -44,5 +80,7 @@ class ToxAVVoIPModel : protected ToxAVEventI, public VoIPModelI { bool onEvent(const Events::FriendVideoBitrate&) override; bool onEvent(const Events::FriendAudioFrame&) override; bool onEvent(const Events::FriendVideoFrame&) override; + bool onEvent(const Events::IterateAudio&) override; + bool onEvent(const Events::IterateVideo&) override; };