Server pushes

Redis servers can send various push messages to the client. Boost.Redis supports these as first-class operations. This page describes how to receive and handle them.

The most common case is Pub/Sub messages triggered by PUBLISH. The following example shows a typical receiver:

auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
   generic_flat_response resp;
   conn->set_receive_response(resp);

   // Subscribe to the channel 'mychannel'. You can add any number of channels here.
   request req;
   req.subscribe({"mychannel"});
   co_await conn->async_exec(req);

   // You're now subscribed to 'mychannel'. Pushes sent over this channel will be stored
   // in resp. If the connection encounters a network error and reconnects to the server,
   // it will automatically subscribe to 'mychannel' again. This is transparent to the user.
   // You need to use the specialized request::subscribe() function (instead of request::push)
   // to enable this behavior.

   // Loop to read Redis push messages.
   while (conn->will_reconnect()) {
      // Wait for pushes
      auto [ec] = co_await conn->async_receive2(asio::as_tuple);

      // Check for errors and cancellations
      if (ec) {
         std::cerr << "Error during receive: " << ec << std::endl;
         break;
      }

      // This can happen if a SUBSCRIBE command errored (e.g. insufficient permissions)
      if (resp.has_error()) {
         std::cerr << "The receive response contains an error: " << resp.error().diagnostic
                   << std::endl;
         break;
      }

      // The response must be consumed without suspending the
      // coroutine i.e. without the use of async operations.
      for (push_view elem : push_parser(resp.value())) {
         std::cout << "Received message from channel " << elem.channel << ": " << elem.payload
                   << "\n";
      }

      resp.value().clear();
   }
}

Summary of the steps:

  • Call connection::set_receive_response before any other receive-related calls so that the connection stores incoming pushes in the given object. The library does not copy the response; you must keep it alive for the duration of the receive loop.

  • Build a request with request::subscribe (or psubscribe) and execute it. If the connection drops and is re-established, an equivalent SUBSCRIBE is sent automatically.

  • Loop while connection::will_reconnect is true (i.e. until the connection is cancelled).

  • Call connection::async_receive2 to wait until at least one push is available. This function also participates in push flow control (see Flow control).

  • After completion, resp holds the raw RESP3 nodes for the received pushes (resp3::node_view). For Pub/Sub messages produced by PUBLISH, use push_parser to iterate over them. Each parsed message exposes the channel, the payload, and, for pattern subscriptions (PSUBSCRIBE), the matched pattern.

  • Call resp.value().clear() to discard the current push data and make room for the next batch.

Consume all push data without suspending the current coroutine (or otherwise re-entering the event loop). See Suspending the receiver for how to handle cases where you must suspend.

You can use the same connection for both pushes and normal request/response traffic. Boost.Redis relies on RESP3 to multiplex them on a single connection.

Dynamic Pub/Sub tracking

When you execute a request built with request::subscribe (or psubscribe, unsubscribe, punsubscribe), the connection tracks it. After a reconnect, it re-issues the subscribe commands to restore any subscriptions that were active before the reconnect. You can subscribe to and unsubscribe from channels at any time; the tracked set is updated accordingly.
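The bookkeeping described above can be pictured with a small model. This is only a sketch of the idea, not the library's implementation: subscription_tracker and its member functions are hypothetical names, and the model assumes that tracking amounts to maintaining a set of active channels that is replayed after a reconnect.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for the connection's bookkeeping; not a Boost.Redis type.
class subscription_tracker {
public:
   // Record the effect of a SUBSCRIBE request that was executed.
   void on_subscribe(const std::vector<std::string>& channels)
   {
      active_.insert(channels.begin(), channels.end());
   }

   // Record the effect of an UNSUBSCRIBE request that was executed.
   void on_unsubscribe(const std::vector<std::string>& channels)
   {
      for (const auto& ch : channels)
         active_.erase(ch);
   }

   // Channels that would be re-subscribed after a reconnect, in sorted order.
   std::vector<std::string> channels_to_restore() const
   {
      return std::vector<std::string>(active_.begin(), active_.end());
   }

private:
   std::set<std::string> active_;
};
```

In this model, subscribing to "news" and "chat" and then unsubscribing from "chat" leaves only "news" to be restored, which mirrors the behavior described for the real connection.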

Flow control

If the server sends pushes faster than the client consumes them, the connection applies flow control. After a certain number of pushes have been read and not yet consumed, it stops reading until the application drains the receive buffer. This creates backpressure at the TCP level.

The pending count is reset each time async_receive2 completes. You must still free memory by calling resp3::flat_tree::clear() (or equivalent) on the response, as in the example.

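Do not call async_exec from within the receiver loop. The response to your request may be queued behind enough pushes to trigger the flow control mechanism. The connection then stops reading before it reaches the response, while the receiver is blocked waiting for that response instead of consuming pushes, which causes a deadlock.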
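The following toy model illustrates why this deadlocks. It is a sketch under stated assumptions, not library code: reader_model, frame, and make_wire are hypothetical names, and the limit of pending pushes is an arbitrary value chosen for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

enum class frame { push, response };

// Toy model of the connection's reader. Hypothetical; not a Boost.Redis type.
class reader_model {
public:
   // limit: assumed maximum number of pushes read since the last completed receive.
   explicit reader_model(std::size_t limit) : limit_(limit) {}

   // Reads frames until a response is found or flow control pauses reading.
   // Returns true if the response was reached.
   bool read_until_response(std::deque<frame>& wire)
   {
      while (!wire.empty()) {
         if (wire.front() == frame::response) {
            wire.pop_front();
            return true;
         }
         if (pending_ == limit_)
            return false;   // reading is paused: backpressure
         wire.pop_front();
         ++pending_;
      }
      return false;
   }

   // Models async_receive2 completing: the pending count is reset.
   void on_receive_completed() { pending_ = 0; }

private:
   std::size_t limit_;
   std::size_t pending_ = 0;
};

// Builds a wire with `pushes` push frames followed by a single response.
std::deque<frame> make_wire(std::size_t pushes)
{
   std::deque<frame> wire(pushes, frame::push);
   wire.push_back(frame::response);
   return wire;
}
```

With a limit of 2 and a response queued behind 3 pushes, read_until_response returns false: the reader pauses before reaching the response. Only a completed receive (on_receive_completed in the model) lets it make progress, and a receiver that is itself waiting on the response never performs that receive.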

Subscribe confirmations

Every time you subscribe to a channel, Redis sends a push as a confirmation. These confirmations are stored in your receive response and cause async_receive2 to complete like any other push. push_parser skips these messages, since they don’t contain application-level information.

As a result, when only confirmations are received, async_receive2 completes but the push_parser range is empty. Your code should handle an empty range correctly.

To work with subscribe confirmations or other push shapes, use the raw nodes in generic_flat_response directly (see Advanced scenarios).

Suspending the receiver

You must not suspend the receiver coroutine before fully consuming the current push data. If you need to perform I/O with that data (e.g. send it over a WebSocket), first copy or serialize it without suspending, then clear the response, and only then perform the async operation.

For example, the following is incorrect:

auto [ec] = co_await conn->async_receive2(asio::as_tuple);
// DON'T DO THIS: more pushes may be read while the WebSocket send runs,
// and they are discarded when you clear(), causing a race.
co_await websocket.async_write(push_parser(resp.value()));
resp.value().clear();

Instead, copy or compose the message without I/O, then clear, then send:

auto [ec] = co_await conn->async_receive2(asio::as_tuple);

// Compose the WebSocket message without any I/O (no suspension).
// msg must not reference resp.
auto msg = compose_websocket_message(push_parser(resp.value()));

resp.value().clear();

// Now it's OK to suspend the coroutine
co_await websocket.async_write(msg);
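compose_websocket_message is left undefined in the snippet above. One possible shape is sketched below, assuming each parsed message exposes its channel and payload as string views into the response. message_view is a hypothetical stand-in for the parsed message type, and the "channel: payload" line format is arbitrary.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical stand-in for a parsed Pub/Sub message.
// The views point into the receive response and become invalid once it is cleared.
struct message_view {
   std::string_view channel;
   std::string_view payload;
};

// Copies every message into one owning string, one "channel: payload" line each.
// The result holds no views into the response, so the response can be cleared afterwards.
std::string compose_websocket_message(const std::vector<message_view>& messages)
{
   std::string out;
   for (const auto& m : messages) {
      out.append(m.channel);
      out.append(": ");
      out.append(m.payload);
      out.push_back('\n');
   }
   return out;
}
```

The important property is that the returned std::string owns its bytes. Returning string views or a lazy range over the response would reintroduce the problem, because the data would be gone after clear().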

SUBSCRIBE errors

SUBSCRIBE can fail (e.g. due to ACL rules). Because of how the protocol works, such errors may be delivered as push-like data. Using generic_flat_response (an alias for adapter::result<resp3::flat_tree>, similar to std::expected) lets you detect them: check resp.has_error() and handle the error before iterating. See the example at the top of this page.

Advanced scenarios

push_parser only recognizes standard Pub/Sub message and pmessage pushes. Other push types are skipped, including:

  • Subscribe/unsubscribe confirmations.

  • Client-side caching invalidation messages.

  • Output of the MONITOR command.

If you need to handle these, work directly with the nodes in generic_flat_response instead of push_parser. The overall receiver pattern (set response, subscribe, loop with async_receive2, consume, clear) stays the same.
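As a rough illustration of working with raw nodes, the sketch below counts pushes by kind. It does not use the library's node type: node is a simplified, hypothetical stand-in, and the code assumes the pushes are stored as a flattened pre-order list in which each push starts with a header node at depth 0, followed by its elements at depth 1 or more. In RESP3, the first element of a push names its kind (for example "message", "subscribe", or "invalidate").

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified node; a stand-in for the library's RESP3 node type.
struct node {
   bool is_push_header;   // true for the node that opens a push
   std::size_t depth;     // 0 for the header, 1 or more for its elements
   std::string value;     // empty for aggregates
};

// Counts pushes by kind, where the kind is the first element of each push.
std::map<std::string, std::size_t> count_push_kinds(const std::vector<node>& nodes)
{
   std::map<std::string, std::size_t> counts;
   for (std::size_t i = 0; i < nodes.size(); ++i) {
      if (!nodes[i].is_push_header)
         continue;
      // The kind is stored in the node right after the header, if present.
      if (i + 1 < nodes.size() && nodes[i + 1].depth == 1)
         ++counts[nodes[i + 1].value];
   }
   return counts;
}
```

A real receiver would dispatch on the kind instead of counting, for example by forwarding "invalidate" pushes to a cache and ignoring confirmations.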