Program Listing for File concurrent_queue.h

Return to documentation for file (liberate/concurrency/concurrent_queue.h)

/*
 * This file is part of liberate.
 *
 * Author(s): Jens Finkhaeuser <jens@finkhaeuser.de>
 *
 * Copyright (c) 2011 Jens Finkhaeuser.
 * Copyright (c) 2012-2014 Unwesen Ltd.
 * Copyright (c) 2015-2021 Jens Finkhaeuser.
 * Copyright (c) 2022 Interpeer gUG (haftungsbeschränkt)
 *
 * SPDX-License-Identifier: GPL-3.0-only
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef LIBERATE_CONCURRENCY_CONCURRENT_QUEUE_H
#define LIBERATE_CONCURRENCY_CONCURRENT_QUEUE_H

#ifndef __cplusplus
#error You are trying to include a C++ only header file
#endif

#include <liberate.h>

#include <atomic>
#include <cstddef>

namespace liberate::concurrency {

template <typename valueT>
class concurrent_queue
{
public:
  /***************************************************************************
   * STL-ish types
   **/
  using size_type = size_t;
  using value_type = valueT;
  /***************************************************************************
   * Implementation
   **/

  inline concurrent_queue()
  {
    m_first = m_last = new node(nullptr);
    m_producer_lock = m_consumer_lock = false;
  }



  inline ~concurrent_queue()
  {
    while (m_consumer_lock.exchange(true)) {}
    while (m_producer_lock.exchange(true)) {}

    while (nullptr != m_first) {
      node * tmp = m_first;
      m_first = tmp->m_next;
      delete tmp;
    }
  }



  // cppcheck-suppress constParameter
  inline void push(valueT const & value)
  {
    node * tmp = new node(new valueT(value));

    while (m_producer_lock.exchange(true)) {}

    m_last->m_next = tmp;
    m_last = tmp;

    m_producer_lock = false;
  }



  template <typename iterT>
  inline void push_range(iterT const & begin, iterT const & end)
  {
    for (iterT iter = begin ; iter != end ; ++iter) {
      push(*iter);
    }
  }



  inline bool pop(valueT & result)
  {
    while (m_consumer_lock.exchange(true)) {}

    node * first = m_first;
    node * next = m_first->m_next;

    if (nullptr == next) {
      m_consumer_lock = false;
      return false;
    }

    valueT * val = next->m_value;
    next->m_value = nullptr;
    m_first = next;
    m_consumer_lock = false;

    result = *val;
    delete val;
    delete first;

    return true;
  }


  inline std::tuple<bool, valueT>
  pop()
  {
    valueT val{};
    auto res = pop(val);
    return {res, val};
  }


  inline bool empty() const
  {
    while (m_consumer_lock.exchange(true)) {}

    bool ret = (nullptr == m_first->m_next);

    m_consumer_lock = false;

    return ret;
  }



  inline size_type size() const
  {
    while (m_consumer_lock.exchange(true)) {}

    size_type count = 0;
    node * cur = m_first->m_next;
    for ( ; nullptr != cur ; cur = cur->m_next, ++count) {}

    m_consumer_lock = false;

    return count;
  }


private:

  struct node
  {
    explicit node(valueT * value)
      : m_value(value)
      , m_next(nullptr)
    {
    }

    ~node()
    {
      m_next = nullptr;
      delete m_value;
    }

    valueT *            m_value;
    std::atomic<node *> m_next;
  };

  node *                    m_first;
  mutable std::atomic<bool> m_consumer_lock;

  node *                    m_last;
  mutable std::atomic<bool> m_producer_lock;
};

} // namespace liberate::concurrency

#endif // guard