stream connection mesurements
Some checks are pending
ContinuousDelivery / linux-ubuntu (push) Waiting to run
ContinuousDelivery / android (map[ndk_abi:arm64-v8a vcpkg_toolkit:arm64-android]) (push) Waiting to run
ContinuousDelivery / android (map[ndk_abi:armeabi-v7a vcpkg_toolkit:arm-neon-android]) (push) Waiting to run
ContinuousDelivery / android (map[ndk_abi:x86_64 vcpkg_toolkit:x64-android]) (push) Waiting to run
ContinuousDelivery / windows (push) Waiting to run
ContinuousDelivery / windows-asan (push) Waiting to run
ContinuousDelivery / dumpsyms (push) Blocked by required conditions
ContinuousDelivery / release (push) Blocked by required conditions
ContinuousIntegration / linux (push) Waiting to run
ContinuousIntegration / android (map[ndk_abi:arm64-v8a vcpkg_toolkit:arm64-android]) (push) Waiting to run
ContinuousIntegration / android (map[ndk_abi:armeabi-v7a vcpkg_toolkit:arm-neon-android]) (push) Waiting to run
ContinuousIntegration / android (map[ndk_abi:x86_64 vcpkg_toolkit:x64-android]) (push) Waiting to run
ContinuousIntegration / macos (push) Waiting to run
ContinuousIntegration / windows (push) Waiting to run

This commit is contained in:
Green Sky 2025-03-22 12:14:14 +01:00
parent 1fb590dfc1
commit 0f85bcc128
No known key found for this signature in database
GPG Key ID: DBE05085D874AB4A
7 changed files with 162 additions and 7 deletions

View File

@ -35,5 +35,15 @@ struct AudioFrame2 {
}
};
template<>
constexpr bool frameHasBytes<AudioFrame2>(void) {
return true;
}
template<>
inline uint64_t frameGetBytes(const AudioFrame2& frame) {
return frame.getSpan().size * sizeof(int16_t);
}
using AudioFrame2Stream2I = FrameStream2I<AudioFrame2>;

View File

@ -45,3 +45,21 @@ struct FrameStream2SinkI {
virtual bool unsubscribe(const std::shared_ptr<FrameStream2I<FrameType>>& sub) = 0;
};
// typed config
// overload for your frame types
template<typename FrameType>
constexpr bool frameHasTimestamp(void) { return false; }
template<typename FrameType>
constexpr uint64_t frameGetTimestampDivision(void) = delete;
template<typename FrameType>
uint64_t frameGetTimestamp(const FrameType&) = delete;
template<typename FrameType>
constexpr bool frameHasBytes(void) { return false; }
template<typename FrameType>
uint64_t frameGetBytes(const FrameType&) = delete;

View File

@ -1,5 +1,7 @@
#pragma once
#include "../frame_stream2.hpp"
#include <SDL3/SDL.h>
#include <cstdint>
@ -39,3 +41,47 @@ struct SDLVideoFrame {
SDLVideoFrame& operator=(const SDLVideoFrame& other) = delete;
};
template<>
constexpr bool frameHasTimestamp<SDLVideoFrame>(void) {
return true;
}
template<>
constexpr uint64_t frameGetTimestampDivision<SDLVideoFrame>(void) {
// microseconds
// technically sdl would provide them in nanoseconds... but we cut them down
return 1'000*1'000;
}
template<>
inline uint64_t frameGetTimestamp(const SDLVideoFrame& frame) {
return frame.timestampUS;
}
template<>
constexpr bool frameHasBytes<SDLVideoFrame>(void) {
return true;
}
// TODO: test how performant this call is
template<>
inline uint64_t frameGetBytes(const SDLVideoFrame& frame) {
if (!frame.surface) {
return 0;
}
const auto* surf = frame.surface.get();
if (surf->format == SDL_PIXELFORMAT_MJPG) {
// mjpg is special, since it is compressed
return surf->pitch;
}
const auto* details = SDL_GetPixelFormatDetails(surf->format);
if (details == nullptr) {
return 0;
}
return details->bytes_per_pixel * surf->w * surf->h;
}

View File

