diff --git a/src/frame_streams/audio_stream2.hpp b/src/frame_streams/audio_stream2.hpp index 37e7230..1717358 100644 --- a/src/frame_streams/audio_stream2.hpp +++ b/src/frame_streams/audio_stream2.hpp @@ -35,5 +35,15 @@ struct AudioFrame2 { } }; +template<> +constexpr bool frameHasBytes(void) { + return true; +} + +template<> +inline uint64_t frameGetBytes(const AudioFrame2& frame) { + return frame.getSpan().size * sizeof(int16_t); +} + using AudioFrame2Stream2I = FrameStream2I; diff --git a/src/frame_streams/frame_stream2.hpp b/src/frame_streams/frame_stream2.hpp index 9a274e7..cb437e0 100644 --- a/src/frame_streams/frame_stream2.hpp +++ b/src/frame_streams/frame_stream2.hpp @@ -45,3 +45,21 @@ struct FrameStream2SinkI { virtual bool unsubscribe(const std::shared_ptr>& sub) = 0; }; +// typed config +// overload for your frame types + +template +constexpr bool frameHasTimestamp(void) { return false; } + +template +constexpr uint64_t frameGetTimestampDivision(void) = delete; + +template +uint64_t frameGetTimestamp(const FrameType&) = delete; + +template +constexpr bool frameHasBytes(void) { return false; } + +template +uint64_t frameGetBytes(const FrameType&) = delete; + diff --git a/src/frame_streams/sdl/video.hpp b/src/frame_streams/sdl/video.hpp index b2c1b11..f9b1851 100644 --- a/src/frame_streams/sdl/video.hpp +++ b/src/frame_streams/sdl/video.hpp @@ -1,5 +1,7 @@ #pragma once +#include "../frame_stream2.hpp" + #include #include @@ -39,3 +41,47 @@ struct SDLVideoFrame { SDLVideoFrame& operator=(const SDLVideoFrame& other) = delete; }; +template<> +constexpr bool frameHasTimestamp(void) { + return true; +} + +template<> +constexpr uint64_t frameGetTimestampDivision(void) { + // microseconds + // technically sdl would provide them in nanoseconds... but we cut them down + return 1'000*1'000; +} + +template<> +inline uint64_t frameGetTimestamp(const SDLVideoFrame& frame) { + return frame.timestampUS; +} + +template<> +constexpr bool frameHasBytes(void) { + return true; +} + +// TODO: test how performant this call is +template<> +inline uint64_t frameGetBytes(const SDLVideoFrame& frame) { + if (!frame.surface) { + return 0; + } + + const auto* surf = frame.surface.get(); + + if (surf->format == SDL_PIXELFORMAT_MJPG) { + // mjpg is special, since it is compressed + return surf->pitch; + } + + const auto* details = SDL_GetPixelFormatDetails(surf->format); + if (details == nullptr) { + return 0; + } + + return details->bytes_per_pixel * surf->w * surf->h; +} + diff --git a/src/frame_streams/stream_manager.cpp b/src/frame_streams/stream_manager.cpp index 1c4be13..2bf72d5 100644 --- a/src/frame_streams/stream_manager.cpp +++ b/src/frame_streams/stream_manager.cpp @@ -113,6 +113,8 @@ bool StreamManager::disconnectAll(Object o) { // do we need the time delta? float StreamManager::tick(float) { + float interval_min {2.01f}; + // pump all mainthread connections for (auto it = _connections.begin(); it != _connections.end();) { auto& con = **it; @@ -125,6 +127,10 @@ float StreamManager::tick(float) { if (con.on_main_thread) { con.pump_fn(con); + const float con_interval = con.interval_avg; + if (con_interval > 0.f) { + interval_min = std::min(interval_min, con_interval); + } } if (con.stop && (con.finished || con.on_main_thread)) { @@ -139,8 +145,7 @@ float StreamManager::tick(float) { } } - // return min over intervals instead - return 2.f; // TODO: 2sec makes mainthread connections unusable + return interval_min; } bool StreamManager::onEvent(const ObjectStore::Events::ObjectConstruct& e) { diff --git a/src/frame_streams/stream_manager.hpp b/src/frame_streams/stream_manager.hpp index 7350036..5bb481f 100644 --- a/src/frame_streams/stream_manager.hpp +++ b/src/frame_streams/stream_manager.hpp @@ -2,6 +2,7 @@ #include #include +#include #include @@ -78,7 +79,16 @@ class StreamManager : protected ObjectStoreEventI { std::thread pump_thread; // frame interval counters and estimates - // TODO + std::atomic interval_avg {0.f}; // s + std::atomic frames_total{0}; + std::atomic bytes_total{0}; // if it can be mesured + + // moving avg + std::atomic bytes_per_sec{0}; + + // temps for mesuring + uint64_t _last_ts {0}; // frame format OR ms if frame has no ts + Connection(void) = default; Connection( @@ -193,19 +203,60 @@ bool StreamManager::connect(Object src, Object sink, bool threaded) { h_src, h_sink, std::move(our_data), - [](Connection& con) -> void { + [](Connection& con) -> void { // pump // there might be more stored for (size_t i = 0; i < 64; i++) { auto new_frame_opt = static_cast(con.data.get())->reader->pop(); // TODO: frame interval estimates if (new_frame_opt.has_value()) { + con.frames_total++; + + // TODO: opt-in ? + float delta{0.f}; // s + uint64_t ts{0}; + if constexpr (frameHasTimestamp()) { + ts = frameGetTimestamp(new_frame_opt.value()); + } else { + ts = getTimeMS(); // fallback + } + + if (con._last_ts != 0 && ts > con._last_ts) { + // normalize to seconds + if constexpr (frameHasTimestamp()) { + delta = float(ts - con._last_ts) / frameGetTimestampDivision(); + } else { + delta = float(ts - con._last_ts) / 1000.f; // fallback + } + + if (con.interval_avg == 0.f) { + con.interval_avg = delta; + } else { + con.interval_avg = con.interval_avg*0.95f + delta*0.05f; + } + } + con._last_ts = ts; + + if constexpr (frameHasBytes()) { + // we need to always run this, timing stuff below might not + const auto bytes = frameGetBytes(new_frame_opt.value()); + con.bytes_total += bytes; + + if (delta > 0.f) { + if (con.bytes_per_sec == 0.f) { + con.bytes_per_sec = bytes/delta; + } else { + con.bytes_per_sec = con.bytes_per_sec*0.95f + (bytes/delta)*0.05f; + } + } + } + static_cast(con.data.get())->writer->push(new_frame_opt.value()); } else { break; } } }, - [](Connection& con) -> void { + [](Connection& con) -> void { // disco auto* src_stream_ptr = con.src.try_get>(); if (src_stream_ptr != nullptr) { (*src_stream_ptr)->unsubscribe(static_cast(con.data.get())->reader); diff --git a/src/stream_manager_ui.cpp b/src/stream_manager_ui.cpp index 9ff55c2..de6c12e 100644 --- a/src/stream_manager_ui.cpp +++ b/src/stream_manager_ui.cpp @@ -2,9 +2,12 @@ #include +#include "./string_formatter_utils.hpp" + #include #include +#include StreamManagerUI::StreamManagerUI(ObjectStore2& os, StreamManager& sm) : _os(os), _sm(sm) { } @@ -193,6 +196,7 @@ void StreamManagerUI::render(void) { const auto& con = _sm._connections[i]; //ImGui::Text("con %d->%d", entt::to_integral(entt::to_entity(con->src.entity())), entt::to_integral(entt::to_entity(con->sink.entity()))); + ImGui::BeginGroup(); // TODO: make group hover work ImGui::PushID(i); ImGui::TableNextColumn(); @@ -224,6 +228,27 @@ void StreamManagerUI::render(void) { ); ImGui::PopID(); + ImGui::EndGroup(); // TODO: make group hover work + if (ImGui::BeginItemTooltip()) { + uint64_t bytes_total = con->bytes_total; + float bytes_per_sec = con->bytes_per_sec; + + const char* bytes_total_suffix = "???"; + int64_t bytes_total_divider = sizeToHumanReadable(bytes_total, bytes_total_suffix); + + const char* bytes_ps_suffix = "???"; + int64_t bytes_ps_divider = sizeToHumanReadable(bytes_per_sec, bytes_ps_suffix); + + ImGui::Text( + "interval: ~%.2fms (%.2ffps)\n" + "frames total: %" PRIu64 "\n" + "bytes total: %.2f%s (avg ~%.1f%s/s)", + con->interval_avg*1000.f, 1.f/con->interval_avg, + (uint64_t)con->frames_total, + bytes_total/float(bytes_total_divider), bytes_total_suffix, bytes_per_sec/bytes_ps_divider, bytes_ps_suffix + ); + ImGui::EndTooltip(); + } } ImGui::EndTable(); } diff --git a/src/string_formatter_utils.hpp b/src/string_formatter_utils.hpp index 22d8669..ef10c9b 100644 --- a/src/string_formatter_utils.hpp +++ b/src/string_formatter_utils.hpp @@ -5,7 +5,7 @@ #include // returns divider and places static suffix string into suffix_out -static int64_t sizeToHumanReadable(int64_t file_size, const char*& suffix_out) { +static inline int64_t sizeToHumanReadable(int64_t file_size, const char*& suffix_out) { static const char* suffix_arr[] { "Bytes", "KiB", @@ -27,7 +27,7 @@ static int64_t sizeToHumanReadable(int64_t file_size, const char*& suffix_out) { } // returns divider and places static suffix string into suffix_out -static int64_t durationToHumanReadable(int64_t t, const char*& suffix_out) { +static inline int64_t durationToHumanReadable(int64_t t, const char*& suffix_out) { static const char* suffix_arr[] { "ms", "s",