.. _program_listing_file_liberate_concurrency_concurrent_queue.h:

Program Listing for File concurrent_queue.h
===========================================

|exhale_lsh| :ref:`Return to documentation for file <file_liberate_concurrency_concurrent_queue.h>` (``liberate/concurrency/concurrent_queue.h``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   /*
    * This file is part of liberate.
    *
    * Author(s): Jens Finkhaeuser
    *
    * Copyright (c) 2011 Jens Finkhaeuser.
    * Copyright (c) 2012-2014 Unwesen Ltd.
    * Copyright (c) 2015-2021 Jens Finkhaeuser.
    * Copyright (c) 2022 Interpeer gUG (haftungsbeschränkt)
    *
    * SPDX-License-Identifier: GPL-3.0-only
    *
    * This program is free software: you can redistribute it and/or modify
    * it under the terms of the GNU General Public License as published by
    * the Free Software Foundation, either version 3 of the License, or
    * (at your option) any later version.
    *
    * This program is distributed in the hope that it will be useful,
    * but WITHOUT ANY WARRANTY; without even the implied warranty of
    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    * GNU General Public License for more details.
    *
    * You should have received a copy of the GNU General Public License
    * along with this program. If not, see <http://www.gnu.org/licenses/>.
//*/
// ^ Closes the license comment that is opened before this point in the
//   listing; spelled so that the line is also valid C++ on its own.
#ifndef LIBERATE_CONCURRENCY_CONCURRENT_QUEUE_H
#define LIBERATE_CONCURRENCY_CONCURRENT_QUEUE_H

#ifndef __cplusplus
#error You are trying to include a C++ only header file
#endif

// NOTE(review): the include targets were missing from this listing. These are
// the standard headers the code below uses; confirm against the real header in
// case one of the original includes was a project header.
#include <atomic>
#include <cstddef>
#include <memory>
#include <tuple>
#include <utility>

namespace liberate::concurrency {

/**
 * Unbounded FIFO queue built as a singly linked list of heap-allocated nodes.
 *
 * The list always starts with a dummy node (`m_first`) whose value pointer is
 * null; the oldest queued element lives in the node after it. Two independent
 * spin flags guard the two ends of the list:
 *
 * - `m_producer_lock` serializes push(), which only touches `m_last`.
 * - `m_consumer_lock` serializes pop(), empty() and size(), which only touch
 *   `m_first`.
 *
 * The `m_next` link between nodes is a `std::atomic`, as it is the one field
 * written by the producer side and read by the consumer side.
 *
 * Values are stored as copies (`valueT` must be copy-constructible) and the
 * queue itself can be neither copied nor moved.
 */
template <typename valueT>
class concurrent_queue
{
public:
  /***************************************************************************
   * STL-ish types
   **/
  using size_type = std::size_t;
  using value_type = valueT;

  /***************************************************************************
   * Implementation
   **/

  /**
   * Creates an empty queue consisting of just the dummy node, with both spin
   * flags cleared.
   */
  inline concurrent_queue()
    : m_first{new node(nullptr)}
    , m_consumer_lock{false}
    , m_last{m_first}
    , m_producer_lock{false}
  {
  }

  // The queue owns raw node pointers, so copying it would double-delete them.
  // (The atomic members already suppress the implicit copies; this makes the
  // intent explicit.)
  concurrent_queue(concurrent_queue const &) = delete;
  concurrent_queue & operator=(concurrent_queue const &) = delete;

  /**
   * Acquires both spin flags and deletes every remaining node, including the
   * dummy node. The flags are intentionally not released again.
   */
  inline ~concurrent_queue()
  {
    while (m_consumer_lock.exchange(true)) {}
    while (m_producer_lock.exchange(true)) {}

    while (nullptr != m_first) {
      node * tmp = m_first;
      m_first = tmp->m_next;
      delete tmp; // node's destructor also deletes the value it still holds
    }
  }

  /**
   * Appends a copy of `value` to the end of the queue.
   *
   * The copy and the node are allocated before the producer flag is taken, so
   * the flag is only held for the two pointer updates.
   *
   * @param value Element to copy into the queue.
   */
  // cppcheck-suppress constParameter
  inline void push(valueT const & value)
  {
    // Keep the payload owned until the node has taken it over, so nothing
    // leaks if the node allocation throws.
    auto payload = std::make_unique<valueT>(value);
    node * tmp = new node(payload.get());
    payload.release();

    spin_guard guard{m_producer_lock};
    m_last->m_next = tmp;
    m_last = tmp;
  }

  /**
   * Pushes every element of the range [begin, end) in order, one push() call
   * per element. The range as a whole is not inserted atomically.
   *
   * @param begin Iterator to the first element to push.
   * @param end   Iterator one past the last element to push.
   */
  template <typename iterT>
  inline void push_range(iterT const & begin, iterT const & end)
  {
    for (iterT iter = begin ; iter != end ; ++iter) {
      push(*iter);
    }
  }

  /**
   * Removes the oldest element and move-assigns it to `result`.
   *
   * @param result Receives the element; left untouched if the queue is empty.
   * @return true if an element was removed, false if the queue was empty.
   *
   * The assignment to `result` happens after the consumer flag has been
   * released. If that assignment throws, the element has already been removed
   * from the queue; it and the retired node are still freed.
   */
  inline bool pop(valueT & result)
  {
    std::unique_ptr<node> retired;
    std::unique_ptr<valueT> value;

    {
      spin_guard guard{m_consumer_lock};

      node * next = m_first->m_next;
      if (nullptr == next) {
        return false;
      }

      // Take the value out of `next`, which now becomes the new dummy node.
      value.reset(next->m_value);
      next->m_value = nullptr;

      retired.reset(m_first);
      m_first = next;
    }

    result = std::move(*value);
    return true;
  }

  /**
   * Convenience overload of pop() that returns the element by value.
   *
   * Requires `valueT` to be value-initializable.
   *
   * @return A tuple of (success, value). When the queue was empty, success is
   *         false and value is a value-initialized `valueT`.
   */
  inline std::tuple<bool, valueT> pop()
  {
    valueT val{};
    bool const res = pop(val);
    return std::tuple<bool, valueT>{res, std::move(val)};
  }

  /**
   * @return true if no element follows the dummy node at the time the
   *         consumer flag is held.
   */
  inline bool empty() const
  {
    spin_guard guard{m_consumer_lock};
    return nullptr == m_first->m_next;
  }

  /**
   * Counts the queued elements by walking the list while holding the consumer
   * flag, i.e. in time linear in the number of elements.
   *
   * @return Number of nodes found after the dummy node.
   */
  inline size_type size() const
  {
    spin_guard guard{m_consumer_lock};

    size_type count = 0;
    for (node * cur = m_first->m_next ; nullptr != cur ; cur = cur->m_next) {
      ++count;
    }
    return count;
  }

private:
  /**
   * List node. Owns the value it points to (if any) and deletes it on
   * destruction.
   */
  struct node
  {
    explicit node(valueT * value)
      : m_value(value)
      , m_next(nullptr)
    {
    }

    ~node()
    {
      m_next = nullptr;
      delete m_value;
    }

    valueT *            m_value;
    std::atomic<node *> m_next;
  };

  /**
   * Scoped holder of one of the spin flags: busy-waits until the flag could
   * be switched from false to true, and sets it back to false on destruction.
   */
  struct spin_guard
  {
    explicit spin_guard(std::atomic<bool> & flag)
      : m_flag(flag)
    {
      while (m_flag.exchange(true)) {}
    }

    ~spin_guard()
    {
      m_flag = false;
    }

    spin_guard(spin_guard const &) = delete;
    spin_guard & operator=(spin_guard const &) = delete;

    std::atomic<bool> & m_flag;
  };

  // Consumer side: dummy head node and the flag guarding it.
  node *                    m_first;
  mutable std::atomic<bool> m_consumer_lock;

  // Producer side: last node and the flag guarding it.
  node *                    m_last;
  mutable std::atomic<bool> m_producer_lock;
};

} // namespace liberate::concurrency

#endif // guard