prep for toxav multithreading

This commit is contained in:
Green Sky 2024-10-05 11:17:44 +02:00
parent 09c8bbfcc6
commit 3475f0751f
No known key found for this signature in database
4 changed files with 257 additions and 109 deletions

View File

@ -80,6 +80,17 @@ uint32_t ToxAVI::toxavIterationInterval(void) const {
void ToxAVI::toxavIterate(void) { void ToxAVI::toxavIterate(void) {
toxav_iterate(_tox_av); toxav_iterate(_tox_av);
dispatch(
ToxAV_Event::iterate_audio,
Events::IterateAudio{
}
);
dispatch(
ToxAV_Event::iterate_video,
Events::IterateVideo{
}
);
} }
uint32_t ToxAVI::toxavAudioIterationInterval(void) const { uint32_t ToxAVI::toxavAudioIterationInterval(void) const {
@ -88,6 +99,12 @@ uint32_t ToxAVI::toxavAudioIterationInterval(void) const {
void ToxAVI::toxavAudioIterate(void) { void ToxAVI::toxavAudioIterate(void) {
toxav_audio_iterate(_tox_av); toxav_audio_iterate(_tox_av);
dispatch(
ToxAV_Event::iterate_audio,
Events::IterateAudio{
}
);
} }
uint32_t ToxAVI::toxavVideoIterationInterval(void) const { uint32_t ToxAVI::toxavVideoIterationInterval(void) const {
@ -96,6 +113,12 @@ uint32_t ToxAVI::toxavVideoIterationInterval(void) const {
void ToxAVI::toxavVideoIterate(void) { void ToxAVI::toxavVideoIterate(void) {
toxav_video_iterate(_tox_av); toxav_video_iterate(_tox_av);
dispatch(
ToxAV_Event::iterate_video,
Events::IterateVideo{
}
);
} }
Toxav_Err_Call ToxAVI::toxavCall(uint32_t friend_number, uint32_t audio_bit_rate, uint32_t video_bit_rate) { Toxav_Err_Call ToxAVI::toxavCall(uint32_t friend_number, uint32_t audio_bit_rate, uint32_t video_bit_rate) {

View File

@ -55,6 +55,16 @@ namespace /*toxav*/ Events {
int32_t vstride; int32_t vstride;
}; };
// event fired on a/av thread every iterate
struct IterateAudio {
//float time_delta;
};
// event fired on v/av thread every iterate
struct IterateVideo {
//float time_delta;
};
} // Event } // Event
enum class ToxAV_Event : uint32_t { enum class ToxAV_Event : uint32_t {
@ -65,6 +75,9 @@ enum class ToxAV_Event : uint32_t {
friend_audio_frame, friend_audio_frame,
friend_video_frame, friend_video_frame,
iterate_audio,
iterate_video,
MAX MAX
}; };
@ -79,11 +92,15 @@ struct ToxAVEventI {
virtual bool onEvent(const Events::FriendVideoBitrate&) { return false; } virtual bool onEvent(const Events::FriendVideoBitrate&) { return false; }
virtual bool onEvent(const Events::FriendAudioFrame&) { return false; } virtual bool onEvent(const Events::FriendAudioFrame&) { return false; }
virtual bool onEvent(const Events::FriendVideoFrame&) { return false; } virtual bool onEvent(const Events::FriendVideoFrame&) { return false; }
virtual bool onEvent(const Events::IterateAudio&) { return false; }
virtual bool onEvent(const Events::IterateVideo&) { return false; }
}; };
using ToxAVEventProviderI = EventProviderI<ToxAVEventI>; using ToxAVEventProviderI = EventProviderI<ToxAVEventI>;
// TODO: seperate out implementation from interface // TODO: seperate out implementation from interface
struct ToxAVI : public ToxAVEventProviderI { struct ToxAVI : public ToxAVEventProviderI {
// tox and toxav internally are mutex protected
// BUT only if "experimental_thread_safety" is enabled
Tox* _tox = nullptr; Tox* _tox = nullptr;
ToxAV* _tox_av = nullptr; ToxAV* _tox_av = nullptr;

View File

@ -185,7 +185,8 @@ void ToxAVVoIPModel::addAudioSink(ObjectHandle session, uint32_t friend_number)
ObjectHandle outgoing_audio {_os.registry(), _os.registry().create()}; ObjectHandle outgoing_audio {_os.registry(), _os.registry().create()};
auto new_asink = std::make_unique<ToxAVCallAudioSink>(_av, friend_number); auto new_asink = std::make_unique<ToxAVCallAudioSink>(_av, friend_number);
outgoing_audio.emplace<ToxAVCallAudioSink*>(new_asink.get()); auto* new_asink_ptr = new_asink.get();
outgoing_audio.emplace<ToxAVCallAudioSink*>(new_asink_ptr);
outgoing_audio.emplace<Components::FrameStream2Sink<AudioFrame2>>(std::move(new_asink)); outgoing_audio.emplace<Components::FrameStream2Sink<AudioFrame2>>(std::move(new_asink));
outgoing_audio.emplace<Components::StreamSink>(Components::StreamSink::create<AudioFrame2>("ToxAV Friend Call Outgoing Audio")); outgoing_audio.emplace<Components::StreamSink>(Components::StreamSink::create<AudioFrame2>("ToxAV Friend Call Outgoing Audio"));
@ -201,6 +202,9 @@ void ToxAVVoIPModel::addAudioSink(ObjectHandle session, uint32_t friend_number)
// TODO: tie session to stream // TODO: tie session to stream
_os.throwEventConstruct(outgoing_audio); _os.throwEventConstruct(outgoing_audio);
std::lock_guard lg{_audio_sinks_mutex};
_audio_sinks.push_back(new_asink_ptr);
} }
void ToxAVVoIPModel::addVideoSource(ObjectHandle session, uint32_t friend_number) { void ToxAVVoIPModel::addVideoSource(ObjectHandle session, uint32_t friend_number) {
@ -234,7 +238,8 @@ void ToxAVVoIPModel::addVideoSink(ObjectHandle session, uint32_t friend_number)
ObjectHandle outgoing_video {_os.registry(), _os.registry().create()}; ObjectHandle outgoing_video {_os.registry(), _os.registry().create()};
auto new_vsink = std::make_unique<ToxAVCallVideoSink>(_av, friend_number); auto new_vsink = std::make_unique<ToxAVCallVideoSink>(_av, friend_number);
outgoing_video.emplace<ToxAVCallVideoSink*>(new_vsink.get()); auto* new_vsink_ptr = new_vsink.get();
outgoing_video.emplace<ToxAVCallVideoSink*>(new_vsink_ptr);
outgoing_video.emplace<Components::FrameStream2Sink<SDLVideoFrame>>(std::move(new_vsink)); outgoing_video.emplace<Components::FrameStream2Sink<SDLVideoFrame>>(std::move(new_vsink));
outgoing_video.emplace<Components::StreamSink>(Components::StreamSink::create<SDLVideoFrame>("ToxAV Friend Call Outgoing Video")); outgoing_video.emplace<Components::StreamSink>(Components::StreamSink::create<SDLVideoFrame>("ToxAV Friend Call Outgoing Video"));
@ -250,6 +255,9 @@ void ToxAVVoIPModel::addVideoSink(ObjectHandle session, uint32_t friend_number)
// TODO: tie session to stream // TODO: tie session to stream
_os.throwEventConstruct(outgoing_video); _os.throwEventConstruct(outgoing_video);
std::lock_guard lg{_video_sinks_mutex};
_video_sinks.push_back(new_vsink_ptr);
} }
void ToxAVVoIPModel::destroySession(ObjectHandle session) { void ToxAVVoIPModel::destroySession(ObjectHandle session) {
@ -282,6 +290,20 @@ void ToxAVVoIPModel::destroySession(ObjectHandle session) {
_video_sources.erase(it_vsrc); _video_sources.erase(it_vsrc);
} }
} }
if (session.all_of<ToxAVCallAudioSink*>()) {
std::lock_guard lg{_audio_sinks_mutex};
auto it = std::find(_audio_sinks.cbegin(), _audio_sinks.cend(), session.get<ToxAVCallAudioSink*>());
if (it != _audio_sinks.cend()) {
_audio_sinks.erase(it);
}
}
if (session.all_of<ToxAVCallVideoSink*>()) {
std::lock_guard lg{_video_sinks_mutex};
auto it = std::find(_video_sinks.cbegin(), _video_sinks.cend(), session.get<ToxAVCallVideoSink*>());
if (it != _video_sinks.cend()) {
_video_sinks.erase(it);
}
}
// destory sources // destory sources
if (auto* ss = session.try_get<Components::VoIP::StreamSources>(); ss != nullptr) { if (auto* ss = session.try_get<Components::VoIP::StreamSources>(); ss != nullptr) {
@ -306,34 +328,10 @@ void ToxAVVoIPModel::destroySession(ObjectHandle session) {
_os.registry().destroy(session); _os.registry().destroy(session);
} }
ToxAVVoIPModel::ToxAVVoIPModel(ObjectStore2& os, ToxAVI& av, Contact3Registry& cr, ToxContactModel2& tcm) : void ToxAVVoIPModel::audio_thread_tick(void) {
_os(os), _av(av), _cr(cr), _tcm(tcm) //for (const auto& [oc, asink] : _os.registry().view<ToxAVCallAudioSink*>().each()) {
{ std::lock_guard lg{_audio_sinks_mutex};
_av.subscribe(this, ToxAV_Event::friend_call); for (const auto& asink : _audio_sinks) {
_av.subscribe(this, ToxAV_Event::friend_call_state);
_av.subscribe(this, ToxAV_Event::friend_audio_bitrate);
_av.subscribe(this, ToxAV_Event::friend_video_bitrate);
_av.subscribe(this, ToxAV_Event::friend_audio_frame);
_av.subscribe(this, ToxAV_Event::friend_video_frame);
// attach to all tox friend contacts
for (const auto& [cv, _] : _cr.view<Contact::Components::ToxFriendPersistent>().each()) {
_cr.emplace<VoIPModelI*>(cv, this);
}
// TODO: events
}
ToxAVVoIPModel::~ToxAVVoIPModel(void) {
for (const auto& [ov, voipmodel] : _os.registry().view<VoIPModelI*>().each()) {
if (voipmodel == this) {
destroySession(_os.objectHandle(ov));
}
}
}
void ToxAVVoIPModel::tick(void) {
for (const auto& [oc, asink] : _os.registry().view<ToxAVCallAudioSink*>().each()) {
if (!asink->_writer) { if (!asink->_writer) {
continue; continue;
} }
@ -366,8 +364,12 @@ void ToxAVVoIPModel::tick(void) {
} }
} }
} }
}
for (const auto& [oc, vsink] : _os.registry().view<ToxAVCallVideoSink*>().each()) { void ToxAVVoIPModel::video_thread_tick(void) {
//for (const auto& [oc, vsink] : _os.registry().view<ToxAVCallVideoSink*>().each()) {
std::lock_guard lg{_video_sinks_mutex};
for (const auto& vsink : _video_sinks) {
if (!vsink->_writer) { if (!vsink->_writer) {
continue; continue;
} }
@ -401,6 +403,134 @@ void ToxAVVoIPModel::tick(void) {
} }
} }
void ToxAVVoIPModel::handleEvent(const Events::FriendCall& e) {
// new incoming call, create voip session, ready to be accepted
// (or rejected...)
const auto session_contact = _tcm.getContactFriend(e.friend_number);
if (!_cr.valid(session_contact)) {
return;
}
ObjectHandle new_session {_os.registry(), _os.registry().create()};
new_session.emplace<VoIPModelI*>(this);
new_session.emplace<Components::VoIP::TagVoIPSession>(); // ??
new_session.emplace<Components::VoIP::Incoming>(session_contact); // in 1on1 its always the same contact, might leave blank
new_session.emplace<Components::VoIP::SessionContact>(session_contact);
new_session.emplace<Components::VoIP::SessionState>().state = Components::VoIP::SessionState::State::RINGING;
new_session.emplace<Components::ToxAVIncomingAV>(e.audio_enabled, e.video_enabled);
_os.throwEventConstruct(new_session);
}
void ToxAVVoIPModel::handleEvent(const Events::FriendCallState& e) {
const auto session_contact = _tcm.getContactFriend(e.friend_number);
if (!_cr.valid(session_contact)) {
return;
}
ToxAVFriendCallState s{e.state};
// find session(s?)
// TODO: keep lookup table
for (const auto& [ov, voipmodel] : _os.registry().view<VoIPModelI*>().each()) {
if (voipmodel == this) {
auto o = _os.objectHandle(ov);
if (!o.all_of<Components::VoIP::SessionContact>()) {
continue;
}
if (session_contact != o.get<Components::VoIP::SessionContact>().c) {
continue;
}
if (s.is_error() || s.is_finished()) {
// destroy call
destroySession(o);
} else {
// remote accepted our call, or av send/recv conditions changed?
o.get<Components::VoIP::SessionState>().state = Components::VoIP::SessionState::State::CONNECTED; // set to in call ??
if (s.is_accepting_a() && !o.all_of<Components::ToxAVAudioSink>()) {
addAudioSink(o, e.friend_number);
} else if (!s.is_accepting_a() && o.all_of<Components::ToxAVAudioSink>()) {
// remove asink?
}
// video
if (s.is_accepting_v() && !o.all_of<Components::ToxAVVideoSink>()) {
addVideoSink(o, e.friend_number);
} else if (!s.is_accepting_v() && o.all_of<Components::ToxAVVideoSink>()) {
// remove vsink?
}
// add/update sources
// audio
if (s.is_sending_a() && !o.all_of<Components::ToxAVAudioSource>()) {
addAudioSource(o, e.friend_number);
} else if (!s.is_sending_a() && o.all_of<Components::ToxAVAudioSource>()) {
// remove asrc?
}
// video
if (s.is_sending_v() && !o.all_of<Components::ToxAVVideoSource>()) {
addVideoSource(o, e.friend_number);
} else if (!s.is_sending_v() && o.all_of<Components::ToxAVVideoSource>()) {
// remove vsrc?
}
}
}
}
}
ToxAVVoIPModel::ToxAVVoIPModel(ObjectStore2& os, ToxAVI& av, Contact3Registry& cr, ToxContactModel2& tcm) :
_os(os), _av(av), _cr(cr), _tcm(tcm)
{
_av.subscribe(this, ToxAV_Event::friend_call);
_av.subscribe(this, ToxAV_Event::friend_call_state);
_av.subscribe(this, ToxAV_Event::friend_audio_bitrate);
_av.subscribe(this, ToxAV_Event::friend_video_bitrate);
_av.subscribe(this, ToxAV_Event::friend_audio_frame);
_av.subscribe(this, ToxAV_Event::friend_video_frame);
_av.subscribe(this, ToxAV_Event::iterate_audio);
_av.subscribe(this, ToxAV_Event::iterate_video);
// attach to all tox friend contacts
for (const auto& [cv, _] : _cr.view<Contact::Components::ToxFriendPersistent>().each()) {
_cr.emplace<VoIPModelI*>(cv, this);
}
// TODO: events
}
ToxAVVoIPModel::~ToxAVVoIPModel(void) {
for (const auto& [ov, voipmodel] : _os.registry().view<VoIPModelI*>().each()) {
if (voipmodel == this) {
destroySession(_os.objectHandle(ov));
}
}
}
void ToxAVVoIPModel::tick(void) {
std::lock_guard lg{_e_queue_mutex};
while (!_e_queue.empty()) {
const auto& e_var = _e_queue.front();
if (std::holds_alternative<Events::FriendCall>(e_var)) {
const auto& e = std::get<Events::FriendCall>(e_var);
handleEvent(e);
} else if (std::holds_alternative<Events::FriendCallState>(e_var)) {
const auto& e = std::get<Events::FriendCallState>(e_var);
handleEvent(e);
} else {
assert(false && "unk event");
}
_e_queue.pop_front();
}
}
ObjectHandle ToxAVVoIPModel::enter(const Contact3 c, const Components::VoIP::DefaultConfig& defaults) { ObjectHandle ToxAVVoIPModel::enter(const Contact3 c, const Components::VoIP::DefaultConfig& defaults) {
if (!_cr.all_of<Contact::Components::ToxFriendEphemeral>(c)) { if (!_cr.all_of<Contact::Components::ToxFriendEphemeral>(c)) {
return {}; return {};
@ -529,94 +659,24 @@ bool ToxAVVoIPModel::leave(ObjectHandle session) {
} }
bool ToxAVVoIPModel::onEvent(const Events::FriendCall& e) { bool ToxAVVoIPModel::onEvent(const Events::FriendCall& e) {
// new incoming call, create voip session, ready to be accepted std::lock_guard lg{_e_queue_mutex};
// (or rejected...) _e_queue.push_back(e);
return true; // false?
const auto session_contact = _tcm.getContactFriend(e.friend_number);
if (!_cr.valid(session_contact)) {
return false;
}
ObjectHandle new_session {_os.registry(), _os.registry().create()};
new_session.emplace<VoIPModelI*>(this);
new_session.emplace<Components::VoIP::TagVoIPSession>(); // ??
new_session.emplace<Components::VoIP::Incoming>(session_contact); // in 1on1 its always the same contact, might leave blank
new_session.emplace<Components::VoIP::SessionContact>(session_contact);
new_session.emplace<Components::VoIP::SessionState>().state = Components::VoIP::SessionState::State::RINGING;
new_session.emplace<Components::ToxAVIncomingAV>(e.audio_enabled, e.video_enabled);
_os.throwEventConstruct(new_session);
return true;
} }
bool ToxAVVoIPModel::onEvent(const Events::FriendCallState& e) { bool ToxAVVoIPModel::onEvent(const Events::FriendCallState& e) {
const auto session_contact = _tcm.getContactFriend(e.friend_number); std::lock_guard lg{_e_queue_mutex};
if (!_cr.valid(session_contact)) { _e_queue.push_back(e);
return false; return true; // false?
}
ToxAVFriendCallState s{e.state};
// find session(s?)
// TODO: keep lookup table
for (const auto& [ov, voipmodel] : _os.registry().view<VoIPModelI*>().each()) {
if (voipmodel == this) {
auto o = _os.objectHandle(ov);
if (!o.all_of<Components::VoIP::SessionContact>()) {
continue;
}
if (session_contact != o.get<Components::VoIP::SessionContact>().c) {
continue;
}
if (s.is_error() || s.is_finished()) {
// destroy call
destroySession(o);
} else {
// remote accepted our call, or av send/recv conditions changed?
o.get<Components::VoIP::SessionState>().state = Components::VoIP::SessionState::State::CONNECTED; // set to in call ??
if (s.is_accepting_a() && !o.all_of<Components::ToxAVAudioSink>()) {
addAudioSink(o, e.friend_number);
} else if (!s.is_accepting_a() && o.all_of<Components::ToxAVAudioSink>()) {
// remove asink?
}
// video
if (s.is_accepting_v() && !o.all_of<Components::ToxAVVideoSink>()) {
addVideoSink(o, e.friend_number);
} else if (!s.is_accepting_v() && o.all_of<Components::ToxAVVideoSink>()) {
// remove vsink?
}
// add/update sources
// audio
if (s.is_sending_a() && !o.all_of<Components::ToxAVAudioSource>()) {
addAudioSource(o, e.friend_number);
} else if (!s.is_sending_a() && o.all_of<Components::ToxAVAudioSource>()) {
// remove asrc?
}
// video
if (s.is_sending_v() && !o.all_of<Components::ToxAVVideoSource>()) {
addVideoSource(o, e.friend_number);
} else if (!s.is_sending_v() && o.all_of<Components::ToxAVVideoSource>()) {
// remove vsrc?
}
}
}
}
return true;
} }
bool ToxAVVoIPModel::onEvent(const Events::FriendAudioBitrate&) { bool ToxAVVoIPModel::onEvent(const Events::FriendAudioBitrate&) {
// TODO: use this info
return false; return false;
} }
bool ToxAVVoIPModel::onEvent(const Events::FriendVideoBitrate&) { bool ToxAVVoIPModel::onEvent(const Events::FriendVideoBitrate&) {
// TODO: use this info
return false; return false;
} }
@ -708,6 +768,16 @@ bool ToxAVVoIPModel::onEvent(const Events::FriendVideoFrame& e) {
}); });
SDL_DestroySurface(new_surf); SDL_DestroySurface(new_surf);
return true;
}
bool ToxAVVoIPModel::onEvent(const Events::IterateAudio&) {
audio_thread_tick();
return false;
}
bool ToxAVVoIPModel::onEvent(const Events::IterateVideo&) {
video_thread_tick();
return false; return false;
} }

View File

@ -7,6 +7,13 @@
#include "./tox_av.hpp" #include "./tox_av.hpp"
#include <unordered_map> #include <unordered_map>
#include <variant>
#include <deque>
#include <mutex>
// fwd
struct ToxAVCallAudioSink;
struct ToxAVCallVideoSink;
class ToxAVVoIPModel : protected ToxAVEventI, public VoIPModelI { class ToxAVVoIPModel : protected ToxAVEventI, public VoIPModelI {
ObjectStore2& _os; ObjectStore2& _os;
@ -14,6 +21,26 @@ class ToxAVVoIPModel : protected ToxAVEventI, public VoIPModelI {
Contact3Registry& _cr; Contact3Registry& _cr;
ToxContactModel2& _tcm; ToxContactModel2& _tcm;
uint64_t _pad0;
// these events need to be worked on the main thread instead
// TODO: replac ewith lockless queue
std::deque<
std::variant<
Events::FriendCall,
Events::FriendCallState
// bitrates
>> _e_queue;
std::mutex _e_queue_mutex;
uint64_t _pad1;
std::vector<ToxAVCallAudioSink*> _audio_sinks;
std::mutex _audio_sinks_mutex;
uint64_t _pad2;
std::vector<ToxAVCallVideoSink*> _video_sinks;
std::mutex _video_sinks_mutex;
uint64_t _pad3;
// for faster lookup // for faster lookup
std::unordered_map<uint32_t, ObjectHandle> _audio_sources; std::unordered_map<uint32_t, ObjectHandle> _audio_sources;
std::unordered_map<uint32_t, ObjectHandle> _video_sources; std::unordered_map<uint32_t, ObjectHandle> _video_sources;
@ -26,10 +53,19 @@ class ToxAVVoIPModel : protected ToxAVEventI, public VoIPModelI {
void destroySession(ObjectHandle session); void destroySession(ObjectHandle session);
// TODO: this needs to move to the toxav thread
// we could use "events" as pre/post audio/video iterate...
void audio_thread_tick(void);
void video_thread_tick(void);
void handleEvent(const Events::FriendCall&);
void handleEvent(const Events::FriendCallState&);
public: public:
ToxAVVoIPModel(ObjectStore2& os, ToxAVI& av, Contact3Registry& cr, ToxContactModel2& tcm); ToxAVVoIPModel(ObjectStore2& os, ToxAVI& av, Contact3Registry& cr, ToxContactModel2& tcm);
~ToxAVVoIPModel(void); ~ToxAVVoIPModel(void);
// handle events coming from toxav thread(s)
void tick(void); void tick(void);
public: // voip model public: // voip model
@ -44,5 +80,7 @@ class ToxAVVoIPModel : protected ToxAVEventI, public VoIPModelI {
bool onEvent(const Events::FriendVideoBitrate&) override; bool onEvent(const Events::FriendVideoBitrate&) override;
bool onEvent(const Events::FriendAudioFrame&) override; bool onEvent(const Events::FriendAudioFrame&) override;
bool onEvent(const Events::FriendVideoFrame&) override; bool onEvent(const Events::FriendVideoFrame&) override;
bool onEvent(const Events::IterateAudio&) override;
bool onEvent(const Events::IterateVideo&) override;
}; };