@ -113,6 +113,8 @@ bool StreamManager::disconnectAll(Object o) {
// do we need the time delta?
float StreamManager::tick(float) {
float interval_min {2.01f};
// pump all mainthread connections
for (auto it = _connections.begin(); it != _connections.end();) {
auto& con = **it;
@ -125,6 +127,10 @@ float StreamManager::tick(float) {
if (con.on_main_thread) {
con.pump_fn(con);
const float con_interval = con.interval_avg;
if (con_interval > 0.f) {
interval_min = std::min(interval_min, con_interval);
}
}
if (con.stop && (con.finished || con.on_main_thread)) {
@ -139,8 +145,7 @@ float StreamManager::tick(float) {
}
}
// return min over intervals instead
return 2.f; // TODO: 2sec makes mainthread connections unusable
return interval_min;
}
bool StreamManager::onEvent(const ObjectStore::Events::ObjectConstruct& e) {

View File

@ -2,6 +2,7 @@
#include <solanaceae/object_store/fwd.hpp>
#include <solanaceae/object_store/object_store.hpp>
#include <solanaceae/util/time.hpp>
#include <entt/core/type_info.hpp>
@ -78,7 +79,16 @@ class StreamManager : protected ObjectStoreEventI {
std::thread pump_thread;
// frame interval counters and estimates
// TODO
std::atomic<float> interval_avg {0.f}; // s
std::atomic<uint64_t> frames_total{0};
std::atomic<uint64_t> bytes_total{0}; // if it can be mesured
// moving avg
std::atomic<float> bytes_per_sec{0};
// temps for mesuring
uint64_t _last_ts {0}; // frame format OR ms if frame has no ts
Connection(void) = default;
Connection(
@ -193,19 +203,60 @@ bool StreamManager::connect(Object src, Object sink, bool threaded) {
h_src,
h_sink,
std::move(our_data),
[](Connection& con) -> void {
[](Connection& con) -> void { // pump
// there might be more stored
for (size_t i = 0; i < 64; i++) {
auto new_frame_opt = static_cast<inlineData*>(con.data.get())->reader->pop();
// TODO: frame interval estimates
if (new_frame_opt.has_value()) {
con.frames_total++;
// TODO: opt-in ?
float delta{0.f}; // s
uint64_t ts{0};
if constexpr (frameHasTimestamp<FrameType>()) {
ts = frameGetTimestamp(new_frame_opt.value());
} else {
ts = getTimeMS(); // fallback
}
if (con._last_ts != 0 && ts > con._last_ts) {
// normalize to seconds
if constexpr (frameHasTimestamp<FrameType>()) {
delta = float(ts - con._last_ts) / frameGetTimestampDivision<FrameType>();
} else {
delta = float(ts - con._last_ts) / 1000.f; // fallback
}
if (con.interval_avg == 0.f) {
con.interval_avg = delta;
} else {
con.interval_avg = con.interval_avg*0.95f + delta*0.05f;
}
}
con._last_ts = ts;
if constexpr (frameHasBytes<FrameType>()) {
// we need to always run this, timing stuff below might not
const auto bytes = frameGetBytes(new_frame_opt.value());
con.bytes_total += bytes;
if (delta > 0.f) {
if (con.bytes_per_sec == 0.f) {
con.bytes_per_sec = bytes/delta;
} else {
con.bytes_per_sec = con.bytes_per_sec*0.95f + (bytes/delta)*0.05f;
}
}
}
static_cast<inlineData*>(con.data.get())->writer->push(new_frame_opt.value());
} else {
break;
}
}
},
[](Connection& con) -> void {
[](Connection& con) -> void { // disco
auto* src_stream_ptr = con.src.try_get<Components::FrameStream2Source<FrameType>>();
if (src_stream_ptr != nullptr) {
(*src_stream_ptr)->unsubscribe(static_cast<inlineData*>(con.data.get())->reader);

View File

@ -2,9 +2,12 @@
#include <solanaceae/object_store/object_store.hpp>
#include "./string_formatter_utils.hpp"
#include <imgui/imgui.h>
#include <string>
#include <cinttypes>
StreamManagerUI::StreamManagerUI(ObjectStore2& os, StreamManager& sm) : _os(os), _sm(sm) {
}
@ -193,6 +196,7 @@ void StreamManagerUI::render(void) {
const auto& con = _sm._connections[i];
//ImGui::Text("con %d->%d", entt::to_integral(entt::to_entity(con->src.entity())), entt::to_integral(entt::to_entity(con->sink.entity())));
ImGui::BeginGroup(); // TODO: make group hover work
ImGui::PushID(i);
ImGui::TableNextColumn();
@ -224,6 +228,27 @@ void StreamManagerUI::render(void) {
);
ImGui::PopID();
ImGui::EndGroup(); // TODO: make group hover work
if (ImGui::BeginItemTooltip()) {
uint64_t bytes_total = con->bytes_total;
float bytes_per_sec = con->bytes_per_sec;
const char* bytes_total_suffix = "???";
int64_t bytes_total_divider = sizeToHumanReadable(bytes_total, bytes_total_suffix);
const char* bytes_ps_suffix = "???";
int64_t bytes_ps_divider = sizeToHumanReadable(bytes_per_sec, bytes_ps_suffix);
ImGui::Text(
"interval: ~%.2fms (%.2ffps)\n"
"frames total: %" PRIu64 "\n"
"bytes total: %.2f%s (avg ~%.1f%s/s)",
con->interval_avg*1000.f, 1.f/con->interval_avg,
(uint64_t)con->frames_total,
bytes_total/float(bytes_total_divider), bytes_total_suffix, bytes_per_sec/bytes_ps_divider, bytes_ps_suffix
);
ImGui::EndTooltip();
}
}
ImGui::EndTable();
}

View File

@ -5,7 +5,7 @@
#include <array>
// returns divider and places static suffix string into suffix_out
static int64_t sizeToHumanReadable(int64_t file_size, const char*& suffix_out) {
static inline int64_t sizeToHumanReadable(int64_t file_size, const char*& suffix_out) {
static const char* suffix_arr[] {
"Bytes",
"KiB",
@ -27,7 +27,7 @@ static int64_t sizeToHumanReadable(int64_t file_size, const char*& suffix_out) {
}
// returns divider and places static suffix string into suffix_out
static int64_t durationToHumanReadable(int64_t t, const char*& suffix_out) {
static inline int64_t durationToHumanReadable(int64_t t, const char*& suffix_out) {
static const char* suffix_arr[] {
"ms",
"s",