.. _program_listing_file_liberate_concurrency_command.h: Program Listing for File command.h ================================== |exhale_lsh| :ref:`Return to documentation for file ` (``liberate/concurrency/command.h``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp /* * This file is part of liberate. * * Author(s): Jens Finkhaeuser * * Copyright (c) 2022 Interpeer gUG (haftungsbeschränkt) * * SPDX-License-Identifier: GPL-3.0-only * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #ifndef LIBERATE_CONCURRENCY_COMMAND_H #define LIBERATE_CONCURRENCY_COMMAND_H #ifndef __cplusplus #error You are trying to include a C++ only header file #endif #include #include #include #include namespace liberate::concurrency::command { using command_type = int; struct command_context_base { inline explicit command_context_base(command_type _type) : type{_type} { } virtual ~command_context_base() = default; template< typename derivedT > inline derivedT * as(command_type const & target_type) { if (target_type != type) { return nullptr; } return reinterpret_cast(this); } template< typename derivedT > inline derivedT const * as(command_type const & target_type) const { if (target_type != type) { return nullptr; } return reinterpret_cast(this); } command_type const type; void * context = nullptr; }; struct void_t {}; template < typename paramsT = void_t, typename resultsT = void_t > struct command_context : public command_context_base { using params_type = paramsT; using results_type = resultsT; inline explicit command_context(command_type _type) : command_context_base{_type} { } virtual ~command_context() = default; std::unique_ptr parameters = {}; std::unique_ptr results = {}; }; template < typename commandT, typename... argsT > inline std::unique_ptr create_context(command_type type, argsT && ...args) { auto ptr = std::make_unique(type); ptr->parameters = std::unique_ptr< typename commandT::params_type >{new typename commandT::params_type{std::forward(args)...}}; return ptr; } template < template typename queueT > class command_queue_base { public: using command_ptr = std::unique_ptr; virtual ~command_queue_base() = default; inline void enqueue_command(command_ptr && command) { push(to_process, std::move(command)); } inline command_ptr dequeue_command() { return pop(to_process); } inline void put_results(command_ptr && results) { push(done, std::move(results)); } inline command_ptr get_completed() { return pop(done); } private: using queue_impl = queueT; queue_impl to_process; queue_impl done; inline static void push(queue_impl & queue, command_ptr && value) { queue.push_back(std::move(value)); } inline static command_ptr pop(queue_impl & queue) { if (queue.empty()) { return {}; } auto res = std::move(queue.front()); queue.pop_front(); return res; } }; template <> class command_queue_base<::liberate::concurrency::concurrent_queue> { public: using command_ptr = std::shared_ptr; virtual ~command_queue_base() = default; inline void enqueue_command(command_ptr && command) { push(to_process, std::move(command)); } inline command_ptr dequeue_command() { return pop(to_process); } inline void put_results(command_ptr && results) { push(done, std::move(results)); } inline command_ptr get_completed() { return pop(done); } private: using queue_impl = typename ::liberate::concurrency::concurrent_queue; queue_impl to_process = {}; queue_impl done = {}; inline static void push(queue_impl & queue, command_ptr && command) { queue.push(std::move(command)); } inline static command_ptr pop(queue_impl & queue) { command_ptr res; if (queue.pop(res)) { return res; } return {}; } }; template < template typename queueT > class parametrized_command_queue : private command_queue_base { public: using base_type = command_queue_base; using command_ptr = typename base_type::command_ptr; using notification_function = std::function; inline parametrized_command_queue() = default; inline explicit parametrized_command_queue( notification_function general_notification ) : m_command_func{general_notification} , m_result_func{general_notification} { } inline explicit parametrized_command_queue( notification_function command_notification, notification_function result_notification ) : m_command_func{command_notification} , m_result_func{result_notification} { } virtual ~parametrized_command_queue() = default; template < typename commandT, typename... argsT > inline void enqueue_command(command_type type, argsT && ...args) { auto ptr = create_context(type, std::forward(args)...); enqueue_command(std::move(ptr)); } inline void enqueue_command(command_ptr && command) { base_type::enqueue_command(std::move(command)); ++m_command_size; if (m_command_func) { m_command_func(*this); } } inline command_ptr dequeue_command() { auto ret = std::move(base_type::dequeue_command()); if (ret) { --m_command_size; } return ret; } inline void put_results(command_ptr && results) { base_type::put_results(std::move(results)); ++m_result_size; if (m_result_func) { m_result_func(*this); } } template < typename commandT, typename... argsT > inline void put_results(command_ptr && results, argsT && ...args) { if (results) { // There can be no error here as we're passing the type we're going to // compare to. auto derived = results->template as(results->type); auto ptr = std::unique_ptr( new typename commandT::results_type{std::forward(args)...} ); derived->results = std::move(ptr); } put_results(std::move(results)); } inline command_ptr get_completed() { auto ret = std::move(base_type::get_completed()); if (ret) { --m_result_size; } return ret; } inline std::size_t commands() const { return m_command_size; } inline std::size_t results() const { return m_result_size; } inline bool empty() const { return m_command_size == 0 && m_result_size == 0; } private: std::atomic m_command_size = 0; std::atomic m_result_size = 0; notification_function m_command_func = {}; notification_function m_result_func = {}; }; using concurrent_command_queue = parametrized_command_queue< ::liberate::concurrency::concurrent_queue >; } // namespace liberate::concurrency::command #endif // guard