Program Listing for File concurrent_queue.h
↰ Return to documentation for file (liberate/concurrency/concurrent_queue.h)
/*
* This file is part of liberate.
*
* Author(s): Jens Finkhaeuser <jens@finkhaeuser.de>
*
* Copyright (c) 2011 Jens Finkhaeuser.
* Copyright (c) 2012-2014 Unwesen Ltd.
* Copyright (c) 2015-2021 Jens Finkhaeuser.
* Copyright (c) 2022 Interpeer gUG (haftungsbeschränkt)
*
* SPDX-License-Identifier: GPL-3.0-only
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef LIBERATE_CONCURRENCY_CONCURRENT_QUEUE_H
#define LIBERATE_CONCURRENCY_CONCURRENT_QUEUE_H
#ifndef __cplusplus
#error You are trying to include a C++ only header file
#endif
#include <liberate.h>
#include <atomic>
#include <cstddef>
#include <memory>
#include <tuple>
#include <utility>
namespace liberate::concurrency {
/**
 * A singly linked FIFO queue guarded by two independent spin locks: one
 * serialising producers (push side), one serialising consumers (pop side).
 *
 * The list always contains at least one node. The node m_first points at is
 * a placeholder that carries no value; the first real element, if any, is
 * m_first->m_next. Producers only touch m_last and the m_next link of the
 * last node; consumers only touch m_first and read m_next links, which are
 * atomic.
 *
 * The queue is neither copyable nor movable.
 *
 * @tparam valueT Element type. Must be copy constructible for push(), and
 *    assignable for pop(valueT &). The tuple-returning pop() additionally
 *    requires a default constructor.
 */
template <typename valueT>
class concurrent_queue
{
public:
  /***************************************************************************
   * STL-ish types
   **/
  using size_type = std::size_t;
  using value_type = valueT;

  /***************************************************************************
   * Implementation
   **/

  /**
   * Create an empty queue, consisting of just the placeholder node, with
   * both locks released.
   */
  inline concurrent_queue()
    : m_first(new node(nullptr))
    , m_consumer_lock(false)
    , m_last(m_first)
    , m_producer_lock(false)
  {
  }

  // The atomic members already make the type non-copyable; spell it out so
  // the intent is visible (the destructor frees every node).
  concurrent_queue(concurrent_queue const &) = delete;
  concurrent_queue & operator=(concurrent_queue const &) = delete;

  /**
   * Acquire both locks and free every remaining node, including the
   * placeholder. The locks are deliberately never released again.
   */
  inline ~concurrent_queue()
  {
    while (m_consumer_lock.exchange(true)) {}
    while (m_producer_lock.exchange(true)) {}

    while (nullptr != m_first) {
      node * doomed = m_first;
      m_first = doomed->m_next.load();
      delete doomed;
    }
  }

  /**
   * Append a copy of @p value to the end of the queue.
   *
   * The node and the copy are allocated before the producer lock is taken,
   * so the lock is only held for the two pointer updates.
   *
   * @param value Element to copy into the queue.
   */
  // cppcheck-suppress constParameter
  inline void push(valueT const & value)
  {
    node * fresh = new node(std::make_unique<valueT>(value));

    spin_guard guard(m_producer_lock);
    m_last->m_next = fresh;
    m_last = fresh;
  }

  /**
   * Push every element of the range [begin, end) in order, one push() call
   * per element.
   *
   * @param begin Iterator to the first element to push.
   * @param end Iterator one past the last element to push.
   */
  template <typename iterT>
  inline void push_range(iterT const & begin, iterT const & end)
  {
    for (iterT iter = begin ; iter != end ; ++iter) {
      push(*iter);
    }
  }

  /**
   * Remove the oldest element and assign it to @p result.
   *
   * The element is unlinked while holding the consumer lock; the assignment
   * to @p result happens after the lock has been released. Both the detached
   * value and the retired placeholder node are held by owning pointers, so
   * they are freed even if the assignment throws. In that case the element
   * has already been removed from the queue and is lost.
   *
   * @param [out] result Receives the element; untouched if the queue is
   *    empty.
   * @return true if an element was removed, false if the queue was empty.
   */
  inline bool pop(valueT & result)
  {
    // Declared outside the locked scope so destruction runs unlocked.
    std::unique_ptr<node> retired;
    std::unique_ptr<valueT> detached;

    {
      spin_guard guard(m_consumer_lock);

      node * next = m_first->m_next.load();
      if (nullptr == next) {
        return false;
      }

      // `next` becomes the new placeholder, so it gives up its value.
      detached = std::move(next->m_value);
      retired.reset(m_first);
      m_first = next;
    }

    // The stored element is about to be destroyed; move rather than copy.
    result = std::move(*detached);
    return true;
  }

  /**
   * Remove the oldest element and return it together with a success flag.
   *
   * @return A tuple of (true, element) if an element was removed, or
   *    (false, value-initialised valueT) if the queue was empty.
   */
  inline std::tuple<bool, valueT>
  pop()
  {
    valueT val{};
    bool res = pop(val);
    return std::tuple<bool, valueT>{res, std::move(val)};
  }

  /**
   * @return true if no element follows the placeholder node at the time the
   *    consumer lock is held.
   */
  inline bool empty() const
  {
    spin_guard guard(m_consumer_lock);
    return nullptr == m_first->m_next.load();
  }

  /**
   * Count the elements by walking the list while holding the consumer lock;
   * the cost is linear in the number of elements.
   *
   * @return Number of elements reachable from the placeholder node.
   */
  inline size_type size() const
  {
    spin_guard guard(m_consumer_lock);

    size_type count = 0;
    for (node * cur = m_first->m_next.load() ; nullptr != cur ;
        cur = cur->m_next.load())
    {
      ++count;
    }
    return count;
  }

private:
  /**
   * List node. Owns its value (if any); the link to the next node is atomic
   * because producers write it while consumers read it. Destroying a node
   * does not destroy its successor.
   */
  struct node
  {
    explicit node(std::unique_ptr<valueT> value)
      : m_value(std::move(value))
      , m_next(nullptr)
    {
    }

    std::unique_ptr<valueT> m_value;
    std::atomic<node *>     m_next;
  };

  /**
   * Scope guard for one of the spin locks: busy-waits until the flag could
   * be flipped from false to true, and resets it to false on destruction.
   */
  class spin_guard
  {
  public:
    explicit spin_guard(std::atomic<bool> & flag)
      : m_flag(flag)
    {
      while (m_flag.exchange(true)) {}
    }

    ~spin_guard()
    {
      m_flag = false;
    }

    spin_guard(spin_guard const &) = delete;
    spin_guard & operator=(spin_guard const &) = delete;

  private:
    std::atomic<bool> & m_flag;
  };

  // Consumer side: placeholder node and the lock guarding it. The locks are
  // mutable so that the const observers empty() and size() can take them.
  node *                    m_first;
  mutable std::atomic<bool> m_consumer_lock;

  // Producer side: last node of the list and the lock guarding it.
  node *                    m_last;
  mutable std::atomic<bool> m_producer_lock;
};
} // namespace liberate::concurrency
#endif // guard