From b4373e0d9a66f54b566ca6a5731b408ca83e21a4 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Mon, 16 Sep 2024 20:24:07 +0200 Subject: [PATCH] more stream progress, threaded connections toxav video sending --- src/content/sdl_video_frame_stream2.cpp | 2 +- src/content/sdl_video_frame_stream2.hpp | 9 +- src/debug_tox_call.cpp | 302 ++++++++++++++++++------ src/debug_tox_call.hpp | 17 +- src/debug_video_tap.cpp | 58 ++--- src/debug_video_tap.hpp | 2 +- src/main.cpp | 2 +- src/main_screen.cpp | 2 +- src/stream_manager.hpp | 98 ++++++-- src/stream_manager_ui.cpp | 44 +++- 10 files changed, 395 insertions(+), 141 deletions(-) diff --git a/src/content/sdl_video_frame_stream2.cpp b/src/content/sdl_video_frame_stream2.cpp index eff7fb4..a5b8517 100644 --- a/src/content/sdl_video_frame_stream2.cpp +++ b/src/content/sdl_video_frame_stream2.cpp @@ -126,7 +126,7 @@ SDLVideoCameraContent::SDLVideoCameraContent(void) { bool someone_listening {false}; { SDLVideoFrame new_frame_non_owning { - timestampNS, + timestampNS/1000, sdl_frame_next }; diff --git a/src/content/sdl_video_frame_stream2.hpp b/src/content/sdl_video_frame_stream2.hpp index 85cc413..6b8dfd7 100644 --- a/src/content/sdl_video_frame_stream2.hpp +++ b/src/content/sdl_video_frame_stream2.hpp @@ -12,7 +12,8 @@ inline void nopSurfaceDestructor(SDL_Surface*) {} // this is very sdl specific struct SDLVideoFrame { // TODO: sequence numbering? - uint64_t timestampNS {0}; + // micro seconds (nano is way too much) + uint64_t timestampUS {0}; std::unique_ptr surface {nullptr, &SDL_DestroySurface}; @@ -21,12 +22,12 @@ struct SDLVideoFrame { uint64_t ts, SDL_Surface* surf ) { - timestampNS = ts; + timestampUS = ts; surface = {surf, &nopSurfaceDestructor}; } // copy SDLVideoFrame(const SDLVideoFrame& other) { - timestampNS = other.timestampNS; + timestampUS = other.timestampUS; if (static_cast(other.surface)) { //surface = { // SDL_CreateSurface( @@ -39,7 +40,7 @@ struct SDLVideoFrame { //SDL_BlitSurface(other.surface.get(), nullptr, surface.get(), nullptr); surface = { SDL_DuplicateSurface(other.surface.get()), - &SDL_DestroySurface + &SDL_DestroySurface }; } } diff --git a/src/debug_tox_call.cpp b/src/debug_tox_call.cpp index 14acf43..f9a2610 100644 --- a/src/debug_tox_call.cpp +++ b/src/debug_tox_call.cpp @@ -1,21 +1,21 @@ #include "./debug_tox_call.hpp" +#include "./stream_manager.hpp" +#include "./content/sdl_video_frame_stream2.hpp" + #include -#include #include #include +#include #include +#include // fwd namespace Message { - uint64_t getTimeMS(void); -} - -static constexpr float lerp(float a, float b, float t) { - return a + t * (b - a); + uint64_t getTimeMS(); } namespace Components { @@ -32,6 +32,110 @@ namespace Components { }; } +static bool isFormatPlanar(SDL_PixelFormat f) { + return + f == SDL_PIXELFORMAT_YV12 || + f == SDL_PIXELFORMAT_IYUV || + f == SDL_PIXELFORMAT_YUY2 || + f == SDL_PIXELFORMAT_UYVY || + f == SDL_PIXELFORMAT_YVYU || + f == SDL_PIXELFORMAT_NV12 || + f == SDL_PIXELFORMAT_NV21 || + f == SDL_PIXELFORMAT_P010 + ; +} + +struct PushConversionQueuedVideoStream : public QueuedFrameStream2 { + SDL_PixelFormat _forced_format {SDL_PIXELFORMAT_IYUV}; + + PushConversionQueuedVideoStream(size_t queue_size, bool lossy = true) : QueuedFrameStream2(queue_size, lossy) {} + ~PushConversionQueuedVideoStream(void) {} + + bool push(const SDLVideoFrame& value) override { + SDL_Surface* converted_surf = value.surface.get(); + if (converted_surf->format != SDL_PIXELFORMAT_IYUV) { + //std::cerr << "DTC: need to convert from " << SDL_GetPixelFormatName(converted_surf->format) << " to SDL_PIXELFORMAT_IYUV\n"; + if (isFormatPlanar(converted_surf->format)) { + // meh, need to convert to rgb as a stopgap + + //auto start = Message::getTimeMS(); + //SDL_Surface* tmp_conv_surf = SDL_ConvertSurfaceAndColorspace(converted_surf, SDL_PIXELFORMAT_RGBA32, nullptr, SDL_COLORSPACE_RGB_DEFAULT, 0); + SDL_Surface* tmp_conv_surf = SDL_ConvertSurfaceAndColorspace(converted_surf, SDL_PIXELFORMAT_RGB24, nullptr, SDL_COLORSPACE_RGB_DEFAULT, 0); + //auto end = Message::getTimeMS(); + //std::cerr << "DTC: timing " << SDL_GetPixelFormatName(converted_surf->format) << "->SDL_PIXELFORMAT_RGB24: " << end-start << "ms\n"; + + // TODO: fix sdl rgb->yuv conversion resulting in too dark (colorspace) issues + //start = Message::getTimeMS(); + converted_surf = SDL_ConvertSurfaceAndColorspace(tmp_conv_surf, SDL_PIXELFORMAT_IYUV, nullptr, SDL_COLORSPACE_YUV_DEFAULT, 0); + //end = Message::getTimeMS(); + //std::cerr << "DTC: timing SDL_PIXELFORMAT_RGB24->SDL_PIXELFORMAT_IYUV: " << end-start << "ms\n"; + + SDL_DestroySurface(tmp_conv_surf); + } else { + converted_surf = SDL_ConvertSurface(converted_surf, SDL_PIXELFORMAT_IYUV); + } + + if (converted_surf == nullptr) { + // oh god + std::cerr << "DTC error: failed to convert surface to IYUV: " << SDL_GetError() << "\n"; + return false; + } + } + assert(converted_surf != nullptr); + if (converted_surf != value.surface.get()) { + // TODO: add ctr with uptr + SDLVideoFrame new_value{value.timestampUS, nullptr}; + new_value.surface = { + converted_surf, + &SDL_DestroySurface + }; + + return QueuedFrameStream2::push(new_value); + } else { + return QueuedFrameStream2::push(value); + } + } +}; + +// exlusive +// TODO: replace with something better than a queue +struct ToxAVCallVideoSink : public FrameStream2SinkI { + uint32_t _fid; + std::shared_ptr _writer; + + ToxAVCallVideoSink(uint32_t fid) : _fid(fid) {} + ~ToxAVCallVideoSink(void) {} + + // sink + std::shared_ptr> subscribe(void) override { + if (_writer) { + // max 1 (exclusive) + return nullptr; + } + + // TODO: enable video here + _writer = std::make_shared(1, true); + + return _writer; + } + + bool unsubscribe(const std::shared_ptr>& sub) override { + if (!sub || !_writer) { + // nah + return false; + } + + if (sub == _writer) { + // TODO: disable video here + _writer = nullptr; + return true; + } + + // what + return false; + } +}; + DebugToxCall::DebugToxCall(ObjectStore2& os, ToxAV& toxav, TextureUploaderI& tu) : _os(os), _toxav(toxav), _tu(tu) { _toxav.subscribe(this, ToxAV_Event::friend_call); _toxav.subscribe(this, ToxAV_Event::friend_call_state); @@ -41,7 +145,70 @@ DebugToxCall::DebugToxCall(ObjectStore2& os, ToxAV& toxav, TextureUploaderI& tu) _toxav.subscribe(this, ToxAV_Event::friend_video_frame); } -void DebugToxCall::tick(float time_delta) { +void DebugToxCall::tick(float) { + // pump sink to tox + // TODO: own thread or direct on push + // TODO: pump at double the frame rate + for (const auto& [oc, vsink] : _os.registry().view().each()) { + if (!vsink->_writer) { + continue; + } + + auto new_frame_opt = vsink->_writer->pop(); + if (!new_frame_opt.has_value()) { + continue; + } + + if (!new_frame_opt.value().surface) { + // wtf? + continue; + } + + SDL_Surface* surf = new_frame_opt.value().surface.get(); + + SDL_Surface* converted_surf = surf; + if (converted_surf->format != SDL_PIXELFORMAT_IYUV) { + std::cerr << "DTC: need to convert from " << SDL_GetPixelFormatName(converted_surf->format) << " to SDL_PIXELFORMAT_IYUV\n"; + if (isFormatPlanar(converted_surf->format)) { + // meh, need to convert to rgb as a stopgap + //SDL_Surface* tmp_conv_surf = SDL_ConvertSurfaceAndColorspace(converted_surf, SDL_PIXELFORMAT_RGBA32, nullptr, SDL_COLORSPACE_RGB_DEFAULT, 0); + auto start = Message::getTimeMS(); + SDL_Surface* tmp_conv_surf = SDL_ConvertSurfaceAndColorspace(converted_surf, SDL_PIXELFORMAT_RGB24, nullptr, SDL_COLORSPACE_RGB_DEFAULT, 0); + auto end = Message::getTimeMS(); + std::cerr << "DTC: timing " << SDL_GetPixelFormatName(converted_surf->format) << "->SDL_PIXELFORMAT_RGB24: " << end-start << "ms\n"; + + // TODO: fix sdl rgb->yuv conversion resulting in too dark (colorspace) issues + start = Message::getTimeMS(); + converted_surf = SDL_ConvertSurfaceAndColorspace(tmp_conv_surf, SDL_PIXELFORMAT_IYUV, nullptr, SDL_COLORSPACE_YUV_DEFAULT, 0); + end = Message::getTimeMS(); + std::cerr << "DTC: timing SDL_PIXELFORMAT_RGB24->SDL_PIXELFORMAT_IYUV: " << end-start << "ms\n"; + SDL_DestroySurface(tmp_conv_surf); + } else { + converted_surf = SDL_ConvertSurface(converted_surf, SDL_PIXELFORMAT_IYUV); + } + + if (converted_surf == nullptr) { + // oh god + std::cerr << "DTC error: failed to convert surface to IYUV: " << SDL_GetError() << "\n"; + continue; + } + } + assert(converted_surf != nullptr); + + SDL_LockSurface(converted_surf); + _toxav.toxavVideoSendFrame( + vsink->_fid, + converted_surf->w, converted_surf->h, + static_cast(converted_surf->pixels), + static_cast(converted_surf->pixels) + converted_surf->w * converted_surf->h, + static_cast(converted_surf->pixels) + converted_surf->w * converted_surf->h + (converted_surf->w/2) * (converted_surf->h/2) + ); + SDL_UnlockSurface(converted_surf); + + if (converted_surf != surf) { + SDL_DestroySurface(converted_surf); + } + } } float DebugToxCall::render(void) { @@ -56,9 +223,32 @@ float DebugToxCall::render(void) { if (call.incoming) { ImGui::SameLine(); if (ImGui::SmallButton("answer")) { - const auto ret = _toxav.toxavAnswer(fid, 0, 0); + //const auto ret = _toxav.toxavAnswer(fid, 0, 1); // 1mbit/s + const auto ret = _toxav.toxavAnswer(fid, 0, 2); // 2mbit/s + //const auto ret = _toxav.toxavAnswer(fid, 0, 100); // 100mbit/s + //const auto ret = _toxav.toxavAnswer(fid, 0, 2500); // 2500mbit/s if (ret == TOXAV_ERR_ANSWER_OK) { call.incoming = false; + + // create sinks + call.outgoing_vsink = {_os.registry(), _os.registry().create()}; + { + auto new_vsink = std::make_unique(fid); + call.outgoing_vsink.emplace(new_vsink.get()); + call.outgoing_vsink.emplace>(std::move(new_vsink)); + call.outgoing_vsink.emplace("ToxAV friend call video", std::string{entt::type_name::value()}); + } + + // create sources + if (call.incoming_v) { + call.incoming_vsrc = {_os.registry(), _os.registry().create()}; + { + auto new_vsrc = std::make_unique(); + call.incoming_vsrc.emplace(new_vsrc.get()); + call.incoming_vsrc.emplace>(std::move(new_vsrc)); + call.incoming_vsrc.emplace("ToxAV friend call video", std::string{entt::type_name::value()}); + } + } } } } else if (call.state != TOXAV_FRIEND_CALL_STATE_FINISHED) { @@ -70,6 +260,14 @@ float DebugToxCall::render(void) { // we hung up // not sure if its possible for toxcore to tell this us too when the other side does this at the same time? call.state = TOXAV_FRIEND_CALL_STATE_FINISHED; + + // TODO: stream manager disconnectAll() + if (static_cast(call.outgoing_vsink)) { + call.outgoing_vsink.destroy(); + } + if (static_cast(call.incoming_vsrc)) { + call.incoming_vsrc.destroy(); + } } } @@ -81,18 +279,6 @@ float DebugToxCall::render(void) { //} } - //if (call.last_v_frame_tex != 0 && ImGui::BeginItemTooltip()) { - if (call.last_v_frame_tex != 0) { - next_frame = std::min(next_frame, call.v_frame_interval_avg); - ImGui::Text("vframe interval avg: %f", call.v_frame_interval_avg); - ImGui::Image( - reinterpret_cast(call.last_v_frame_tex), - //ImVec2{float(call.last_v_frame_width), float(call.last_v_frame_height)} - ImVec2{100.f, 100.f * float(call.last_v_frame_height)/call.last_v_frame_width} - ); - //ImGui::EndTooltip(); - } - ImGui::PopID(); } ImGui::Unindent(); @@ -107,7 +293,7 @@ bool DebugToxCall::onEvent(const Events::FriendCall& e) { call.incoming = true; call.incoming_a = e.audio_enabled; call.incoming_v = e.video_enabled; - //call.state = TOXAV_FRIEND_CALL_STATE_NONE; + call.state = TOXAV_FRIEND_CALL_STATE_NONE; return true; } @@ -116,6 +302,18 @@ bool DebugToxCall::onEvent(const Events::FriendCallState& e) { auto& call = _calls[e.friend_number]; call.state = e.state; + if ( + (call.state & TOXAV_FRIEND_CALL_STATE_FINISHED) != 0 || + (call.state & TOXAV_FRIEND_CALL_STATE_ERROR) != 0 + ) { + if (static_cast(call.outgoing_vsink)) { + call.outgoing_vsink.destroy(); + } + if (static_cast(call.incoming_vsrc)) { + call.incoming_vsrc.destroy(); + } + } + return true; } @@ -134,26 +332,19 @@ bool DebugToxCall::onEvent(const Events::FriendAudioFrame& e) { } bool DebugToxCall::onEvent(const Events::FriendVideoFrame& e) { + // TODO: skip if we dont know about this call auto& call = _calls[e.friend_number]; - call.num_v_frames++; - if (call.last_v_frame_timepoint == 0) { - call.last_v_frame_timepoint = Message::getTimeMS(); - } else { - const auto new_time_point = Message::getTimeMS(); - auto time_delta_ms = new_time_point - call.last_v_frame_timepoint; - call.last_v_frame_timepoint = new_time_point; - time_delta_ms = std::min(time_delta_ms, 10*1000); // cap at 10sec - - if (call.v_frame_interval_avg == 0) { - call.v_frame_interval_avg = time_delta_ms/1000.f; - } else { - std::cerr << "lerp(" << call.v_frame_interval_avg << ", " << time_delta_ms/1000.f << ", 0.2f) = "; - call.v_frame_interval_avg = lerp(call.v_frame_interval_avg, time_delta_ms/1000.f, 0.2f); - std::cerr << call.v_frame_interval_avg << "\n"; - } + if (!static_cast(call.incoming_vsrc)) { + // missing src to put frame into ?? + return false; } + assert(call.incoming_vsrc.all_of()); + assert(call.incoming_vsrc.all_of>()); + + call.num_v_frames++; + auto* new_surf = SDL_CreateSurface(e.width, e.height, SDL_PIXELFORMAT_IYUV); assert(new_surf); if (SDL_LockSurface(new_surf)) { @@ -190,37 +381,12 @@ bool DebugToxCall::onEvent(const Events::FriendVideoFrame& e) { SDL_UnlockSurface(new_surf); } - auto* converted_surf = SDL_ConvertSurfaceAndColorspace(new_surf, SDL_PIXELFORMAT_RGBA32, nullptr, SDL_COLORSPACE_YUV_DEFAULT, 0); - SDL_DestroySurface(new_surf); - if (converted_surf == nullptr) { - assert(false); - return true; - } + call.incoming_vsrc.get()->push({ + // ms -> us + Message::getTimeMS() * 1000, // TODO: make more precise + new_surf + }); - SDL_LockSurface(converted_surf); - if (call.last_v_frame_tex == 0 || call.last_v_frame_width != e.width || call.last_v_frame_height != e.height) { - _tu.destroy(call.last_v_frame_tex); - call.last_v_frame_tex = _tu.uploadRGBA( - static_cast(converted_surf->pixels), - converted_surf->w, - converted_surf->h, - TextureUploaderI::LINEAR, - TextureUploaderI::STREAMING - ); - - call.last_v_frame_width = e.width; - call.last_v_frame_height = e.height; - } else { - _tu.updateRGBA(call.last_v_frame_tex, static_cast(converted_surf->pixels), converted_surf->w * converted_surf->h * 4); - } - SDL_UnlockSurface(converted_surf); - SDL_DestroySurface(converted_surf); - - // TODO: use this instead - //SDL_UpdateYUVTexture(tex, nullptr, e.y.ptr, e.ystride,... - - std::cout << "DTC: updated video texture " << call.last_v_frame_tex << "\n"; - - return false; + return true; } diff --git a/src/debug_tox_call.hpp b/src/debug_tox_call.hpp index a17f981..369de2e 100644 --- a/src/debug_tox_call.hpp +++ b/src/debug_tox_call.hpp @@ -1,6 +1,7 @@ #pragma once -#include +//#include +#include #include "./tox_av.hpp" #include "./texture_uploader.hpp" @@ -19,19 +20,17 @@ class DebugToxCall : public ToxAVEventI { uint32_t state {0}; // ? just last state ? - uint32_t abr {0}; - uint32_t vbr {0}; + uint32_t incomming_abr {0}; + uint32_t incomming_vbr {0}; size_t num_a_frames {0}; size_t num_v_frames {0}; - // fps moving interval - uint64_t last_v_frame_timepoint {0}; - float v_frame_interval_avg {0.f}; + ObjectHandle incoming_vsrc; + ObjectHandle incoming_asrc; - uint64_t last_v_frame_tex {0}; - uint64_t last_v_frame_width {0}; - uint64_t last_v_frame_height {0}; + ObjectHandle outgoing_vsink; + ObjectHandle outgoing_asink; }; // tox friend id -> call std::map _calls; diff --git a/src/debug_video_tap.cpp b/src/debug_video_tap.cpp index c21aab9..c18574d 100644 --- a/src/debug_video_tap.cpp +++ b/src/debug_video_tap.cpp @@ -72,30 +72,6 @@ DebugVideoTap::~DebugVideoTap(void) { float DebugVideoTap::render(void) { if (ImGui::Begin("DebugVideoTap")) { - // list sources dropdown to connect too - std::string preview_label {"none"}; - if (static_cast(_selected_src)) { - preview_label = std::to_string(entt::to_integral(_selected_src.entity())) + " (" + _selected_src.get().name + ")"; - } - - if (ImGui::BeginCombo("selected source", preview_label.c_str())) { - if (ImGui::Selectable("none")) { - switchTo({}); - } - - for (const auto& [oc, ss] : _os.registry().view().each()) { - if (ss.frame_type_name != entt::type_name::value()) { - continue; - } - std::string label = std::to_string(entt::to_integral(oc)) + " (" + ss.name + ")"; - if (ImGui::Selectable(label.c_str())) { - switchTo({_os.registry(), oc}); - } - } - - ImGui::EndCombo(); - } - { // first pull the latest img from sink and update the texture assert(static_cast(_tap)); @@ -106,18 +82,18 @@ float DebugVideoTap::render(void) { if (new_frame_opt.has_value()) { // timing if (_v_last_ts == 0) { - _v_last_ts = new_frame_opt.value().timestampNS; + _v_last_ts = new_frame_opt.value().timestampUS; } else { - auto delta = int64_t(new_frame_opt.value().timestampNS) - int64_t(_v_last_ts); - _v_last_ts = new_frame_opt.value().timestampNS; + auto delta = int64_t(new_frame_opt.value().timestampUS) - int64_t(_v_last_ts); + _v_last_ts = new_frame_opt.value().timestampUS; //delta = std::min(delta, 10*1000*1000); if (_v_interval_avg == 0) { - _v_interval_avg = delta/1'000'000'000.f; + _v_interval_avg = delta/1'000'000.f; } else { const float r = 0.2f; - _v_interval_avg = _v_interval_avg * (1-r) + (delta/1'000'000'000.f) * r; + _v_interval_avg = _v_interval_avg * (1.f-r) + (delta/1'000'000.f) * r; } } @@ -160,6 +136,30 @@ float DebugVideoTap::render(void) { } } + // list sources dropdown to connect too + std::string preview_label {"none"}; + if (static_cast(_selected_src)) { + preview_label = std::to_string(entt::to_integral(_selected_src.entity())) + " (" + _selected_src.get().name + ")"; + } + + if (ImGui::BeginCombo("selected source", preview_label.c_str())) { + if (ImGui::Selectable("none")) { + switchTo({}); + } + + for (const auto& [oc, ss] : _os.registry().view().each()) { + if (ss.frame_type_name != entt::type_name::value()) { + continue; + } + std::string label = std::to_string(entt::to_integral(oc)) + " (" + ss.name + ")"; + if (ImGui::Selectable(label.c_str())) { + switchTo({_os.registry(), oc}); + } + } + + ImGui::EndCombo(); + } + // img here if (_tex != 0) { ImGui::Text("moving avg interval: %f", _v_interval_avg); diff --git a/src/debug_video_tap.hpp b/src/debug_video_tap.hpp index 3d9624f..6044ca9 100644 --- a/src/debug_video_tap.hpp +++ b/src/debug_video_tap.hpp @@ -18,7 +18,7 @@ class DebugVideoTap { uint32_t _tex_w {0}; uint32_t _tex_h {0}; - uint64_t _v_last_ts {0}; // ns + uint64_t _v_last_ts {0}; // us float _v_interval_avg {0.f}; // s public: diff --git a/src/main.cpp b/src/main.cpp index 1e0805a..29c54a8 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -108,7 +108,7 @@ int main(int argc, char** argv) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); auto new_frame_opt = reader->pop(); if (new_frame_opt.has_value()) { - std::cout << "video frame was " << new_frame_opt.value().surface->w << "x" << new_frame_opt.value().surface->h << " " << new_frame_opt.value().timestampNS << "ns " << new_frame_opt.value().surface->format << "sf\n"; + std::cout << "video frame was " << new_frame_opt.value().surface->w << "x" << new_frame_opt.value().surface->h << " " << new_frame_opt.value().timestampUS << "us " << new_frame_opt.value().surface->format << "sf\n"; } } vcc.unsubscribe(reader); diff --git a/src/main_screen.cpp b/src/main_screen.cpp index 0d0fe18..e7de762 100644 --- a/src/main_screen.cpp +++ b/src/main_screen.cpp @@ -147,7 +147,7 @@ MainScreen::MainScreen(SimpleConfigModel&& conf_, SDL_Renderer* renderer_, Theme conf.dump(); { // add system av devices - { + if (false) { ObjectHandle vsrc {os.registry(), os.registry().create()}; try { vsrc.emplace>( diff --git a/src/stream_manager.hpp b/src/stream_manager.hpp index 09688df..8310517 100644 --- a/src/stream_manager.hpp +++ b/src/stream_manager.hpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include namespace Components { struct StreamSource { @@ -41,13 +43,19 @@ class StreamManager { ObjectHandle src; ObjectHandle sink; + struct Data { + virtual ~Data(void) {} + }; + std::unique_ptr data; // stores reader writer type erased std::function pump_fn; + std::function unsubscribe_fn; bool on_main_thread {true}; std::atomic_bool stop {false}; // disconnect std::atomic_bool finished {false}; // disconnect // pump thread + std::thread pump_thread; // frame interval counters and estimates @@ -55,20 +63,44 @@ class StreamManager { Connection( ObjectHandle src_, ObjectHandle sink_, + std::unique_ptr&& data_, std::function&& pump_fn_, + std::function&& unsubscribe_fn_, bool on_main_thread_ = true ) : src(src_), sink(sink_), + data(std::move(data_)), pump_fn(std::move(pump_fn_)), + unsubscribe_fn(std::move(unsubscribe_fn_)), on_main_thread(on_main_thread_) - {} + { + if (!on_main_thread) { + // start thread + pump_thread = std::thread([this](void) { + while (!stop) { + pump_fn(*this); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + finished = true; + }); + } + } }; std::vector> _connections; public: StreamManager(ObjectStore2& os) : _os(os) {} - virtual ~StreamManager(void) {} + virtual ~StreamManager(void) { + // stop all connetions + for (const auto& con : _connections) { + con->stop = true; + if (!con->on_main_thread) { + con->pump_thread.join(); // we skip the finished check and wait + } + con->unsubscribe_fn(*con); + } + } // TODO: default typed sources and sinks @@ -76,7 +108,7 @@ class StreamManager { // TODO: improve this design // src and sink need to be a FrameStream2MultiStream template - bool connect(Object src, Object sink) { + bool connect(Object src, Object sink, bool threaded = true) { auto res = std::find_if( _connections.cbegin(), _connections.cend(), [&](const auto& a) { return a->src == src && a->sink == sink; } @@ -114,44 +146,50 @@ class StreamManager { auto& src_stream = h_src.get>(); auto& sink_stream = h_sink.get>(); - auto reader = src_stream->subscribe(); - if (!reader) { + struct inlineData : public Connection::Data { + virtual ~inlineData(void) {} + std::shared_ptr> reader; + std::shared_ptr> writer; + }; + + auto our_data = std::make_unique(); + + our_data->reader = src_stream->subscribe(); + if (!our_data->reader) { return false; } - auto writer = sink_stream->subscribe(); - if (!writer) { + our_data->writer = sink_stream->subscribe(); + if (!our_data->writer) { return false; } _connections.push_back(std::make_unique( h_src, h_sink, - // refactor extract, we just need the type info here - [reader = std::move(reader), writer = std::move(writer)](Connection& con) -> void { + std::move(our_data), + [](Connection& con) -> void { // there might be more stored for (size_t i = 0; i < 10; i++) { - auto new_frame_opt = reader->pop(); + auto new_frame_opt = static_cast(con.data.get())->reader->pop(); // TODO: frame interval estimates if (new_frame_opt.has_value()) { - writer->push(new_frame_opt.value()); + static_cast(con.data.get())->writer->push(new_frame_opt.value()); } else { break; } } - - if (con.stop) { - auto* src_stream_ptr = con.src.try_get>(); - if (src_stream_ptr != nullptr) { - (*src_stream_ptr)->unsubscribe(reader); - } - auto* sink_stream_ptr = con.sink.try_get>(); - if (sink_stream_ptr != nullptr) { - (*sink_stream_ptr)->unsubscribe(writer); - } - con.finished = true; + }, + [](Connection& con) -> void { + auto* src_stream_ptr = con.src.try_get>(); + if (src_stream_ptr != nullptr) { + (*src_stream_ptr)->unsubscribe(static_cast(con.data.get())->reader); + } + auto* sink_stream_ptr = con.sink.try_get>(); + if (sink_stream_ptr != nullptr) { + (*sink_stream_ptr)->unsubscribe(static_cast(con.data.get())->writer); } }, - true // TODO: threaded + !threaded )); return true; @@ -192,11 +230,23 @@ class StreamManager { // pump all mainthread connections for (auto it = _connections.begin(); it != _connections.end();) { auto& con = **it; + + if (!static_cast(con.src) || !static_cast(con.sink)) { + // either side disappeard without disconnectAll + // TODO: warn/error log + con.stop = true; + } + if (con.on_main_thread) { con.pump_fn(con); } - if (con.stop && con.finished) { + if (con.stop && (con.finished || con.on_main_thread)) { + if (!con.on_main_thread) { + assert(con.pump_thread.joinable()); + con.pump_thread.join(); + } + con.unsubscribe_fn(con); it = _connections.erase(it); } else { it++; diff --git a/src/stream_manager_ui.cpp b/src/stream_manager_ui.cpp index 4196647..03d221e 100644 --- a/src/stream_manager_ui.cpp +++ b/src/stream_manager_ui.cpp @@ -1,9 +1,13 @@ #include "./stream_manager_ui.hpp" +#include "./content/sdl_video_frame_stream2.hpp" + #include #include +#include + StreamManagerUI::StreamManagerUI(ObjectStore2& os, StreamManager& sm) : _os(os), _sm(sm) { } @@ -15,23 +19,57 @@ void StreamManagerUI::render(void) { ImGui::SeparatorText("Sources"); + // TODO: tables of id, button connect->to, name, type + // list sources for (const auto& [oc, ss] : _os.registry().view().each()) { - ImGui::Text("src %d (%s)[%s]", entt::to_integral(oc), ss.name.c_str(), ss.frame_type_name.c_str()); + ImGui::Text("src %d (%s)[%s]", entt::to_integral(entt::to_entity(oc)), ss.name.c_str(), ss.frame_type_name.c_str()); } ImGui::SeparatorText("Sinks"); // list sinks for (const auto& [oc, ss] : _os.registry().view().each()) { - ImGui::Text("sink %d (%s)[%s]", entt::to_integral(oc), ss.name.c_str(), ss.frame_type_name.c_str()); + ImGui::PushID(entt::to_integral(oc)); + ImGui::Text("sink %d (%s)[%s]", entt::to_integral(entt::to_entity(oc)), ss.name.c_str(), ss.frame_type_name.c_str()); + + if (ImGui::BeginPopupContextItem("sink_connect")) { + if (ImGui::BeginMenu("connect video", ss.frame_type_name == entt::type_name::value())) { + for (const auto& [oc_src, s_src] : _os.registry().view().each()) { + if (s_src.frame_type_name != ss.frame_type_name) { + continue; + } + + ImGui::PushID(entt::to_integral(oc_src)); + + std::string source_label {"src "}; + source_label += std::to_string(entt::to_integral(entt::to_entity(oc_src))); + source_label += " ("; + source_label += s_src.name; + source_label += ")["; + source_label += s_src.frame_type_name; + source_label += "]"; + if (ImGui::MenuItem(source_label.c_str())) { + _sm.connect(oc_src, oc); + } + + ImGui::PopID(); + } + + ImGui::EndMenu(); + } + ImGui::EndPopup(); + } + ImGui::PopID(); } ImGui::SeparatorText("Connections"); + // TODO: table of id, button disconnect, context x->y, from name, to name, type? + // list connections for (const auto& con : _sm._connections) { - ImGui::Text("con %d->%d", entt::to_integral(con->src.entity()), entt::to_integral(con->sink.entity())); + ImGui::Text("con %d->%d", entt::to_integral(entt::to_entity(con->src.entity())), entt::to_integral(entt::to_entity(con->sink.entity()))); } } ImGui::End();