Server pushes
Redis servers can send various push messages to the client. Boost.Redis supports these as first-class operations. This page describes how to receive and handle them.
The most common case is Pub/Sub messages triggered by PUBLISH. The following example shows a typical receiver:
auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
    generic_flat_response resp;
    conn->set_receive_response(resp);
    // Subscribe to the channel 'mychannel'. You can add any number of channels here.
    request req;
    req.subscribe({"mychannel"});
    co_await conn->async_exec(req);
    // You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored
    // in resp. If the connection encounters a network error and reconnects to the server,
    // it will automatically subscribe to 'mychannel' again. This is transparent to the user.
    // You need to use the specialized request::subscribe() function (instead of request::push)
    // to enable this behavior.
    // Loop to read Redis push messages.
    while (conn->will_reconnect()) {
        // Wait for pushes
        auto [ec] = co_await conn->async_receive2(asio::as_tuple);
        // Check for errors and cancellations
        if (ec) {
            std::cerr << "Error during receive: " << ec << std::endl;
            break;
        }
        // This can happen if a SUBSCRIBE command errored (e.g. insufficient permissions)
        if (resp.has_error()) {
            std::cerr << "The receive response contains an error: " << resp.error().diagnostic
                      << std::endl;
            break;
        }
        // The response must be consumed without suspending the
        // coroutine i.e. without the use of async operations.
        for (push_view elem : push_parser(resp.value())) {
            std::cout << "Received message from channel " << elem.channel << ": " << elem.payload
                      << "\n";
        }
        resp.value().clear();
    }
}
Summary of the steps:
- Call connection::set_receive_response before any other receive-related calls so that the connection stores incoming pushes in the given object. The library does not copy the response; you must keep it alive for the duration of the receive loop.
- Build a request with request::subscribe (or psubscribe) and execute it. If the connection drops and is re-established, an equivalent SUBSCRIBE is sent automatically.
- Loop while connection::will_reconnect is true (i.e. until the connection is cancelled).
- Call connection::async_receive2 to wait until at least one push is available. This function also participates in push flow control (see Flow control).
- After completion, resp holds the raw RESP3 nodes for the received pushes (resp3::node_view). For Pub/Sub messages produced by PUBLISH, use push_parser to iterate over them. Each parsed message exposes the channel, the payload, and, for pattern subscriptions (PSUBSCRIBE), the matched pattern.
- Call resp.value().clear() to discard the current push data and make room for the next batch.
Consume all push data without suspending the current coroutine (or otherwise re-entering the event loop). See Suspending the receiver for how to handle cases where you must suspend.
You can use the same connection for both pushes and normal request/response traffic. Boost.Redis relies on RESP3 to multiplex them on a single connection.
Dynamic Pub/Sub tracking
When you execute a request built with request::subscribe
(or psubscribe, unsubscribe, punsubscribe), the connection tracks it. After a reconnect, it
re-issues the subscribe commands to restore any subscriptions that were active before the reconnect.
You can subscribe to and unsubscribe from channels at any time; the tracked set of subscriptions is updated accordingly.
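To make the tracking behavior concrete, here is a minimal model of the bookkeeping described above, written with the standard library only. It is not Boost.Redis code: `subscription_tracker` and its member functions are hypothetical names chosen for illustration, and the real connection may store and replay subscriptions differently.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical model of subscription tracking; not part of Boost.Redis.
class subscription_tracker {
    std::set<std::string> channels_;  // channels that are currently subscribed

public:
    // Record an executed SUBSCRIBE for one channel.
    void on_subscribe(const std::string& channel) { channels_.insert(channel); }

    // Record an executed UNSUBSCRIBE for one channel.
    void on_unsubscribe(const std::string& channel) { channels_.erase(channel); }

    // Commands to replay after a reconnect: one SUBSCRIBE per active channel.
    std::vector<std::string> commands_after_reconnect() const
    {
        std::vector<std::string> cmds;
        for (const auto& ch : channels_)
            cmds.push_back("SUBSCRIBE " + ch);
        return cmds;
    }
};
```

In this model, subscribing to two channels and then unsubscribing from one leaves a single command to replay, which matches the idea that only subscriptions active at the time of the reconnect are restored.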
Flow control
If the server sends pushes faster than the client consumes them, the connection applies flow control. After a certain number of pushes have been read and not yet consumed, it stops reading until the application drains the receive buffer. This creates backpressure at the TCP level.
The pending count is reset each time async_receive2 completes. You must still free
memory by calling resp3::flat_tree::clear() (or equivalent) on the response, as in
the example.
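The counting rule can be pictured with a small standard-library-only model. This is a sketch under assumptions, not the library's implementation: `push_flow_control`, its member functions, and the configurable limit are hypothetical, and the actual threshold used by Boost.Redis is not stated here.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of push flow control; not part of Boost.Redis.
class push_flow_control {
    std::size_t pending_ = 0;  // pushes read but not yet handed to the receiver
    std::size_t limit_;        // assumed threshold; the real value is library-defined

public:
    explicit push_flow_control(std::size_t limit)
    : limit_(limit)
    {
        assert(limit > 0);
    }

    // The reader may pull more data from the socket only while below the limit.
    bool can_read() const { return pending_ < limit_; }

    // Called for each push read from the socket.
    void on_push_read()
    {
        assert(can_read());
        ++pending_;
    }

    // Called when the receive operation completes: the pending count resets.
    void on_receive_completed() { pending_ = 0; }

    std::size_t pending() const { return pending_; }
};
```

In this model, once `can_read()` returns false nothing else is read from the socket until `on_receive_completed()` runs. A receiver that waits for something other than the receive operation while the reader is paused would therefore never unblock it.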
Do not call async_exec from within the receiver loop. The response to your request may be queued behind enough pushes to trigger the flow control mechanism, causing a deadlock.
Subscribe confirmations
Every time you subscribe to a channel, Redis sends a push as a confirmation. These confirmations
are stored in your receive response and cause async_receive2 to complete like any
other push. push_parser skips these messages, since they don’t contain application-level information.
As a result, async_receive2 will complete while the push_parser range is empty
when only confirmations are received. Your code should handle an empty range correctly.
To work with subscribe confirmations or other push shapes, use the raw nodes in generic_flat_response directly (see Advanced scenarios).
Suspending the receiver
You must not suspend the receiver coroutine before fully consuming the current push data. If you need to perform I/O with that data (e.g. send it over a WebSocket), first copy or serialize it without suspending, then clear the response, and only then perform the async operation.
For example, the following is incorrect:
auto [ec] = co_await conn->async_receive2(asio::as_tuple);
// DON'T DO THIS: more pushes may be read while the WebSocket send runs,
// and they are discarded when you clear(), causing a race.
co_await websocket.async_send(push_parser(resp.value()));
resp.value().clear();
Instead, copy or compose the message without I/O, then clear, then send:
auto [ec] = co_await conn->async_receive2(asio::as_tuple);
// Compose the WebSocket message without any I/O (no suspension).
// msg must not reference resp.
auto msg = compose_websocket_message(push_parser(resp.value()));
resp.value().clear();
// Now it's OK to suspend the coroutine
co_await websocket.async_write(msg);
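One possible shape for the compose step is sketched below, using only the standard library. The `message_view` struct is a hypothetical stand-in for a parsed push whose fields point into the receive buffer, and this `compose_websocket_message` is an illustrative implementation, not a function provided by Boost.Redis. The point it shows is that the result owns copies of the data, so it remains valid after the buffer is cleared.

```cpp
#include <string>
#include <string_view>
#include <vector>

// Hypothetical stand-in for a parsed push; the views point into the receive buffer.
struct message_view {
    std::string_view channel;
    std::string_view payload;
};

// Builds an owning string with one "channel: payload" line per message.
// Everything is copied, so the result does not reference the receive buffer.
std::string compose_websocket_message(const std::vector<message_view>& messages)
{
    std::string out;
    for (const auto& m : messages) {
        out.append(m.channel);
        out += ": ";
        out.append(m.payload);
        out += '\n';
    }
    return out;
}
```

Because the function performs no I/O and returns a `std::string`, it can run between the receive completing and the response being cleared without suspending the coroutine.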
SUBSCRIBE errors
SUBSCRIBE can fail (e.g. due to ACL rules). Because of how the protocol works, such
errors may be delivered as push-like data. Using generic_flat_response
(an alias for adapter::result<resp3::flat_tree>, similar to std::expected) lets you detect them: check
resp.has_error() and handle the error before iterating. See the example at the top
of this page.
Advanced scenarios
push_parser only recognizes standard
Pub/Sub message and pmessage pushes. Other push types are skipped, including:
- Subscribe/unsubscribe confirmations.
- Client-side caching invalidation messages.
- Output of the MONITOR command.
If you need to handle these, work directly with the nodes in generic_flat_response
instead of push_parser. The overall receiver pattern (set response, subscribe,
loop with async_receive2, consume, clear) stays the same.
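When handling raw pushes, a common first step is to branch on the push's first element, which in RESP3 names the kind of push. The sketch below assumes the application has already extracted that first element as a string from the raw nodes; how to walk the nodes is not shown. `push_kind` and `classify_push` are hypothetical names, not part of Boost.Redis.

```cpp
#include <string_view>

// Hypothetical classification of RESP3 pushes by their first element.
enum class push_kind { message, confirmation, invalidation, other };

push_kind classify_push(std::string_view first)
{
    // Application-level Pub/Sub data (what push_parser yields).
    if (first == "message" || first == "pmessage")
        return push_kind::message;

    // Confirmations sent in reply to (un)subscribe commands.
    if (first == "subscribe" || first == "unsubscribe" ||
        first == "psubscribe" || first == "punsubscribe")
        return push_kind::confirmation;

    // Client-side caching invalidation.
    if (first == "invalidate")
        return push_kind::invalidation;

    // Anything else is left for the application to decide.
    return push_kind::other;
}
```

A receiver could call such a function for each push in the batch and dispatch to separate handlers, while keeping the same set, subscribe, receive, consume, clear loop.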