==========================================
How-To dispatch commands to threads (CQRS)
==========================================

When you distribute work to worker threads, it is often not enough to
distribute data alone. Sometimes you also need to tell the thread what it is
supposed to do. And suddenly you find yourself in the middle of the
`Command Query Responsibility Segregation`_ pattern, or CQRS for short.

.. _Command Query Responsibility Segregation: https://martinfowler.com/bliki/CQRS.html

In this pattern, the user interface sends *queries* via some queue to a
backend, which processes them. Once it is finished, it sends *commands* via
another queue to the user interface, to update what is displayed.

And that's great, except the name is confusing. Surely the user interface
sends commands to the backend, and retrieves results? That's the naming we're
going with here in the :cpp:class:`liberate::concurrency::command_queue`
class, at any rate. But the pattern is more or less the same.

What we're assuming, in fact, is that a *command* makes a round trip through
two queues.

#. In one thread, it encloses some arguments, and gets pushed into a command
   queue.
#. In another thread, it gets retrieved and processed. It then receives a
   number of results as, well, the results of the processing, and gets pushed
   into a results queue.
#. The results then get evaluated in the original thread (or another thread,
   but let's not break the pattern here).

As a result of these assumptions, we can think of a command's context as
having:

#. A command type, which we define to be of type ``int`` -- some numeric value
   that you can fill from an enum.
#. An optional ``parameters`` type for the arguments. This is any container
   you want, and could e.g. be a ``std::tuple`` or your own data structure.
#. An optional ``results`` type, for the results. Again, the type is whatever
   you'd like.

Let's start by defining this command context.

.. sourcecode:: cpp
   :linenos:
   :dedent:
   :caption: Command context definition

   #include
   #include <string>
   #include <tuple>

   using namespace liberate::concurrency;

   // Say we define a tuple for the arguments.
   using args = std::tuple<int, std::string>;

   // And as a result, we need a single string.
   using result = std::string;

   // Then we define the command context like this:
   using context = command_context<args, result>;

Now let's create a number of commands. Again, the command type is an ``int``,
making it easy to define an ``enum`` of sorts.

.. sourcecode:: cpp
   :linenos:
   :dedent:
   :caption: Command definition

   enum command_type
   {
     FIND_LONGEST_STRING = 0,
     // create more
   };

This is when we can instantiate a command. Note that the command is a
``std::unique_ptr`` to a context. All the arguments except for the first that
you pass to :cpp:func:`liberate::concurrency::create_context` are forwarded to
the ``args`` type's constructor above, which works for ``std::tuple``.

.. sourcecode:: cpp
   :linenos:
   :dedent:
   :caption: Create context

   auto cmd = create_context<context>(FIND_LONGEST_STRING, 42, "this is my input");

We can now create a command queue, and pass the command into it. The command
queue is parametrized by an underlying container type -- it works with e.g.
``std::deque``, but also with
:cpp:class:`liberate::concurrency::concurrent_queue`.

.. sourcecode:: cpp
   :linenos:
   :dedent:
   :caption: Create queue

   command_queue_base queue1;
   command_queue_base queue2;

   // The queue takes ownership of commands you enqueue
   queue2.enqueue_command(std::move(cmd));

   // And relinquishes ownership again when dequeuing.
   auto dequeued = queue2.dequeue_command();

   // After processing, you can push it into the results queue.
   queue2.put_results(std::move(dequeued));

   // And retrieve the results again.
   auto results = queue2.get_completed();

Those are the basic queue operations in a nutshell. What about the results
type we provided to the context? That can be added rather easily, before the
command is handed back via ``put_results()``.

.. sourcecode:: cpp
   :linenos:
   :dedent:
   :caption: Add processing results

   dequeued->as<context>()->results = std::unique_ptr<std::string>(
     new std::string{"input"}
   );

Note that the cast via the ``as()`` function is necessary, because the queue
erases specific command types so that multiple differently parametrized
command contexts can be used for the same queue.

Above, we used the ``command_queue_base`` class, which is relatively simple.
The more elaborate
:cpp:class:`liberate::concurrency::parametrized_command_queue` is a little
fancier, and its concurrent instantiation is called
``concurrent_command_queue``.

.. sourcecode:: cpp
   :linenos:
   :dedent:
   :caption: Be notified of things to do

   concurrent_command_queue queue{[](concurrent_command_queue & q)
     {
       assert(!q.empty());
       // We shouldn't do work here, but instead send a notify_one() to some
       // condition on which worker threads wait.
     }
   };

   queue.enqueue_command(std::move(cmd));

Enqueuing a command invokes the lambda. In fact, so does putting a result back
into the queue. If you want to be separately informed of whether there is work
or there are results, you can pass two lambdas instead.

This queue is also a little more convenient when it comes to enqueueing
commands.

.. sourcecode:: cpp
   :linenos:
   :dedent:
   :caption: Convenient enqueueing

   queue.enqueue_command(FIND_LONGEST_STRING, 42, "this is my input");

Thanks to perfect forwarding, this isn't more costly than the previous method.
And much the same can be done for the results, which removes the need for
awkward casting.

.. sourcecode:: cpp
   :linenos:
   :dedent:
   :caption: Convenient results

   queue.put_results(dequeued, "input");

And there you have your CQRS pattern.

.. seealso:: :doc:`concurrency-queue`
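
To see the whole round trip in one place, it can help to sketch it with nothing but the standard library. The following is a minimal sketch, and it is *not* how liberate implements its queues: ``simple_command``, ``simple_command_queue``, ``longest_word`` and ``run_round_trip`` are hypothetical names made up for this illustration, and reading ``FIND_LONGEST_STRING`` as "find the longest whitespace-separated word" is an assumption. The sketch only mirrors the three steps described above: enqueue a command with its arguments, process it in a worker thread, and hand it back with its results attached.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <tuple>
#include <utility>

// All names below are hypothetical stand-ins, not liberate's API.
enum command_type : int { FIND_LONGEST_STRING = 0 };

struct simple_command
{
  int type = 0;                             // which operation to perform
  std::tuple<int, std::string> parameters;  // arguments, set by the sender
  std::unique_ptr<std::string> results;     // filled in by the worker
};

using command_ptr = std::unique_ptr<simple_command>;

// Two queues behind one mutex: commands travel to the worker, completed
// commands travel back to the sender.
class simple_command_queue
{
public:
  void enqueue_command(command_ptr cmd) { push(m_commands, std::move(cmd)); }
  command_ptr dequeue_command() { return pop(m_commands); }
  void put_results(command_ptr cmd) { push(m_results, std::move(cmd)); }
  command_ptr get_completed() { return pop(m_results); }

private:
  using queue_type = std::deque<command_ptr>;

  // Takes ownership of the command and wakes up any waiting thread.
  void push(queue_type & q, command_ptr cmd)
  {
    {
      std::lock_guard<std::mutex> lock{m_mutex};
      q.push_back(std::move(cmd));
    }
    m_cond.notify_all();
  }

  // Blocks until the given queue has an entry, then hands over ownership.
  command_ptr pop(queue_type & q)
  {
    std::unique_lock<std::mutex> lock{m_mutex};
    m_cond.wait(lock, [&q] { return !q.empty(); });
    command_ptr cmd = std::move(q.front());
    q.pop_front();
    return cmd;
  }

  std::mutex m_mutex;
  std::condition_variable m_cond;
  queue_type m_commands;
  queue_type m_results;
};

// Assumed meaning of FIND_LONGEST_STRING: the longest whitespace-separated
// word; on a tie, the first one wins.
std::string longest_word(std::string const & input)
{
  std::istringstream stream{input};
  std::string word;
  std::string longest;
  while (stream >> word) {
    if (word.size() > longest.size()) {
      longest = word;
    }
  }
  return longest;
}

// Sends one command to a worker thread and returns the result it produced.
std::string run_round_trip()
{
  simple_command_queue queue;

  // Step 2: the worker retrieves the command, processes it, returns it.
  std::thread worker{[&queue] {
    command_ptr cmd = queue.dequeue_command();
    if (cmd->type == FIND_LONGEST_STRING) {
      cmd->results = std::make_unique<std::string>(
          longest_word(std::get<1>(cmd->parameters)));
    }
    queue.put_results(std::move(cmd));
  }};

  // Step 1: enclose the arguments and push the command.
  auto cmd = std::make_unique<simple_command>();
  cmd->type = FIND_LONGEST_STRING;
  cmd->parameters = std::make_tuple(42, std::string{"this is my input"});
  queue.enqueue_command(std::move(cmd));

  // Step 3: evaluate the results in the original thread.
  command_ptr done = queue.get_completed();
  worker.join();
  return done->results ? *done->results : std::string{};
}
```

Calling ``run_round_trip()`` returns ``"input"``, the longest word of the argument string. Unlike the liberate queues, this sketch fixes a single command layout and blocks inside the queue itself. liberate instead erases the context type and leaves waiting to a notification callback, which is why the ``as()`` cast and the lambdas appear in the examples above.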