Program Listing for File command.h

Return to documentation for file (liberate/concurrency/command.h)

/*
 * This file is part of liberate.
 *
 * Author(s): Jens Finkhaeuser <jens@finkhaeuser.de>
 *
 * Copyright (c) 2022 Interpeer gUG (haftungsbeschränkt)
 *
 * SPDX-License-Identifier: GPL-3.0-only
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef LIBERATE_CONCURRENCY_COMMAND_H
#define LIBERATE_CONCURRENCY_COMMAND_H

#ifndef __cplusplus
#error You are trying to include a C++ only header file
#endif

#include <liberate.h>

#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>

#include <liberate/concurrency/concurrent_queue.h>

namespace liberate::concurrency::command {

/**
 * Integral tag that identifies the kind of a command. The values themselves
 * are chosen by the code using this header.
 */
using command_type = int;

/**
 * Polymorphic base of every command context.
 *
 * It stores the command's type tag plus an opaque `context` pointer. The
 * pointer is never freed by this struct (the destructor is defaulted), so
 * whatever it points to must be managed by the caller.
 */
struct command_context_base
{
  /**
   * Create a context carrying the given type tag; `context` starts out as
   * nullptr.
   */
  inline explicit command_context_base(command_type _type)
    : type{_type}
  {
  }

  virtual ~command_context_base() = default;

  /**
   * Downcast this context to `derivedT`, guarded by the type tag.
   *
   * @param target_type The tag the caller expects this context to carry.
   * @return `this` converted to `derivedT *` if `target_type` equals `type`,
   *     nullptr otherwise.
   *
   * Only the tag is compared; there is no runtime check that the object
   * really is a `derivedT`, so the caller must keep tags and types in sync.
   */
  template<
    typename derivedT
  >
  inline derivedT *
  as(command_type const & target_type)
  {
    if (target_type != type) {
      return nullptr;
    }
    // static_cast performs the base-to-derived pointer adjustment, which
    // matters when this base subobject is not located at offset zero of
    // derivedT (e.g. with multiple inheritance). It also turns a derivedT
    // that does not derive from command_context_base into a compile error.
    return static_cast<derivedT *>(this);
  }

  /**
   * Const overload of as(); same contract, returns a pointer to const.
   */
  template<
    typename derivedT
  >
  inline derivedT const *
  as(command_type const & target_type) const
  {
    if (target_type != type) {
      return nullptr;
    }
    return static_cast<derivedT const *>(this);
  }


  // Tag fixed at construction time.
  command_type const  type;
  // Opaque pointer for the user; not interpreted or released in this struct.
  void *              context = nullptr;
};


// Empty placeholder type. It is the default for both the parameter and the
// result type of command_context, i.e. it stands for "carries no data".
struct void_t {};


/**
 * Typed command context: a command_context_base that additionally owns an
 * optional parameter object and an optional result object.
 *
 * Both pointers start out empty. Either type defaults to void_t.
 */
template <typename paramsT = void_t, typename resultsT = void_t>
struct command_context : public command_context_base
{
  using params_type = paramsT;
  using results_type = resultsT;

  // Forwards the type tag to the base; parameters and results stay empty.
  explicit command_context(command_type _type)
    : command_context_base(_type)
  {}

  virtual ~command_context() = default;

  // Input of the command, owned by the context.
  std::unique_ptr<paramsT>  parameters{};
  // Output of the command, owned by the context.
  std::unique_ptr<resultsT> results{};
};


/**
 * Allocate a command context of type `commandT` and fill in its parameters.
 *
 * @param type Type tag handed to the `commandT` constructor.
 * @param args Forwarded into a brace-initialisation of
 *     `commandT::params_type`.
 * @return The new context, owned through a pointer to the base type.
 */
template <typename commandT, typename... argsT>
inline std::unique_ptr<command_context_base>
create_context(command_type type, argsT && ...args)
{
  using params_t = typename commandT::params_type;

  auto created = std::make_unique<commandT>(type);

  // The parameter object is brace-initialised, so params_type may be an
  // aggregate; this is why a plain `new` is used here.
  std::unique_ptr<params_t> params{
    new params_t{std::forward<argsT>(args)...}
  };
  created->parameters = std::move(params);

  return created;
}


