From f46d0a713da26c72870664a64b16fa1277322b9e Mon Sep 17 00:00:00 2001 From: Green Sky Date: Thu, 25 Apr 2024 14:13:51 +0200 Subject: [PATCH] framestream2 (with rigtorp/SPSCQueue) --- src/CMakeLists.txt | 11 +- src/content/SPSCQueue.h | 237 ++++++++++++++++++++++++ src/content/frame_stream2.hpp | 99 ++++++++++ src/content/sdl_video_frame_stream2.cpp | 81 ++++++++ src/content/sdl_video_frame_stream2.hpp | 66 +++++++ src/content/stream_reader_sdl_video.cpp | 84 +++++++++ src/content/stream_reader_sdl_video.hpp | 34 ++++ src/image_loader.hpp | 2 + src/main.cpp | 21 +++ 9 files changed, 632 insertions(+), 3 deletions(-) create mode 100644 src/content/SPSCQueue.h create mode 100644 src/content/frame_stream2.hpp create mode 100644 src/content/sdl_video_frame_stream2.cpp create mode 100644 src/content/sdl_video_frame_stream2.hpp create mode 100644 src/content/stream_reader_sdl_video.cpp create mode 100644 src/content/stream_reader_sdl_video.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c49cf6e..2eca139 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -76,9 +76,14 @@ add_executable(tomato ./chat_gui4.cpp ./content/content.hpp - ./content/stream_reader.hpp - ./content/stream_reader_sdl_audio.hpp - ./content/stream_reader_sdl_audio.cpp + #./content/stream_reader.hpp + #./content/stream_reader_sdl_audio.hpp + #./content/stream_reader_sdl_audio.cpp + #./content/stream_reader_sdl_video.hpp + #./content/stream_reader_sdl_video.cpp + ./content/frame_stream2.hpp + ./content/sdl_video_frame_stream2.hpp + ./content/sdl_video_frame_stream2.cpp ) target_compile_features(tomato PUBLIC cxx_std_17) diff --git a/src/content/SPSCQueue.h b/src/content/SPSCQueue.h new file mode 100644 index 0000000..0693ace --- /dev/null +++ b/src/content/SPSCQueue.h @@ -0,0 +1,237 @@ +/* +Copyright (c) 2020 Erik Rigtorp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + */ + +#pragma once + +#include +#include +#include +#include // std::allocator +#include // std::hardware_destructive_interference_size +#include +#include // std::enable_if, std::is_*_constructible + +#ifdef __has_cpp_attribute +#if __has_cpp_attribute(nodiscard) +#define RIGTORP_NODISCARD [[nodiscard]] +#endif +#endif +#ifndef RIGTORP_NODISCARD +#define RIGTORP_NODISCARD +#endif + +namespace rigtorp { + +template > class SPSCQueue { + +#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t) + template + struct has_allocate_at_least : std::false_type {}; + + template + struct has_allocate_at_least< + Alloc2, std::void_t().allocate_at_least( + size_t{}))>> : std::true_type {}; +#endif + +public: + explicit SPSCQueue(const size_t capacity, + const Allocator &allocator = Allocator()) + : capacity_(capacity), allocator_(allocator) { + // The queue needs at least one element + if (capacity_ < 1) { + capacity_ = 1; + } + capacity_++; // Needs one slack element + // Prevent overflowing size_t + if (capacity_ > SIZE_MAX - 2 * kPadding) { + capacity_ = SIZE_MAX - 2 * kPadding; + } + +#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t) + if constexpr (has_allocate_at_least::value) { + auto res = allocator_.allocate_at_least(capacity_ + 2 * kPadding); + slots_ = res.ptr; + capacity_ = res.count - 2 * kPadding; + } else { + slots_ = std::allocator_traits::allocate( + allocator_, capacity_ + 2 * kPadding); + } +#else + slots_ = std::allocator_traits::allocate( + allocator_, capacity_ + 2 * kPadding); +#endif + + static_assert(alignof(SPSCQueue) == kCacheLineSize, ""); + static_assert(sizeof(SPSCQueue) >= 3 * kCacheLineSize, ""); + assert(reinterpret_cast(&readIdx_) - + reinterpret_cast(&writeIdx_) >= + static_cast(kCacheLineSize)); + } + + ~SPSCQueue() { + while (front()) { + pop(); + } + std::allocator_traits::deallocate(allocator_, slots_, + capacity_ + 2 * kPadding); + } + + // non-copyable and non-movable + SPSCQueue(const SPSCQueue &) = delete; + SPSCQueue &operator=(const SPSCQueue &) = delete; + + template + void emplace(Args &&...args) noexcept( + std::is_nothrow_constructible::value) { + static_assert(std::is_constructible::value, + "T must be constructible with Args&&..."); + auto const writeIdx = writeIdx_.load(std::memory_order_relaxed); + auto nextWriteIdx = writeIdx + 1; + if (nextWriteIdx == capacity_) { + nextWriteIdx = 0; + } + while (nextWriteIdx == readIdxCache_) { + readIdxCache_ = readIdx_.load(std::memory_order_acquire); + } + new (&slots_[writeIdx + kPadding]) T(std::forward(args)...); + writeIdx_.store(nextWriteIdx, std::memory_order_release); + } + + template + RIGTORP_NODISCARD bool try_emplace(Args &&...args) noexcept( + std::is_nothrow_constructible::value) { + static_assert(std::is_constructible::value, + "T must be constructible with Args&&..."); + auto const writeIdx = writeIdx_.load(std::memory_order_relaxed); + auto nextWriteIdx = writeIdx + 1; + if (nextWriteIdx == capacity_) { + nextWriteIdx = 0; + } + if (nextWriteIdx == readIdxCache_) { + readIdxCache_ = readIdx_.load(std::memory_order_acquire); + if (nextWriteIdx == readIdxCache_) { + return false; + } + } + new (&slots_[writeIdx + kPadding]) T(std::forward(args)...); + writeIdx_.store(nextWriteIdx, std::memory_order_release); + return true; + } + + void push(const T &v) noexcept(std::is_nothrow_copy_constructible::value) { + static_assert(std::is_copy_constructible::value, + "T must be copy constructible"); + emplace(v); + } + + template ::value>::type> + void push(P &&v) noexcept(std::is_nothrow_constructible::value) { + emplace(std::forward

(v)); + } + + RIGTORP_NODISCARD bool + try_push(const T &v) noexcept(std::is_nothrow_copy_constructible::value) { + static_assert(std::is_copy_constructible::value, + "T must be copy constructible"); + return try_emplace(v); + } + + template ::value>::type> + RIGTORP_NODISCARD bool + try_push(P &&v) noexcept(std::is_nothrow_constructible::value) { + return try_emplace(std::forward

(v)); + } + + RIGTORP_NODISCARD T *front() noexcept { + auto const readIdx = readIdx_.load(std::memory_order_relaxed); + if (readIdx == writeIdxCache_) { + writeIdxCache_ = writeIdx_.load(std::memory_order_acquire); + if (writeIdxCache_ == readIdx) { + return nullptr; + } + } + return &slots_[readIdx + kPadding]; + } + + void pop() noexcept { + static_assert(std::is_nothrow_destructible::value, + "T must be nothrow destructible"); + auto const readIdx = readIdx_.load(std::memory_order_relaxed); + assert(writeIdx_.load(std::memory_order_acquire) != readIdx && + "Can only call pop() after front() has returned a non-nullptr"); + slots_[readIdx + kPadding].~T(); + auto nextReadIdx = readIdx + 1; + if (nextReadIdx == capacity_) { + nextReadIdx = 0; + } + readIdx_.store(nextReadIdx, std::memory_order_release); + } + + RIGTORP_NODISCARD size_t size() const noexcept { + std::ptrdiff_t diff = writeIdx_.load(std::memory_order_acquire) - + readIdx_.load(std::memory_order_acquire); + if (diff < 0) { + diff += capacity_; + } + return static_cast(diff); + } + + RIGTORP_NODISCARD bool empty() const noexcept { + return writeIdx_.load(std::memory_order_acquire) == + readIdx_.load(std::memory_order_acquire); + } + + RIGTORP_NODISCARD size_t capacity() const noexcept { return capacity_ - 1; } + +private: +#ifdef __cpp_lib_hardware_interference_size + static constexpr size_t kCacheLineSize = + std::hardware_destructive_interference_size; +#else + static constexpr size_t kCacheLineSize = 64; +#endif + + // Padding to avoid false sharing between slots_ and adjacent allocations + static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1; + +private: + size_t capacity_; + T *slots_; +#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address) + Allocator allocator_ [[no_unique_address]]; +#else + Allocator allocator_; +#endif + + // Align to cache line size in order to avoid false sharing + // readIdxCache_ and writeIdxCache_ is used to reduce the amount of cache + // coherency traffic + alignas(kCacheLineSize) std::atomic writeIdx_ = {0}; + alignas(kCacheLineSize) size_t readIdxCache_ = 0; + alignas(kCacheLineSize) std::atomic readIdx_ = {0}; + alignas(kCacheLineSize) size_t writeIdxCache_ = 0; +}; +} // namespace rigtorp diff --git a/src/content/frame_stream2.hpp b/src/content/frame_stream2.hpp new file mode 100644 index 0000000..98d044e --- /dev/null +++ b/src/content/frame_stream2.hpp @@ -0,0 +1,99 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "./SPSCQueue.h" + +// Frames ofen consist of: +// - seq id // incremental sequential id, gaps in ids can be used to detect loss +// - data // the frame data +// eg: +//struct ExampleFrame { + //int64_t seq_id {0}; + //std::vector data; +//}; + +template +struct FrameStream2Reader { + // get number of available frames + virtual int32_t getSize(void) = 0; + + // get next frame + // TODO: optional instead? + // data sharing? -> no, data is copied for each fsr, if concurency supported + virtual std::optional getNext(void) = 0; +}; + +// needs count frames queue size +// having ~1-2sec buffer size is often sufficent +template +struct QueuedFrameStream2Reader : public FrameStream2Reader { + using frame_type = FrameType; + + rigtorp::SPSCQueue _queue; + + explicit QueuedFrameStream2Reader(size_t queue_size) : _queue(queue_size) {} + + [[nodiscard]] int32_t getSize(void) override { + return _queue.size(); + } + + [[nodiscard]] std::optional getNext(void) override { + auto* ret_ptr = _queue.front(); + if (ret_ptr == nullptr) { + return std::nullopt; + } + + // move away + FrameType ret = std::move(*ret_ptr); + + _queue.pop(); + + return ret; + } +}; + +template +struct MultiplexedQueuedFrameStream2Writer { + using ReaderType = QueuedFrameStream2Reader; + + // TODO: expose + const size_t _queue_size {10}; + + // pointer stability + std::vector> _readers; + std::mutex _readers_lock; // accessing the _readers array needs to be exclusive + // a simple lock here is ok, since this tends to be a rare operation, + // except for the push, which is always on the same thread + + ReaderType* aquireReader(void) { + std::lock_guard lg{_readers_lock}; + return _readers.emplace_back(std::make_unique(_queue_size)).get(); + } + + void releaseReader(ReaderType* reader) { + std::lock_guard lg{_readers_lock}; + for (auto it = _readers.begin(); it != _readers.end(); it++) { + if (it->get() == reader) { + _readers.erase(it); + break; + } + } + } + + // returns true if there are readers + bool pushValue(const FrameType& value) { + std::lock_guard lg{_readers_lock}; + bool have_readers{false}; + for (auto& it : _readers) { + [[maybe_unused]] auto _ = it->_queue.try_emplace(value); + have_readers = true; // even if queue full, we still continue believing in them + } + return have_readers; + } +}; + diff --git a/src/content/sdl_video_frame_stream2.cpp b/src/content/sdl_video_frame_stream2.cpp new file mode 100644 index 0000000..4a3b843 --- /dev/null +++ b/src/content/sdl_video_frame_stream2.cpp @@ -0,0 +1,81 @@ +#include "./sdl_video_frame_stream2.hpp" + +#include +#include +#include +#include +#include + +SDLVideoCameraContent::SDLVideoCameraContent(void) { + int devcount = 0; + SDL_CameraDeviceID *devices = SDL_GetCameraDevices(&devcount); + + if (devices == nullptr || devcount < 1) { + throw int(1); // TODO: proper exception? + } + + std::cout << "### found cameras:\n"; + for (int i = 0; i < devcount; i++) { + const SDL_CameraDeviceID device = devices[i]; + char *name = SDL_GetCameraDeviceName(device); + + std::cout << " - Camera #" << i << ": " << name << "\n"; + + SDL_free(name); + } + + _camera = { + SDL_OpenCameraDevice(devices[0], nullptr), + &SDL_CloseCamera + }; + if (!static_cast(_camera)) { + throw int(2); + } + + SDL_CameraSpec spec; + float interval {0.1f}; + if (SDL_GetCameraFormat(_camera.get(), &spec) < 0) { + // meh + } else { + // interval + interval = float(spec.interval_numerator)/float(spec.interval_denominator); + std::cout << "camera interval: " << interval*1000 << "ms\n"; + } + + _thread = std::thread([this, interval](void) { + while (!_thread_should_quit) { + Uint64 timestampNS = 0; + SDL_Surface* sdl_frame_next = SDL_AcquireCameraFrame(_camera.get(), ×tampNS); + + // no new frame yet, or error + if (sdl_frame_next == nullptr) { + // only sleep 1/10, we expected a frame + std::this_thread::sleep_for(std::chrono::milliseconds(int64_t(interval*1000 / 10))); + continue; + } + + SDLVideoFrame new_frame { + timestampNS, + sdl_frame_next + }; + + bool someone_listening = pushValue(new_frame); + + SDL_ReleaseCameraFrame(_camera.get(), sdl_frame_next); + + if (someone_listening) { + // TODO: maybe double the freq? + std::this_thread::sleep_for(std::chrono::milliseconds(int64_t(interval*1000))); + } else { + // we just sleep 4x as long, bc no one is listening + std::this_thread::sleep_for(std::chrono::milliseconds(int64_t(interval*1000*4))); + } + } + }); +} + +SDLVideoCameraContent::~SDLVideoCameraContent(void) { + _thread_should_quit = true; + _thread.join(); + // TODO: what to do if readers are still present? +} diff --git a/src/content/sdl_video_frame_stream2.hpp b/src/content/sdl_video_frame_stream2.hpp new file mode 100644 index 0000000..f39be99 --- /dev/null +++ b/src/content/sdl_video_frame_stream2.hpp @@ -0,0 +1,66 @@ +#pragma once + +#include "./frame_stream2.hpp" +#include "SDL_surface.h" + +#include + +#include +#include + +inline void nopSurfaceDestructor(SDL_Surface*) {} + +// this is very sdl specific +struct SDLVideoFrame { + // TODO: sequence numbering? + uint64_t timestampNS {0}; + + std::unique_ptr surface {nullptr, &SDL_DestroySurface}; + + // special non-owning constructor? + SDLVideoFrame( + uint64_t ts, + SDL_Surface* surf + ) { + timestampNS = ts; + surface = {surf, &nopSurfaceDestructor}; + } + // copy + SDLVideoFrame(const SDLVideoFrame& other) { + timestampNS = other.timestampNS; + if (static_cast(other.surface)) { + surface = { + SDL_CreateSurface( + other.surface->w, + other.surface->h, + SDL_PIXELFORMAT_RGBA8888 // meh + ), + &SDL_DestroySurface + }; + SDL_BlitSurface(other.surface.get(), nullptr, surface.get(), nullptr); + } + } + SDLVideoFrame& operator=(const SDLVideoFrame& other) = delete; +}; + +using SDLVideoFrameStream2Writer = MultiplexedQueuedFrameStream2Writer; +using SDLVideoFrameStream2Reader = SDLVideoFrameStream2Writer::ReaderType; + +struct SDLVideoCameraContent : protected SDLVideoFrameStream2Writer { + // meh, empty default + std::unique_ptr _camera {nullptr, &SDL_CloseCamera}; + std::atomic _thread_should_quit {false}; + std::thread _thread; + + // construct source and start thread + // TODO: optimize so the thread is not always running + SDLVideoCameraContent(void); + + // stops the thread and closes the camera + ~SDLVideoCameraContent(void); + + // make only some of writer public + using SDLVideoFrameStream2Writer::aquireReader; + using SDLVideoFrameStream2Writer::releaseReader; +}; + diff --git a/src/content/stream_reader_sdl_video.cpp b/src/content/stream_reader_sdl_video.cpp new file mode 100644 index 0000000..a134e07 --- /dev/null +++ b/src/content/stream_reader_sdl_video.cpp @@ -0,0 +1,84 @@ +#include "./stream_reader_sdl_video.hpp" + +#include + +SDLVideoFrameStreamReader::SDLVideoFrameStreamReader() : _camera{nullptr, &SDL_CloseCamera}, _surface{nullptr, &SDL_DestroySurface} { + // enumerate + int devcount = 0; + SDL_CameraDeviceID *devices = SDL_GetCameraDevices(&devcount); + + if (devices == nullptr || devcount < 1) { + throw int(1); // TODO: proper exception? + } + + std::cout << "### found cameras:\n"; + for (int i = 0; i < devcount; i++) { + const SDL_CameraDeviceID device = devices[i]; + char *name = SDL_GetCameraDeviceName(device); + + std::cout << " - Camera #" << i << ": " << name << "\n"; + + SDL_free(name); + } + + _camera = { + SDL_OpenCameraDevice(devices[0], nullptr), + &SDL_CloseCamera + }; + if (!static_cast(_camera)) { + throw int(2); + } + + SDL_CameraSpec spec; + if (SDL_GetCameraFormat(_camera.get(), &spec) < 0) { + // meh + } else { + // interval + float interval = float(spec.interval_numerator)/float(spec.interval_denominator); + std::cout << "camera interval: " << interval*1000 << "ms\n"; + } +} + +SDLVideoFrameStreamReader::VideoFrame SDLVideoFrameStreamReader::getNextVideoFrameRGBA(void) { + if (!static_cast(_camera)) { + return {}; + } + + Uint64 timestampNS = 0; + SDL_Surface* frame_next = SDL_AcquireCameraFrame(_camera.get(), ×tampNS); + + // no new frame yet, or error + if (frame_next == nullptr) { + //std::cout << "failed acquiring frame\n"; + return {}; + } + + // TODO: investigate zero copy + _surface = { + SDL_ConvertSurfaceFormat(frame_next, SDL_PIXELFORMAT_RGBA8888), + &SDL_DestroySurface + }; + + SDL_ReleaseCameraFrame(_camera.get(), frame_next); + + SDL_LockSurface(_surface.get()); + + return { + _surface->w, + _surface->h, + timestampNS, + { + reinterpret_cast(_surface->pixels), + uint64_t(_surface->w*_surface->h*4) // rgba + } + }; +} + +int64_t SDLVideoFrameStreamReader::have(void) { + return -1; +} + +ByteSpan SDLVideoFrameStreamReader::getNext(void) { + return {}; +} + diff --git a/src/content/stream_reader_sdl_video.hpp b/src/content/stream_reader_sdl_video.hpp new file mode 100644 index 0000000..aa7a371 --- /dev/null +++ b/src/content/stream_reader_sdl_video.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include "./stream_reader.hpp" + +#include + +#include +#include +#include + +struct SDLVideoFrameStreamReader : public RawFrameStreamReaderI { + // meh, empty default + std::unique_ptr _camera; + std::unique_ptr _surface; + + SDLVideoFrameStreamReader(void); + + struct VideoFrame { + int32_t width {0}; + int32_t height {0}; + + uint64_t timestampNS {0}; + + ByteSpan data; + }; + + // data owned by StreamReader, overwritten by next call to getNext*() + VideoFrame getNextVideoFrameRGBA(void); + + public: // interface + int64_t have(void) override; + ByteSpan getNext(void) override; +}; + diff --git a/src/image_loader.hpp b/src/image_loader.hpp index 78f2885..522ce53 100644 --- a/src/image_loader.hpp +++ b/src/image_loader.hpp @@ -27,6 +27,8 @@ struct ImageLoaderI { // only positive values are valid ImageResult crop(int32_t c_x, int32_t c_y, int32_t c_w, int32_t c_h) const; + + // TODO: scale }; virtual ImageResult loadFromMemoryRGBA(const uint8_t* data, uint64_t data_size) = 0; }; diff --git a/src/main.cpp b/src/main.cpp index 88567de..a10fed6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -9,6 +9,7 @@ #include "./chat_gui/theme.hpp" #include "./start_screen.hpp" +#include "./content/sdl_video_frame_stream2.hpp" #include #include @@ -58,6 +59,26 @@ int main(int argc, char** argv) { } } + // optionally init audio and camera + if (SDL_Init(SDL_INIT_AUDIO) < 0) { + std::cerr << "SDL_Init AUDIO failed (" << SDL_GetError() << ")\n"; + } + if (SDL_Init(SDL_INIT_CAMERA) < 0) { + std::cerr << "SDL_Init CAMERA failed (" << SDL_GetError() << ")\n"; + } else { // HACK + SDLVideoCameraContent vcc; + auto* reader = vcc.aquireReader(); + for (size_t i = 0; i < 200; i++) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + auto new_frame_opt = reader->getNext(); + if (new_frame_opt.has_value()) { + std::cout << "video frame was " << new_frame_opt.value().surface->w << "x" << new_frame_opt.value().surface->h << " " << new_frame_opt.value().timestampNS << "ns\n"; + } + } + vcc.releaseReader(reader); + } + std::cout << "after sdl video stuffery\n"; + IMGUI_CHECKVERSION(); ImGui::CreateContext();