#pragma once #include #include #include #include "./frame_stream2.hpp" #include #include #include #include #include #include // fwd class StreamManager; namespace Components { struct StreamSource { std::string name; std::string frame_type_name; std::function connect_fn; template static StreamSource create(const std::string& name); }; struct StreamSink { std::string name; std::string frame_type_name; template static StreamSink create(const std::string& name); }; template using FrameStream2Source = std::unique_ptr>; template using FrameStream2Sink = std::unique_ptr>; } // Components class StreamManager { friend class StreamManagerUI; // TODO: make this go away ObjectStore2& _os; struct Connection { ObjectHandle src; ObjectHandle sink; struct Data { virtual ~Data(void) {} }; std::unique_ptr data; // stores reader writer type erased std::function pump_fn; // TODO: make it return next interval? std::function unsubscribe_fn; bool on_main_thread {true}; std::atomic_bool stop {false}; // disconnect std::atomic_bool finished {false}; // disconnect // pump thread std::thread pump_thread; // frame interval counters and estimates // TODO Connection(void) = default; Connection( ObjectHandle src_, ObjectHandle sink_, std::unique_ptr&& data_, std::function&& pump_fn_, std::function&& unsubscribe_fn_, bool on_main_thread_ = true ); }; std::vector> _connections; public: StreamManager(ObjectStore2& os); virtual ~StreamManager(void); template bool connect(Object src, Object sink, bool threaded = true); bool connect(Object src, Object sink, bool threaded = true); bool disconnect(Object src, Object sink); bool disconnectAll(Object o); // do we need the time delta? float tick(float); }; // template impls namespace Components { // we require the complete sm type here template StreamSource StreamSource::create(const std::string& name) { return StreamSource{ name, std::string{entt::type_name::value()}, +[](StreamManager& sm, Object src, Object sink, bool threaded) { return sm.connect(src, sink, threaded); }, }; } template StreamSink StreamSink::create(const std::string& name) { return StreamSink{ name, std::string{entt::type_name::value()}, }; } } // Components template bool StreamManager::connect(Object src, Object sink, bool threaded) { auto res = std::find_if( _connections.cbegin(), _connections.cend(), [&](const auto& a) { return a->src == src && a->sink == sink; } ); if (res != _connections.cend()) { // already exists return false; } auto h_src = _os.objectHandle(src); auto h_sink = _os.objectHandle(sink); if (!static_cast(h_src) || !static_cast(h_sink)) { // an object does not exist return false; } if (!h_src.all_of>()) { // src not stream source return false; } if (!h_sink.all_of>()) { // sink not stream sink return false; } auto& src_stream = h_src.get>(); auto& sink_stream = h_sink.get>(); struct inlineData : public Connection::Data { virtual ~inlineData(void) {} std::shared_ptr> reader; std::shared_ptr> writer; }; auto our_data = std::make_unique(); our_data->reader = src_stream->subscribe(); if (!our_data->reader) { return false; } our_data->writer = sink_stream->subscribe(); if (!our_data->writer) { return false; } _connections.push_back(std::make_unique( h_src, h_sink, std::move(our_data), [](Connection& con) -> void { // there might be more stored for (size_t i = 0; i < 10; i++) { auto new_frame_opt = static_cast(con.data.get())->reader->pop(); // TODO: frame interval estimates if (new_frame_opt.has_value()) { static_cast(con.data.get())->writer->push(new_frame_opt.value()); } else { break; } } }, [](Connection& con) -> void { auto* src_stream_ptr = con.src.try_get>(); if (src_stream_ptr != nullptr) { (*src_stream_ptr)->unsubscribe(static_cast(con.data.get())->reader); } auto* sink_stream_ptr = con.sink.try_get>(); if (sink_stream_ptr != nullptr) { (*sink_stream_ptr)->unsubscribe(static_cast(con.data.get())->writer); } }, !threaded )); return true; }