Program Listing for File command.h
↰ Return to documentation for file (liberate/concurrency/command.h
)
/*
* This file is part of liberate.
*
* Author(s): Jens Finkhaeuser <jens@finkhaeuser.de>
*
* Copyright (c) 2022 Interpeer gUG (haftungsbeschränkt)
*
* SPDX-License-Identifier: GPL-3.0-only
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef LIBERATE_CONCURRENCY_COMMAND_H
#define LIBERATE_CONCURRENCY_COMMAND_H
#ifndef __cplusplus
#error You are trying to include a C++ only header file
#endif
#include <liberate.h>
#include <memory>
#include <functional>
#include <liberate/concurrency/concurrent_queue.h>
namespace liberate::concurrency::command {
using command_type = int;
struct command_context_base
{
inline explicit command_context_base(command_type _type)
: type{_type}
{
}
virtual ~command_context_base() = default;
template<
typename derivedT
>
inline derivedT *
as(command_type const & target_type)
{
if (target_type != type) {
return nullptr;
}
return reinterpret_cast<derivedT *>(this);
}
template<
typename derivedT
>
inline derivedT const *
as(command_type const & target_type) const
{
if (target_type != type) {
return nullptr;
}
return reinterpret_cast<derivedT const *>(this);
}
command_type const type;
void * context = nullptr;
};
struct void_t {};
template <
typename paramsT = void_t,
typename resultsT = void_t
>
struct command_context : public command_context_base
{
using params_type = paramsT;
using results_type = resultsT;
inline explicit command_context(command_type _type)
: command_context_base{_type}
{
}
virtual ~command_context() = default;
std::unique_ptr<params_type> parameters = {};
std::unique_ptr<results_type> results = {};
};
template <
typename commandT,
typename... argsT
>
inline std::unique_ptr<command_context_base>
create_context(command_type type, argsT && ...args)
{
auto ptr = std::make_unique<commandT>(type);
ptr->parameters = std::unique_ptr<
typename commandT::params_type
>{new typename commandT::params_type{std::forward<argsT>(args)...}};
return ptr;
}
template <
template <typename...> typename queueT
>
class command_queue_base
{
public:
using command_ptr = std::unique_ptr<command_context_base>;
virtual ~command_queue_base() = default;
inline void enqueue_command(command_ptr && command)
{
push(to_process, std::move(command));
}
inline command_ptr dequeue_command()
{
return pop(to_process);
}
inline void put_results(command_ptr && results)
{
push(done, std::move(results));
}
inline command_ptr get_completed()
{
return pop(done);
}
private:
using queue_impl = queueT<command_ptr>;
queue_impl to_process;
queue_impl done;
inline static void push(queue_impl & queue, command_ptr && value)
{
queue.push_back(std::move(value));
}
inline static command_ptr pop(queue_impl & queue)
{
if (queue.empty()) {
return {};
}
auto res = std::move(queue.front());
queue.pop_front();
return res;
}
};
template <>
class command_queue_base<::liberate::concurrency::concurrent_queue>
{
public:
using command_ptr = std::shared_ptr<command_context_base>;
virtual ~command_queue_base() = default;
inline void enqueue_command(command_ptr && command)
{
push(to_process, std::move(command));
}
inline command_ptr dequeue_command()
{
return pop(to_process);
}
inline void put_results(command_ptr && results)
{
push(done, std::move(results));
}
inline command_ptr get_completed()
{
return pop(done);
}
private:
using queue_impl = typename ::liberate::concurrency::concurrent_queue<command_ptr>;
queue_impl to_process = {};
queue_impl done = {};
inline static void push(queue_impl & queue, command_ptr && command)
{
queue.push(std::move(command));
}
inline static command_ptr pop(queue_impl & queue)
{
command_ptr res;
if (queue.pop(res)) {
return res;
}
return {};
}
};
template <
template <typename> typename queueT
>
class parametrized_command_queue : private command_queue_base<queueT>
{
public:
using base_type = command_queue_base<queueT>;
using command_ptr = typename base_type::command_ptr;
using notification_function = std::function<void (parametrized_command_queue &)>;
inline parametrized_command_queue() = default;
inline explicit parametrized_command_queue(
notification_function general_notification
)
: m_command_func{general_notification}
, m_result_func{general_notification}
{
}
inline explicit parametrized_command_queue(
notification_function command_notification,
notification_function result_notification
)
: m_command_func{command_notification}
, m_result_func{result_notification}
{
}
virtual ~parametrized_command_queue() = default;
template <
typename commandT,
typename... argsT
>
inline void
enqueue_command(command_type type, argsT && ...args)
{
auto ptr = create_context<commandT>(type, std::forward<argsT>(args)...);
enqueue_command(std::move(ptr));
}
inline void enqueue_command(command_ptr && command)
{
base_type::enqueue_command(std::move(command));
++m_command_size;
if (m_command_func) {
m_command_func(*this);
}
}
inline command_ptr dequeue_command()
{
auto ret = std::move(base_type::dequeue_command());
if (ret) {
--m_command_size;
}
return ret;
}
inline void put_results(command_ptr && results)
{
base_type::put_results(std::move(results));
++m_result_size;
if (m_result_func) {
m_result_func(*this);
}
}
template <
typename commandT,
typename... argsT
>
inline void put_results(command_ptr && results, argsT && ...args)
{
if (results) {
// There can be no error here as we're passing the type we're going to
// compare to.
auto derived = results->template as<commandT>(results->type);
auto ptr = std::unique_ptr<typename commandT::results_type>(
new typename commandT::results_type{std::forward<argsT>(args)...}
);
derived->results = std::move(ptr);
}
put_results(std::move(results));
}
inline command_ptr get_completed()
{
auto ret = std::move(base_type::get_completed());
if (ret) {
--m_result_size;
}
return ret;
}
inline std::size_t commands() const
{
return m_command_size;
}
inline std::size_t results() const
{
return m_result_size;
}
inline bool empty() const
{
return m_command_size == 0 && m_result_size == 0;
}
private:
std::atomic<std::size_t> m_command_size = 0;
std::atomic<std::size_t> m_result_size = 0;
notification_function m_command_func = {};
notification_function m_result_func = {};
};
using concurrent_command_queue = parametrized_command_queue<
::liberate::concurrency::concurrent_queue
>;
} // namespace liberate::concurrency::command
#endif // guard