Customizing Redis pubsub for message persistence – Part 2

Redis Logo

In the last post we saw how Redis can easily be modified to persist the last published message on PubSub channels. Without subscribing to the PubSub channel we were able to get the last published message from Redis db. In this post, I will take that idea one step ahead and add native capabilities within Redis to persist all the unprocessed messages published on PubSub channel in channel specific lists. We’ll also preserve our capability to send the last published message to clients upon subscription.

But why are we doing this?

Popular open source application that provide support for Redis are based out of it’s list class of API. For example, let’s look at Celery which is a distributed task queue written in Python. Start redis-cli MONITOR on a terminal and then start celery in another window as follows:

$ pip install celery
$ celery worker --queues=testing --broker=redis:// --without-gossip --without-mingle --without-heartbeat

You’ll find celery polling Redis periodically as indicated by log lines like "BRPOP" "testing" "testing\x06\x163" "testing\x06\x166" "testing\x06\x169" "1". Celery uses the Redis PubSub mechanism (that we disabled in the above command) only for internal features. Sentry, a popular exception logging and aggregation library, internally depends upon Celery. There is an open pull request that claims to add Redis pubsub based support to Celery. In the world of Ruby, background processing frameworks like Requeue and Sidekiq depend upon Redis list class of API’s.

However, with no native support for persistence of PubSub messages in Redis, it’s not difficult to understand why adopting to Redis PubSub can be tricky for some. Currently, Redis simply drops the message if no subscribers are found. Hence, question really is whether your application is tolerant to loss of published messages (for example, dropped messages while you were upgrading your application) ?

To solve persistence problem with Redis pubsub, the usual approach is to start multiple application instances. Some instances can continue to serve while others get deployed. However still, your active instances might be experiencing a network partition and unable to receive published messages. After all, primary goal is to guarantee processing of every message received by Redis irrespective of whether we are using list or pubsub based backend. A native support to solve persistence problems with Redis PubSub is clearly desirable.

Persisting dropped Redis PubSub messages in a list

In the last post we added a single line of code to persist the last published message on channels in a separate Redis key. We’ll update implementation to push every received message at the end of a channel specific list. Replace `setKey(c->db, c->argv[1], c->argv[2]);` line that we added the last time with following code:

// Persist messages in list only if no receivers were found
if (receivers == 0) {
    int j, pushed = 0, where = REDIS_TAIL;

    // Fetch list key from the database
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    // For every published message on the channel
    for (j = 2; j < c->argc; j++) {
        c->argv[j] = tryObjectEncoding(c->argv[j]);

        // Ensure we have our quicklist initialized
        if (!lobj) {
            lobj = createQuicklistObject();
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,

        // Push message at the tail of the list

    // Signal key watchers and internal event subscribers
    if (pushed) {
        char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";
    server.dirty += pushed;

I have added some comments in the code for clarity. This code is merely is a rip off of src/t_list.c:pushGenericCommand function. We don’t want to send replies to the client that are usually sent after an RPUSH command. Frankly, we can further refactor pushGenericCommand function to turn the above code into a single function call.

make test and start ./src/redis-server. Using ./src/redis-cli try:

$ ./src/redis-cli> publish persistent-channel this
(integer) 0> publish persistent-channel is
(integer) 0> publish persistent-channel gonna
(integer) 0> publish persistent-channel be
(integer) 0> publish persistent-channel awesome
(integer) 0> lrange persistent-channel 0 -1
1) "this"
2) "is"
3) "gonna"
4) "be"
5) "awesome">

Voila! Since we published a few messages with no active subscriber, they all got persisted in a list named after the channel itself. Now incoming subscribers can process pending messages by fetching them from the list which otherwise would have been dropped.

Delivering unprocessed messages to subscribers upon subscription

Instead of depending upon clients to poll for channel list length, server can deliver unprocessed messages to subscribers upon successful subscription. Since this can get overwhelming for subscribers if there are several pending messages waiting in the list, we may not want to do this at all and leave it up to the clients. Let’s preserve our feature from the last post i.e. to deliver the last published message to clients upon subscription.

Here we don’t want to remove the last published message from our persistent list. We simply wish to send it to the incoming subscribers. For example, imagine cases like user status, location and mood being published on separated channels. Here is a method that will give back the last element from the list without removing it (equivalent to LRANGE key -1 -1):

