How-To dispatch commands to threads (CQRS)
When you distribute work to worker threads, it is often not enough to distribute data alone. Sometimes you also need to tell the thread what it is supposed to do with that data. And suddenly you find yourself in the middle of the Command Query Responsibility Segregation pattern, or CQRS for short.
In this pattern, the user interface sends queries via some queue to a backend, which processes them. Once it is finished, it sends commands via another queue to the user interface, to update what is displayed.
And that’s great, except the name is confusing. Surely the user interface sends commands to the backend, and retrieves results? That’s the naming we’re going with here in the liberate::concurrency::command_queue class, at any rate. But the pattern is more or less the same.
What we’re assuming, in fact, is that a command makes a round trip through two queues.
In one thread, it encloses some arguments, and gets pushed into a command queue.
In another thread, it gets retrieved and processed. It then receives a number of results as, well, the results of the processing, and gets pushed into a results queue.
The results then get evaluated in the original thread (or another thread, but let’s not break the pattern here).
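The round trip can be sketched with nothing but the standard library. Everything here is a hypothetical stand-in and not liberate’s API: `sketch_command`, `blocking_queue` and `round_trip` are names invented for illustration, and the "processing" simply joins the two arguments.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <utility>

// Hypothetical stand-in for a command context; not liberate's type.
struct sketch_command
{
  int type = 0;                          // numeric command type
  std::tuple<int, std::string> params;   // arguments enclosed by the sender
  std::unique_ptr<std::string> results;  // filled in by the worker
};

// Minimal blocking queue built on a mutex and a condition variable.
template <typename T>
class blocking_queue
{
public:
  void push(T value)
  {
    {
      std::lock_guard<std::mutex> lock{m_mutex};
      m_items.push_back(std::move(value));
    }
    m_cond.notify_one();
  }

  // Blocks until an item is available, then hands over ownership.
  T pop()
  {
    std::unique_lock<std::mutex> lock{m_mutex};
    m_cond.wait(lock, [this] { return !m_items.empty(); });
    T value = std::move(m_items.front());
    m_items.pop_front();
    return value;
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::deque<T> m_items;
};

// Sends one command to a worker thread and returns the result it produced.
std::string round_trip(int number, std::string const & text)
{
  using command_ptr = std::unique_ptr<sketch_command>;
  blocking_queue<command_ptr> commands;
  blocking_queue<command_ptr> completed;

  // Worker thread: retrieve the command, process it, and push it into
  // the results queue.
  std::thread worker{[&commands, &completed]
    {
      auto cmd = commands.pop();
      cmd->results = std::make_unique<std::string>(
          std::get<1>(cmd->params) + ":" + std::to_string(std::get<0>(cmd->params)));
      completed.push(std::move(cmd));
    }};

  // Original thread: enclose the arguments and push the command.
  auto cmd = std::make_unique<sketch_command>();
  cmd->params = std::make_tuple(number, text);
  commands.push(std::move(cmd));

  // Original thread: wait for the processed command and evaluate it.
  auto done = completed.pop();
  worker.join();
  return *done->results;
}
```

Calling `round_trip(42, "this is my input")` sends a single command through both queues. Note that the same object travels out and back, which is why the arguments and the results can live side by side in one context.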
As a result of these assumptions, we can think of a command’s context as having:
- A command type, which we define to be of type int, some numeric value that you can fill from an enum.
- An optional parameters type for the arguments. This is any container you want, and could e.g. be a std::tuple or your own data structure.
- An optional results type, for the results. Again, the type is whatever you’d like.
Let’s start by defining this command context.
#include <string>
#include <tuple>

#include <liberate/concurrency/command.h>

using namespace liberate::concurrency;

// Say we define a tuple for the arguments.
using args = std::tuple<int, std::string>;
// And as a result, we need a single string.
using result = std::string;
// Then we define the command context like this:
using context = command_context<args, result>;
Now let’s create a number of commands. Again, the command type is an int, making it easy to define an enum of sorts.
enum command_type
{
  FIND_LONGEST_STRING = 0,
  // create more
};
This is when we can instantiate a command. Note that the command is a std::unique_ptr to a context. All the arguments you pass to liberate::concurrency::create_context(), except for the first, are forwarded to the constructor of the args type above, which works for std::tuple.
auto cmd = create_context<context>(FIND_LONGEST_STRING, 42,
    "this is my input");
We can now create a command queue, and pass the command into it. The command queue is parametrized by an underlying container type. It works with e.g. std::deque, but also with liberate::concurrency::concurrent_queue.
command_queue_base<std::deque> queue1;
command_queue_base<concurrent_queue> queue2;

// The queue takes ownership of commands you enqueue.
queue2.enqueue_command(std::move(cmd));

// And relinquishes ownership again when dequeuing.
auto dequeued = queue2.dequeue_command();

// After processing, you can push it into the results queue.
queue2.put_results(std::move(dequeued));

// And retrieve the results again.
auto results = queue2.get_completed();
Those are the basic queue operations in a nutshell. What about the results type we provided to the context? The worker can set it on the dequeued command before pushing it into the results queue.
// as<T>() is a function, so it needs the call parentheses.
dequeued->as<context>()->results = std::make_unique<std::string>("input");
Note that the cast via the as<T>() function is necessary, because the queue erases specific command types so that multiple differently parametrized command contexts can be used for the same queue.
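To see why such a cast is needed, here is a minimal sketch of type erasure for commands. It is not liberate’s implementation: `erased_command`, `typed_command` and `process_erased` are hypothetical names, and the unchecked `static_cast` is a simplification that relies on the caller picking the right type, for example based on the numeric command type.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <tuple>
#include <utility>

// Hypothetical type-erased base: a queue would only ever store this type.
struct erased_command
{
  explicit erased_command(int t) : type{t} {}
  virtual ~erased_command() = default;

  int type;

  // Cast back to the concrete context. Unchecked: the caller must know,
  // e.g. from `type`, which T this command really is.
  template <typename T>
  T * as()
  {
    return static_cast<T *>(this);
  }
};

// Hypothetical concrete context with parameters and optional results.
template <typename Params, typename Results>
struct typed_command : erased_command
{
  // All arguments after the command type are forwarded to Params.
  template <typename... Args>
  explicit typed_command(int t, Args &&... args)
    : erased_command{t}
    , parameters(std::forward<Args>(args)...)
  {}

  Params parameters;
  std::unique_ptr<Results> results;
};

// Stores a command behind the erased base, then recovers it to set and
// read the results, as a worker and the original thread would.
std::string process_erased()
{
  using context = typed_command<std::tuple<int, std::string>, std::string>;

  std::unique_ptr<erased_command> cmd
    = std::make_unique<context>(0, 42, "this is my input");

  // The base knows nothing about parameters or results, hence the cast.
  auto * typed = cmd->as<context>();
  typed->results = std::make_unique<std::string>(
      std::get<1>(typed->parameters));

  return *cmd->as<context>()->results;
}
```

Because the container holds pointers to the base type only, contexts with entirely different parameter and result types can share one queue. The price is that the concrete type has to be named again at the point of use.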
Above, we used the command_queue_base class, which is relatively simple. The more elaborate liberate::concurrency::parametrized_command_queue is a little fancier, and its concurrent instantiation is called concurrent_command_queue.
concurrent_command_queue queue{[](concurrent_command_queue & q)
  {
    assert(!q.empty());
    // We shouldn't do work here, but instead send a notify_one() to some
    // condition on which worker threads wait.
  }
};

queue.enqueue_command(std::move(cmd));
Enqueuing a command invokes the lambda. In fact, so does putting a result back into the queue. If you want to be separately informed of whether there is work or there are results, you can pass two lambdas instead.
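The difference between one shared callback and two separate ones can be illustrated with a small stand-in. This is only a sketch under assumptions: `notifying_queues` is a hypothetical class, its callbacks take no arguments (unlike the lambda above, which receives the queue), and commands and results are plain ints to keep it short. In real code the callbacks would typically call notify_one() on a condition variable rather than count.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical pair of queues that reports pushes through callbacks.
class notifying_queues
{
public:
  using callback = std::function<void()>;

  // Two callbacks: one for new work, one for new results.
  notifying_queues(callback on_command, callback on_result)
    : m_on_command(std::move(on_command))
    , m_on_result(std::move(on_result))
  {}

  void enqueue_command(int command)
  {
    m_commands.push_back(command);
    m_on_command();  // tell workers that there is work
  }

  void put_results(int result)
  {
    m_results.push_back(result);
    m_on_result();   // tell the originating side that results are ready
  }

private:
  callback m_on_command;
  callback m_on_result;
  std::deque<int> m_commands;
  std::deque<int> m_results;
};

struct notification_counts
{
  int commands;
  int results;
};

// Pushes two commands and one result, and reports how often each
// callback fired.
notification_counts count_notifications()
{
  notification_counts counts{0, 0};
  notifying_queues queues{
    [&counts] { ++counts.commands; },
    [&counts] { ++counts.results; }
  };

  queues.enqueue_command(1);
  queues.enqueue_command(2);
  queues.put_results(1);
  return counts;
}
```

With separate callbacks, worker threads need not wake up when only results arrive, and the thread evaluating results need not wake up when only commands arrive.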
This queue is also a little more convenient when it comes to enqueueing commands.
queue.enqueue_command<context>(FIND_LONGEST_STRING, 42,
    "this is my input");
Thanks to perfect forwarding, this isn’t more costly than the previous method. And much the same can be done for the results, which removes the need for awkward casting.
queue.put_results<context>(dequeued, "input");
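The claim about perfect forwarding can be checked with a small experiment. The sketch below is not liberate’s code: `forwarding_queue`, `payload_command` and `tracked_payload` are hypothetical names, and the queue stores only one command type for brevity. The payload counts how often it has been copied on its way into the queue.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <utility>

// Records how many copies were made to produce this instance.
struct tracked_payload
{
  tracked_payload() = default;
  tracked_payload(tracked_payload const & other)
    : copies{other.copies + 1}
  {}
  tracked_payload(tracked_payload && other) noexcept
    : copies{other.copies}
  {}

  int copies = 0;
};

// Hypothetical command that forwards its payload into place.
struct payload_command
{
  template <typename P>
  payload_command(int t, P && p)
    : type{t}
    , payload(std::forward<P>(p))
  {}

  int type;
  tracked_payload payload;
};

// Hypothetical queue that constructs commands from forwarded arguments.
class forwarding_queue
{
public:
  template <typename T, typename... Args>
  void enqueue_command(Args &&... args)
  {
    m_items.push_back(std::make_unique<T>(std::forward<Args>(args)...));
  }

  std::unique_ptr<payload_command> dequeue_command()
  {
    auto front = std::move(m_items.front());
    m_items.pop_front();
    return front;
  }

private:
  std::deque<std::unique_ptr<payload_command>> m_items;
};

// Returns the number of payload copies made by one enqueue.
int copies_when_enqueuing(bool move_in)
{
  forwarding_queue queue;
  tracked_payload payload;

  if (move_in) {
    queue.enqueue_command<payload_command>(0, std::move(payload));
  }
  else {
    queue.enqueue_command<payload_command>(0, payload);
  }
  return queue.dequeue_command()->payload.copies;
}
```

Moving the payload in costs no copies at all, and passing an lvalue costs exactly the one copy that is unavoidable. The convenience overload therefore adds no overhead compared to constructing the command first and moving it into the queue.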
And there you have your CQRS pattern.