template <
  template <typename...> typename queueT
>
class command_queue_base
{
public:
  using command_ptr = std::unique_ptr<command_context_base>;

  virtual ~command_queue_base() = default;

  inline void enqueue_command(command_ptr && command)
  {
    push(to_process, std::move(command));
  }

  inline command_ptr dequeue_command()
  {
    return pop(to_process);
  }


  inline void put_results(command_ptr && results)
  {
    push(done, std::move(results));
  }


  inline command_ptr get_completed()
  {
    return pop(done);
  }


private:

  using queue_impl = queueT<command_ptr>;
  queue_impl  to_process;
  queue_impl  done;

  inline static void push(queue_impl & queue, command_ptr && value)
  {
    queue.push_back(std::move(value));
  }

  inline static command_ptr pop(queue_impl & queue)
  {
    if (queue.empty()) {
      return {};
    }
    auto res = std::move(queue.front());
    queue.pop_front();
    return res;
  }

};


template <>
class command_queue_base<::liberate::concurrency::concurrent_queue>
{
public:
  using command_ptr = std::shared_ptr<command_context_base>;

  virtual ~command_queue_base() = default;

  inline void enqueue_command(command_ptr && command)
  {
    push(to_process, std::move(command));
  }

  inline command_ptr dequeue_command()
  {
    return pop(to_process);
  }

  inline void put_results(command_ptr && results)
  {
    push(done, std::move(results));
  }

  inline command_ptr get_completed()
  {
    return pop(done);
  }

private:
  using queue_impl = typename ::liberate::concurrency::concurrent_queue<command_ptr>;
  queue_impl  to_process = {};
  queue_impl  done = {};


  inline static void push(queue_impl & queue, command_ptr && command)
  {
    queue.push(std::move(command));
  }

  inline static command_ptr pop(queue_impl & queue)
  {
    command_ptr res;
    if (queue.pop(res)) {
      return res;
    }
    return {};
  }
};


/**
 * Command queue with size bookkeeping and optional notifications, layered on
 * top of command_queue_base<queueT>.
 *
 * Commands travel in two directions: enqueue_command()/dequeue_command()
 * move them towards the party that processes them, put_results()/
 * get_completed() move them back. After every successful enqueue_command()
 * or put_results() the matching notification function, if set, is invoked
 * with a reference to this queue.
 *
 * The element counts are kept in atomic counters next to the underlying
 * queues rather than queried from them.
 */
template <
  template <typename> typename queueT
>
class parametrized_command_queue : private command_queue_base<queueT>
{
public:
  using base_type = command_queue_base<queueT>;
  using command_ptr = typename base_type::command_ptr;

  using notification_function = std::function<void (parametrized_command_queue &)>;

  /**
   * Create a queue without any notification functions.
   */
  inline parametrized_command_queue() = default;

  /**
   * Create a queue that invokes the same function for new commands and for
   * new results.
   */
  inline explicit parametrized_command_queue(
      notification_function general_notification
  )
    // Members are initialised in declaration order (m_command_func first),
    // so the parameter is copied before it is moved from.
    : m_command_func{general_notification}
    , m_result_func{std::move(general_notification)}
  {
  }

  /**
   * Create a queue with separate notification functions.
   *
   * @param command_notification Invoked after a command was enqueued.
   * @param result_notification Invoked after results were put.
   */
  inline explicit parametrized_command_queue(
      notification_function command_notification,
      notification_function result_notification
  )
    : m_command_func{std::move(command_notification)}
    , m_result_func{std::move(result_notification)}
  {
  }

  virtual ~parametrized_command_queue() = default;


