From b899b8131e1334d86a5207b137196162377c93d6 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Fri, 27 Sep 2024 13:26:18 +0200 Subject: [PATCH] start porting frame streams --- src/CMakeLists.txt | 5 + src/frame_streams/audio_stream2.hpp | 39 +++++ src/frame_streams/frame_stream2.hpp | 47 ++++++ src/frame_streams/stream_manager.cpp | 139 ++++++++++++++++++ src/frame_streams/stream_manager.hpp | 205 +++++++++++++++++++++++++++ src/main.cpp | 2 +- src/main_screen.cpp | 7 + src/main_screen.hpp | 3 + 8 files changed, 446 insertions(+), 1 deletion(-) create mode 100644 src/frame_streams/audio_stream2.hpp create mode 100644 src/frame_streams/frame_stream2.hpp create mode 100644 src/frame_streams/stream_manager.cpp create mode 100644 src/frame_streams/stream_manager.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 882c67aa..993703c6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -102,6 +102,11 @@ target_sources(tomato PUBLIC ./chat_gui4.hpp ./chat_gui4.cpp + + ./frame_streams/frame_stream2.hpp + ./frame_streams/audio_stream2.hpp + ./frame_streams/stream_manager.hpp + ./frame_streams/stream_manager.cpp ) if (TOMATO_TOX_AV) diff --git a/src/frame_streams/audio_stream2.hpp b/src/frame_streams/audio_stream2.hpp new file mode 100644 index 00000000..37e7230b --- /dev/null +++ b/src/frame_streams/audio_stream2.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include "./frame_stream2.hpp" + +#include + +#include +#include +#include + +// raw audio +// channels make samples interleaved, +// planar channels are not supported +// s16 only stopgap audio frame (simplified) +struct AudioFrame2 { + // samples per second + uint32_t sample_rate {48'000}; + + // only >0 is valid + size_t channels {0}; + + std::variant< + std::vector, // S16, platform endianess + Span // non owning variant, for direct consumption + > buffer; + + // helpers + Span getSpan(void) const { + if (std::holds_alternative>(buffer)) { + return Span{std::get>(buffer)}; + } else { + return std::get>(buffer); + } + return {}; + } +}; + +using AudioFrame2Stream2I = FrameStream2I; + diff --git a/src/frame_streams/frame_stream2.hpp b/src/frame_streams/frame_stream2.hpp new file mode 100644 index 00000000..9a274e76 --- /dev/null +++ b/src/frame_streams/frame_stream2.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include +#include +#include +#include + +// Frames often consist of: +// - seq id // incremental sequential id, gaps in ids can be used to detect loss +// - or timestamp +// - data // the frame data +// eg: +//struct ExampleFrame { + //int64_t seq_id {0}; + //std::vector data; +//}; + +template +struct FrameStream2I { + virtual ~FrameStream2I(void) {} + + // get number of available frames + // returns -1 if unknown + [[nodiscard]] virtual int32_t size(void) = 0; + + // get next frame + // data sharing? -> no, data is copied for each fsr, if concurency supported + [[nodiscard]] virtual std::optional pop(void) = 0; + + // returns true if there are readers (or we dont know) + virtual bool push(const FrameType& value) = 0; +}; + +template +struct FrameStream2SourceI { + virtual ~FrameStream2SourceI(void) {} + [[nodiscard]] virtual std::shared_ptr> subscribe(void) = 0; + virtual bool unsubscribe(const std::shared_ptr>& sub) = 0; +}; + +template +struct FrameStream2SinkI { + virtual ~FrameStream2SinkI(void) {} + [[nodiscard]] virtual std::shared_ptr> subscribe(void) = 0; + virtual bool unsubscribe(const std::shared_ptr>& sub) = 0; +}; + diff --git a/src/frame_streams/stream_manager.cpp b/src/frame_streams/stream_manager.cpp new file mode 100644 index 00000000..27bda3ef --- /dev/null +++ b/src/frame_streams/stream_manager.cpp @@ -0,0 +1,139 @@ +#include "./stream_manager.hpp" + +StreamManager::Connection::Connection( + ObjectHandle src_, + ObjectHandle sink_, + std::unique_ptr&& data_, + std::function&& pump_fn_, + std::function&& unsubscribe_fn_, + bool on_main_thread_ +) : + src(src_), + sink(sink_), + data(std::move(data_)), + pump_fn(std::move(pump_fn_)), + unsubscribe_fn(std::move(unsubscribe_fn_)), + on_main_thread(on_main_thread_) +{ + if (!on_main_thread) { + // start thread + pump_thread = std::thread([this](void) { + while (!stop) { + pump_fn(*this); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + finished = true; + }); + } +} + +StreamManager::StreamManager(ObjectStore2& os) : _os(os) {} + +StreamManager::~StreamManager(void) { + // stop all connetions + for (const auto& con : _connections) { + con->stop = true; + if (!con->on_main_thread) { + con->pump_thread.join(); // we skip the finished check and wait + } + con->unsubscribe_fn(*con); + } +} + +bool StreamManager::connect(Object src, Object sink, bool threaded) { + auto h_src = _os.objectHandle(src); + auto h_sink = _os.objectHandle(sink); + if (!static_cast(h_src) || !static_cast(h_sink)) { + // an object does not exist + return false; + } + + // get src and sink comps + if (!h_src.all_of()) { + // src not stream source + return false; + } + + if (!h_sink.all_of()) { + // sink not stream sink + return false; + } + + const auto& ssrc = h_src.get(); + const auto& ssink = h_sink.get(); + + // compare type + if (ssrc.frame_type_name != ssink.frame_type_name) { + return false; + } + + // always fail in debug mode + assert(static_cast(ssrc.connect_fn)); + if (!static_cast(ssrc.connect_fn)) { + return false; + } + + // use connect fn from src + return ssrc.connect_fn(*this, src, sink, threaded); +} + +bool StreamManager::disconnect(Object src, Object sink) { + auto res = std::find_if( + _connections.cbegin(), _connections.cend(), + [&](const auto& a) { return a->src == src && a->sink == sink; } + ); + if (res == _connections.cend()) { + // not found + return false; + } + + // do disconnect + (*res)->stop = true; + + return true; +} + +bool StreamManager::disconnectAll(Object o) { + bool succ {false}; + for (const auto& con : _connections) { + if (con->src == o || con->sink == o) { + con->stop = true; + succ = true; + } + } + + return succ; +} + +// do we need the time delta? +float StreamManager::tick(float) { + // pump all mainthread connections + for (auto it = _connections.begin(); it != _connections.end();) { + auto& con = **it; + + if (!static_cast(con.src) || !static_cast(con.sink)) { + // either side disappeard without disconnectAll + // TODO: warn/error log + con.stop = true; + } + + if (con.on_main_thread) { + con.pump_fn(con); + } + + if (con.stop && (con.finished || con.on_main_thread)) { + if (!con.on_main_thread) { + assert(con.pump_thread.joinable()); + con.pump_thread.join(); + } + con.unsubscribe_fn(con); + it = _connections.erase(it); + } else { + it++; + } + } + + // return min over intervals instead + return 2.f; // TODO: 2sec makes mainthread connections unusable +} + diff --git a/src/frame_streams/stream_manager.hpp b/src/frame_streams/stream_manager.hpp new file mode 100644 index 00000000..ecd116db --- /dev/null +++ b/src/frame_streams/stream_manager.hpp @@ -0,0 +1,205 @@ +#pragma once + +#include +#include + +#include + +#include "./frame_stream2.hpp" + +#include +#include +#include +#include +#include +#include + +// fwd +class StreamManager; + +namespace Components { + struct StreamSource { + std::string name; + std::string frame_type_name; + + std::function connect_fn; + + template + static StreamSource create(const std::string& name); + }; + + struct StreamSink { + std::string name; + std::string frame_type_name; + + template + static StreamSink create(const std::string& name); + }; + + template + using FrameStream2Source = std::unique_ptr>; + + template + using FrameStream2Sink = std::unique_ptr>; + +} // Components + + +class StreamManager { + friend class StreamManagerUI; // TODO: make this go away + ObjectStore2& _os; + + struct Connection { + ObjectHandle src; + ObjectHandle sink; + + struct Data { + virtual ~Data(void) {} + }; + std::unique_ptr data; // stores reader writer type erased + std::function pump_fn; // TODO: make it return next interval? + std::function unsubscribe_fn; + + bool on_main_thread {true}; + std::atomic_bool stop {false}; // disconnect + std::atomic_bool finished {false}; // disconnect + + // pump thread + std::thread pump_thread; + + // frame interval counters and estimates + // TODO + + Connection(void) = default; + Connection( + ObjectHandle src_, + ObjectHandle sink_, + std::unique_ptr&& data_, + std::function&& pump_fn_, + std::function&& unsubscribe_fn_, + bool on_main_thread_ = true + ); + }; + std::vector> _connections; + + public: + StreamManager(ObjectStore2& os); + virtual ~StreamManager(void); + + template + bool connect(Object src, Object sink, bool threaded = true); + + bool connect(Object src, Object sink, bool threaded = true); + bool disconnect(Object src, Object sink); + bool disconnectAll(Object o); + + // do we need the time delta? + float tick(float); +}; + +// template impls + +namespace Components { + + // we require the complete sm type here + template + StreamSource StreamSource::create(const std::string& name) { + return StreamSource{ + name, + std::string{entt::type_name::value()}, + +[](StreamManager& sm, Object src, Object sink, bool threaded) { + return sm.connect(src, sink, threaded); + }, + }; + } + + template + StreamSink StreamSink::create(const std::string& name) { + return StreamSink{ + name, + std::string{entt::type_name::value()}, + }; + } + +} // Components + +template +bool StreamManager::connect(Object src, Object sink, bool threaded) { + auto res = std::find_if( + _connections.cbegin(), _connections.cend(), + [&](const auto& a) { return a->src == src && a->sink == sink; } + ); + if (res != _connections.cend()) { + // already exists + return false; + } + + auto h_src = _os.objectHandle(src); + auto h_sink = _os.objectHandle(sink); + if (!static_cast(h_src) || !static_cast(h_sink)) { + // an object does not exist + return false; + } + + if (!h_src.all_of>()) { + // src not stream source + return false; + } + + if (!h_sink.all_of>()) { + // sink not stream sink + return false; + } + + auto& src_stream = h_src.get>(); + auto& sink_stream = h_sink.get>(); + + struct inlineData : public Connection::Data { + virtual ~inlineData(void) {} + std::shared_ptr> reader; + std::shared_ptr> writer; + }; + + auto our_data = std::make_unique(); + + our_data->reader = src_stream->subscribe(); + if (!our_data->reader) { + return false; + } + our_data->writer = sink_stream->subscribe(); + if (!our_data->writer) { + return false; + } + + _connections.push_back(std::make_unique( + h_src, + h_sink, + std::move(our_data), + [](Connection& con) -> void { + // there might be more stored + for (size_t i = 0; i < 10; i++) { + auto new_frame_opt = static_cast(con.data.get())->reader->pop(); + // TODO: frame interval estimates + if (new_frame_opt.has_value()) { + static_cast(con.data.get())->writer->push(new_frame_opt.value()); + } else { + break; + } + } + }, + [](Connection& con) -> void { + auto* src_stream_ptr = con.src.try_get>(); + if (src_stream_ptr != nullptr) { + (*src_stream_ptr)->unsubscribe(static_cast(con.data.get())->reader); + } + auto* sink_stream_ptr = con.sink.try_get>(); + if (sink_stream_ptr != nullptr) { + (*sink_stream_ptr)->unsubscribe(static_cast(con.data.get())->writer); + } + }, + !threaded + )); + + return true; +} + diff --git a/src/main.cpp b/src/main.cpp index 9e9eceb3..49de9669 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -35,7 +35,7 @@ int main(int argc, char** argv) { // setup hints #ifndef __ANDROID__ - if (SDL_SetHint(SDL_HINT_VIDEO_ALLOW_SCREENSAVER, "1") != SDL_TRUE) { + if (!SDL_SetHint(SDL_HINT_VIDEO_ALLOW_SCREENSAVER, "1")) { std::cerr << "Failed to set '" << SDL_HINT_VIDEO_ALLOW_SCREENSAVER << "' to 1\n"; } #endif diff --git a/src/main_screen.cpp b/src/main_screen.cpp index 285f1a4e..8454f9e1 100644 --- a/src/main_screen.cpp +++ b/src/main_screen.cpp @@ -19,6 +19,7 @@ MainScreen::MainScreen(SimpleConfigModel&& conf_, SDL_Renderer* renderer_, Theme rmm(cr), msnj{cr, {}, {}}, mts(rmm), + sm(os), tc(save_path, save_password), tpi(tc.getTox()), ad(tc), @@ -474,6 +475,8 @@ Screen* MainScreen::render(float time_delta, bool&) { } Screen* MainScreen::tick(float time_delta, bool& quit) { + const float sm_interval = sm.tick(time_delta); + quit = !tc.iterate(time_delta); // compute tcm.iterate(time_delta); // compute @@ -505,6 +508,10 @@ Screen* MainScreen::tick(float time_delta, bool& quit) { std::pow(tc.toxIterationInterval(), 1.6f)/1000.f, pm_interval ); + _min_tick_interval = std::min( + _min_tick_interval, + sm_interval + ); _min_tick_interval = std::min( _min_tick_interval, fo_interval diff --git a/src/main_screen.hpp b/src/main_screen.hpp index 1aa5939d..51721efe 100644 --- a/src/main_screen.hpp +++ b/src/main_screen.hpp @@ -11,6 +11,7 @@ #include #include #include "./tox_private_impl.hpp" +#include "./frame_streams/stream_manager.hpp" #include #include @@ -58,6 +59,8 @@ struct MainScreen final : public Screen { MessageSerializerNJ msnj; MessageTimeSort mts; + StreamManager sm; + ToxEventLogger tel{std::cout}; ToxClient tc; ToxPrivateImpl tpi;