diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 74c0e525..d4a5c375 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -108,6 +108,7 @@ target_sources(tomato PUBLIC ./frame_streams/stream_manager.hpp ./frame_streams/stream_manager.cpp ./frame_streams/locked_frame_stream.hpp + ./frame_streams/multi_source.hpp ./frame_streams/voip_model.hpp diff --git a/src/frame_streams/locked_frame_stream.hpp b/src/frame_streams/locked_frame_stream.hpp index a2a43ebd..5a095694 100644 --- a/src/frame_streams/locked_frame_stream.hpp +++ b/src/frame_streams/locked_frame_stream.hpp @@ -34,6 +34,10 @@ struct LockedFrameStream2 : public FrameStream2I { bool push(const FrameType& value) { std::lock_guard lg{_lock}; + if (_frames.size() > 1024) { + return false; // hard limit + } + _frames.push_back(value); return true; diff --git a/src/frame_streams/multi_source.hpp b/src/frame_streams/multi_source.hpp new file mode 100644 index 00000000..cce966cf --- /dev/null +++ b/src/frame_streams/multi_source.hpp @@ -0,0 +1,62 @@ +#pragma once + +#include "./locked_frame_stream.hpp" + +#include + +// implements a stream that pushes to all sub streams +template> +struct FrameStream2MultiSource : public FrameStream2SourceI, public FrameStream2I { + using sub_stream_type_t = SubStreamType; + + // pointer stability + std::vector> _sub_streams; + std::mutex _sub_stream_lock; // accessing the _sub_streams array needs to be exclusive + // a simple lock here is ok, since this tends to be a rare operation, + // except for the push, which is always on the same thread + + virtual ~FrameStream2MultiSource(void) {} + + // TODO: forward args instead + std::shared_ptr> subscribe(void) override { + std::lock_guard lg{_sub_stream_lock}; + return _sub_streams.emplace_back(std::make_unique()); + } + + bool unsubscribe(const std::shared_ptr>& sub) override { + std::lock_guard lg{_sub_stream_lock}; + for (auto it = _sub_streams.begin(); it != _sub_streams.end(); it++) { + if (*it == sub) { + _sub_streams.erase(it); + return true; + } + } + return false; // ? + } + + // stream interface + + int32_t size(void) override { + // TODO: return something sensible? + return -1; + } + + std::optional pop(void) override { + // nope + assert(false && "this logic is very frame type specific, provide an impl"); + return std::nullopt; + } + + // returns true if there are readers + bool push(const FrameType& value) override { + std::lock_guard lg{_sub_stream_lock}; + bool have_readers{false}; + for (auto& it : _sub_streams) { + [[maybe_unused]] auto _ = it->push(value); + have_readers = true; // even if queue full, we still continue believing in them + // maybe consider push return value? + } + return have_readers; + } +}; + diff --git a/src/tox_av_voip_model.cpp b/src/tox_av_voip_model.cpp index 2007a235..781c2930 100644 --- a/src/tox_av_voip_model.cpp +++ b/src/tox_av_voip_model.cpp @@ -6,6 +6,7 @@ #include "./frame_streams/stream_manager.hpp" #include "./frame_streams/audio_stream2.hpp" #include "./frame_streams/locked_frame_stream.hpp" +#include "./frame_streams/multi_source.hpp" #include @@ -31,7 +32,7 @@ namespace Components { // TODO: make proper adapter struct AudioStreamReFramer { FrameStream2I* _stream {nullptr}; - uint32_t frame_length_ms {10}; + uint32_t frame_length_ms {20}; // dequeue? std::vector buffer; @@ -59,7 +60,7 @@ struct AudioStreamReFramer { } - std::cout << "new incoming frame is " << new_value.getSpan().size/new_value.channels*1000/new_value.sample_rate << "ms\n"; + //std::cout << "new incoming frame is " << new_value.getSpan().size/new_value.channels*1000/new_value.sample_rate << "ms\n"; auto new_span = new_value.getSpan(); @@ -380,9 +381,36 @@ bool ToxAVVoIPModel::onEvent(const Events::FriendCallState& e) { // video // add/update sources - // audio - // video + auto& stream_source = o.get_or_emplace().streams; + // audio + if (s.is_sending_a() && !o.all_of()) { + ObjectHandle incoming_audio {_os.registry(), _os.registry().create()}; + + auto new_asrc = std::make_unique>(); + incoming_audio.emplace*>(new_asrc.get()); + incoming_audio.emplace>(std::move(new_asrc)); + incoming_audio.emplace(Components::StreamSource::create("ToxAV Friend Call Incoming Audio")); + + std::cout << "new incoming audio\n"; + if ( + const auto* defaults = o.try_get(); + defaults != nullptr && defaults->incoming_audio + ) { + incoming_audio.emplace(); // depends on what was specified in enter() + std::cout << "with default\n"; + } + + stream_source.push_back(incoming_audio); + o.emplace(incoming_audio); + // TODO: tie session to stream + + _os.throwEventConstruct(incoming_audio); + } else if (!s.is_sending_a() && o.all_of()) { + // remove asrc? + } + + // video } } } @@ -398,8 +426,56 @@ bool ToxAVVoIPModel::onEvent(const Events::FriendVideoBitrate&) { return false; } -bool ToxAVVoIPModel::onEvent(const Events::FriendAudioFrame&) { - return false; +bool ToxAVVoIPModel::onEvent(const Events::FriendAudioFrame& e) { + //auto& call = _calls[e.friend_number]; + + // get session? + // get asrc (directly instead?) this is pretty hot + const auto session_contact = _tcm.getContactFriend(e.friend_number); + if (!_cr.valid(session_contact)) { + return false; + } + + // jesus this is bad + ObjectHandle asrc; + for (const auto& [ov, voipmodel] : _os.registry().view().each()) { + if (voipmodel != this) { + continue; + } + + auto o = _os.objectHandle(ov); + + if (!o.all_of()) { + continue; + } + if (session_contact != o.get().c) { + continue; + } + + if (!o.all_of()) { + continue; + } + asrc = o.get().o; + break; + } + + if (!static_cast(asrc)) { + // missing src to put frame into ?? + return false; + } + + //assert(call.incoming_asrc.all_of()); + assert(asrc.all_of*>()); + //assert(call.incoming_asrc.all_of>()); + assert(asrc.all_of>()); + + asrc.get*>()->push(AudioFrame2{ + e.sampling_rate, + e.channels, + std::vector(e.pcm.begin(), e.pcm.end()) // copy + }); + + return true; } bool ToxAVVoIPModel::onEvent(const Events::FriendVideoFrame&) {