robj *llast(redisClient *c, robj *key) {
    listTypeEntry entry;
    robj *o, *value = NULL;
    long llen;

    // Fetch list object from db
    o = lookupKeyRead(c->db, key);

    // Ensure key contains a list
    if(o != NULL && o->encoding == REDIS_ENCODING_QUICKLIST) {
        // Get list iterator for "lrange key -1 -1" use case
        llen = listTypeLength(o);
        listTypeIterator *iter = listTypeInitIterator(o, llen-1, REDIS_TAIL);

        // Fetch last item, prepare value
        listTypeNext(iter, &entry);
        quicklistEntry *qe = &entry.entry;
        if (qe->value) {
            value = createStringObject((char *)qe->value,
        } else {
            value = createStringObjectFromLongLong(qe->longval);
    return value;

We now need to replace our modifications within subscription handling methods from the last post. Note, callee must call decrRefCount on the returned robj to free up created string object after delivery. Kindly check this github commit for all the changes since the last post. You can also checkout my Redis fork and directly play with the modified code.


We saw how easy it is to modify Redis for fun and profit. By adding native persistence capabilities, we offload our task of ensuring processing of every message received by Redis cluster. After all, none of the magical client side logics will ever be as reliable as native support. By the way, Redis 3.0.0 was released this week with native support for clustering, checkout while it’s hot.

Leave your comments and feedback.

Customizing Redis pubsub for message persistence

Redis Logo

Redis comes packed with a simple yet powerful PubSub API.  It provides low latency and scales well.  A message published on a channel is received by subscriber(s) at the other end.  However, if no active subscriber is found the message is simply lost.  This drawback puts Redis out of the probables list for several use cases where message persistence of unprocessed published messages is desired.  It’s also probably a reason why several open source projects that support Redis as a broker are based upon it’s list push / pop API.  In this post I will demonstrate how to modify Redis PubSub API to support message persistence, opening possibilities for several interesting use cases.

Last Published Message

Ability to fetch the last published message on a particular channel without subscribing to the channel opens doors for several interesting use cases.  src/pubsub.c:publishCommand is where Redis handles publish command.  Let’s add a line of code to persist the most recently published message on a channel:

void publishCommand(redisClient *c) {

    /* Persist last published message in channel specific key */
    setKey(c->db, c->argv[1], c->argv[2]);


Above, we added a call to src/db.c:setKey function that sets the value of key c->argv[1] (channel name) to c->argv[2] (published message).

Run make from the project root directory and start ./src/redis-server. Now we can do something like:> publish channel1 c1m1
(integer) 0> get channel1
"c1m1"> publish channel1 c1m2
(integer) 0> get channel1

Voila. We published a message with no subscriber. However, an incoming user can still be served with the last published message on the channel by fetching the value of key channel1 without explicitly subscribing to the channel.

Let’s take this idea one step ahead. XMPP Publish-Subscribe (XEP-0060) defines a specification for receiving the last published item. It says,

When a subscription request is successfully processed, the service MAY send the last published item to the new subscriber.

Let’s add this idea to Redis PubSub mechanism. src/pubsub.c:subscribeCommand function is where Redis processes channel subscription requests. Add the following lines of code at the end of this function.

void subscribeCommand(redisClient *c) {

    /* Send last received message on the subscribed channel(s) */
    robj *o;
    for (j = 1; j < c->argc; j++) {
    	o = lookupKeyRead(c->db, c->argv[j]);
    	if(o != NULL) {

Here, post subscription, we fetch and send the last published message for all channels that the client just subscribed to. make and restart ./src/redis-server. Now on a new ./src/redis-cli terminal subscribe to channel1:> subscribe channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "message"
2) "channel1"
3) "c1m2"

Voila! Now Redis server will send the last published message upon subscription. But what about PSUBSCRIBE use case?

src/pubsub.c:psubscribeCommand handles pattern based channel subscription logic. Add following lines of code at the end of this function:

void psubscribeCommand(redisClient *c) {

    /* Send last received message on the channel(s) matching subscribed patterns */
    for (j = 1; j < c->argc; j++) {
    	robj *pat = c->argv[j];
    	dictIterator *di = dictGetIterator(server.pubsub_channels);
    	dictEntry *de;
    	while((de = dictNext(di)) != NULL) {
		robj *cobj = dictGetKey(de);
		sds channel = cobj->ptr;
		if (stringmatchlen((char*)pat->ptr,
				sdslen(channel), 0)) {
			robj *o = lookupKeyRead(c->db, cobj);
			if(o != NULL) {

Above, for every subscribed pattern, we iterate over active server.pubsub_channels and check if the active channel matches the subscription pattern. On match we fetch and send the last published message on the channel to the client.

With previous redis-cli subscribe terminal running, open a new terminal and try:> psubscribe channel*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1
1) "pmessage"
2) "channel*"
3) "channel1"
4) "c1m2"


You can checkout my Redis fork and commits under pubsub-persistence branch. Enhancements described above can also be found on this github commit.

Currently it is unclear what Antirez (Sanfilippo Salvatore) plans to do further with PubSub in Redis. It stands on a solid base and recent efforts are rightly put behind Redis cluster. However, I see some interesting enhancements that can be made to Redis PubSub mainline. In the next post I will take the current idea one step ahead and add persistence support for all or only unprocessed published messages in a Redis list (possibly with a cap or expiration on persisted messages).