frame stream refactor, i like it now

This commit is contained in:
Green Sky 2024-05-01 15:16:14 +02:00
parent 495ec41234
commit 165e80c456
No known key found for this signature in database
4 changed files with 50 additions and 23 deletions

View File

@ -18,31 +18,38 @@
//};
template<typename FrameType>
struct FrameStream2Reader {
struct FrameStream2 {
// get number of available frames
virtual int32_t getSize(void) = 0;
[[nodiscard]] virtual int32_t size(void) = 0;
// get next frame
// TODO: optional instead?
// data sharing? -> no, data is copied for each fsr, if concurency supported
virtual std::optional<FrameType> getNext(void) = 0;
[[nodiscard]] virtual std::optional<FrameType> pop(void) = 0;
// returns true if there are readers (or we dont know)
virtual bool push(const FrameType& value) = 0;
};
// needs count frames queue size
// having ~1-2sec buffer size is often sufficent
template<typename FrameType>
struct QueuedFrameStream2Reader : public FrameStream2Reader<FrameType> {
struct QueuedFrameStream2 : public FrameStream2<FrameType> {
using frame_type = FrameType;
rigtorp::SPSCQueue<FrameType> _queue;
explicit QueuedFrameStream2Reader(size_t queue_size) : _queue(queue_size) {}
// discard values if queue full
// will block if not lossy and full on push
const bool _lossy {true};
[[nodiscard]] int32_t getSize(void) override {
explicit QueuedFrameStream2(size_t queue_size, bool lossy = true) : _queue(queue_size), _lossy(lossy) {}
int32_t size(void) override {
return _queue.size();
}
[[nodiscard]] std::optional<FrameType> getNext(void) override {
std::optional<FrameType> pop(void) override {
auto* ret_ptr = _queue.front();
if (ret_ptr == nullptr) {
return std::nullopt;
@ -55,14 +62,21 @@ struct QueuedFrameStream2Reader : public FrameStream2Reader<FrameType> {
return ret;
}
bool push(const FrameType& value) override {
if (_lossy) {
[[maybe_unused]] auto _ = _queue.try_emplace(value);
// TODO: maybe return ?
} else {
_queue.push(value);
}
return true;
}
};
template<typename FrameType>
struct MultiplexedQueuedFrameStream2Writer {
using ReaderType = QueuedFrameStream2Reader<FrameType>;
// TODO: expose
const size_t _queue_size {10};
struct QueuedFrameStream2Multiplexer : public FrameStream2<FrameType> {
using ReaderType = QueuedFrameStream2<FrameType>;
// pointer stability
std::vector<std::unique_ptr<ReaderType>> _readers;
@ -70,9 +84,9 @@ struct MultiplexedQueuedFrameStream2Writer {
// a simple lock here is ok, since this tends to be a rare operation,
// except for the push, which is always on the same thread
ReaderType* aquireReader(void) {
ReaderType* aquireReader(size_t queue_size = 10, bool lossy = true) {
std::lock_guard lg{_readers_lock};
return _readers.emplace_back(std::make_unique<ReaderType>(_queue_size)).get();
return _readers.emplace_back(std::make_unique<ReaderType>(queue_size, lossy)).get();
}
void releaseReader(ReaderType* reader) {
@ -85,13 +99,26 @@ struct MultiplexedQueuedFrameStream2Writer {
}
}
// stream interface
int32_t size(void) override {
// TODO: return something sensible?
return -1;
}
std::optional<FrameType> pop(void) override {
assert(false && "tried to pop from a multiplexer");
return std::nullopt;
}
// returns true if there are readers
bool pushValue(const FrameType& value) {
bool push(const FrameType& value) override {
std::lock_guard lg{_readers_lock};
bool have_readers{false};
for (auto& it : _readers) {
[[maybe_unused]] auto _ = it->_queue.try_emplace(value);
[[maybe_unused]] auto _ = it->push(value);
have_readers = true; // even if queue full, we still continue believing in them
// maybe consider push return value?
}
return have_readers;
}

View File

@ -105,7 +105,7 @@ SDLVideoCameraContent::SDLVideoCameraContent(void) {
};
// creates surface copies
someone_listening = pushValue(new_frame_non_owning);
someone_listening = push(new_frame_non_owning);
}
SDL_ReleaseCameraFrame(_camera.get(), sdl_frame_next);

View File

@ -42,10 +42,10 @@ struct SDLVideoFrame {
SDLVideoFrame& operator=(const SDLVideoFrame& other) = delete;
};
using SDLVideoFrameStream2Writer = MultiplexedQueuedFrameStream2Writer<SDLVideoFrame>;
using SDLVideoFrameStream2Reader = SDLVideoFrameStream2Writer::ReaderType;
using SDLVideoFrameStream2Multiplexer = QueuedFrameStream2Multiplexer<SDLVideoFrame>;
using SDLVideoFrameStream2 = SDLVideoFrameStream2Multiplexer::ReaderType;
struct SDLVideoCameraContent : protected SDLVideoFrameStream2Writer {
struct SDLVideoCameraContent : protected SDLVideoFrameStream2Multiplexer {
// meh, empty default
std::unique_ptr<SDL_Camera, decltype(&SDL_CloseCamera)> _camera {nullptr, &SDL_CloseCamera};
std::atomic<bool> _thread_should_quit {false};
@ -59,7 +59,7 @@ struct SDLVideoCameraContent : protected SDLVideoFrameStream2Writer {
~SDLVideoCameraContent(void);
// make only some of writer public
using SDLVideoFrameStream2Writer::aquireReader;
using SDLVideoFrameStream2Writer::releaseReader;
using SDLVideoFrameStream2Multiplexer::aquireReader;
using SDLVideoFrameStream2Multiplexer::releaseReader;
};

View File

@ -85,7 +85,7 @@ int main(int argc, char** argv) {
auto* reader = vcc.aquireReader();
for (size_t i = 0; i < 20; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto new_frame_opt = reader->getNext();
auto new_frame_opt = reader->pop();
if (new_frame_opt.has_value()) {
std::cout << "video frame was " << new_frame_opt.value().surface->w << "x" << new_frame_opt.value().surface->h << " " << new_frame_opt.value().timestampNS << "ns\n";
}