  /**
   * Build a `commandT` context via create_context() and enqueue it.
   *
   * @param type Type tag of the new command.
   * @param args Forwarded to the brace-initialisation of
   *     `commandT::params_type`.
   */
  template <
    typename commandT,
    typename... argsT
  >
  inline void
  enqueue_command(command_type type, argsT && ...args)
  {
    auto ptr = create_context<commandT>(type, std::forward<argsT>(args)...);
    enqueue_command(std::move(ptr));
  }

  /**
   * Enqueue an existing command, bump the command counter and invoke the
   * command notification function if one is set.
   */
  inline void enqueue_command(command_ptr && command)
  {
    base_type::enqueue_command(std::move(command));
    // NOTE(review): the counter is incremented after the push. If queueT
    // allows a concurrent dequeue_command() in between, that call decrements
    // first and commands() transiently wraps around - confirm whether
    // callers depend on commands()/empty() being exact.
    ++m_command_size;
    if (m_command_func) {
      m_command_func(*this);
    }
  }


  /**
   * Take the next command to process.
   *
   * @return The command, or an empty pointer if none is queued. The command
   *     counter is only decremented when a command was returned.
   */
  inline command_ptr dequeue_command()
  {
    // The call already yields a prvalue; wrapping it in std::move() would
    // only prevent copy elision.
    auto ret = base_type::dequeue_command();
    if (ret) {
      --m_command_size;
    }
    return ret;
  }


  /**
   * Hand a processed command back, bump the result counter and invoke the
   * result notification function if one is set.
   */
  inline void put_results(command_ptr && results)
  {
    base_type::put_results(std::move(results));
    // NOTE(review): same ordering caveat as in enqueue_command(), here with
    // respect to a concurrent get_completed().
    ++m_result_size;
    if (m_result_func) {
      m_result_func(*this);
    }
  }

  /**
   * Attach a freshly built result object to a command, then hand it back.
   *
   * @param results The processed command. If it is empty, no result object
   *     is created, but the pointer is still passed on to the non-template
   *     put_results().
   * @param args Forwarded to the brace-initialisation of
   *     `commandT::results_type`.
   *
   * `commandT` must be the actual type of the context; this is not verified
   * at runtime.
   */
  template <
    typename commandT,
    typename... argsT
  >
  inline void put_results(command_ptr && results, argsT && ...args)
  {
    if (results) {
      // There can be no error here as we're passing the type we're going to
      // compare to.
      auto derived = results->template as<commandT>(results->type);
      auto ptr = std::unique_ptr<typename commandT::results_type>(
        new typename commandT::results_type{std::forward<argsT>(args)...}
      );
      derived->results = std::move(ptr);
    }
    put_results(std::move(results));
  }


  /**
   * Take the next completed command.
   *
   * @return The command, or an empty pointer if none is available. The
   *     result counter is only decremented when a command was returned.
   */
  inline command_ptr get_completed()
  {
    // See dequeue_command(): no std::move() around the returned prvalue.
    auto ret = base_type::get_completed();
    if (ret) {
      --m_result_size;
    }
    return ret;
  }


  /**
   * @return Number of commands counted as enqueued but not yet dequeued.
   */
  inline std::size_t commands() const
  {
    return m_command_size;
  }

  /**
   * @return Number of results counted as put but not yet retrieved.
   */
  inline std::size_t results() const
  {
    return m_result_size;
  }

  /**
   * @return true if both counters are zero. The counters are read one after
   *     the other, not as a single atomic snapshot.
   */
  inline bool empty() const
  {
    return m_command_size == 0 && m_result_size == 0;
  }

private:
  std::atomic<std::size_t>  m_command_size = 0;
  std::atomic<std::size_t>  m_result_size  = 0;

  notification_function     m_command_func = {};
  notification_function     m_result_func = {};
};

// Convenience alias: a parametrized_command_queue on top of liberate's
// concurrent_queue. This selects the command_queue_base specialisation that
// holds commands via std::shared_ptr.
using concurrent_command_queue = parametrized_command_queue<
  ::liberate::concurrency::concurrent_queue
>;

} // namespace liberate::concurrency::command


#endif // guard