Customizing Redis pubsub for message persistence – Part 2

In the last post we saw how Redis can easily be modified to persist the last published message on PubSub channels. Without subscribing to the PubSub channel, we were able to get the last published message from the Redis db. In this post, I will take that idea one step further and add native capabilities within Redis to persist all the unprocessed messages published on a PubSub channel in channel-specific lists. We’ll also preserve our ability to send the last published message to clients upon subscription.

But why are we doing this?

Popular open source applications that support Redis are built on its list family of APIs. For example, let’s look at Celery, a distributed task queue written in Python. Start redis-cli MONITOR in one terminal, then start Celery in another window as follows:

$ pip install celery
$ celery worker --queues=testing --broker=redis:// --without-gossip --without-mingle --without-heartbeat

You’ll find Celery polling Redis periodically, as indicated by log lines like "BRPOP" "testing" "testing\x06\x163" "testing\x06\x166" "testing\x06\x169" "1". Celery uses the Redis PubSub mechanism (which we disabled in the above command) only for internal features. Sentry, a popular exception logging and aggregation library, depends on Celery internally. There is an open pull request that claims to add Redis PubSub based support to Celery. In the Ruby world, background processing frameworks like Resque and Sidekiq depend on the Redis list family of APIs.

However, with no native support for persistence of PubSub messages in Redis, it’s not difficult to understand why adopting Redis PubSub can be tricky for some. Currently, Redis simply drops the message if no subscribers are found. Hence, the real question is whether your application can tolerate the loss of published messages (for example, messages dropped while you were upgrading your application).

To solve the persistence problem with Redis PubSub, the usual approach is to start multiple application instances. Some instances can continue to serve while others get deployed. Even then, your active instances might be experiencing a network partition and be unable to receive published messages. After all, the primary goal is to guarantee processing of every message received by Redis, irrespective of whether we are using a list or PubSub based backend. Native support for solving persistence problems with Redis PubSub is clearly desirable.

Persisting dropped Redis PubSub messages in a list

In the last post we added a single line of code to persist the last published message on channels in a separate Redis key. We’ll update the implementation to push every received message onto the end of a channel-specific list. Replace the `setKey(c->db, c->argv[1], c->argv[2]);` line that we added last time with the following code:

// Persist messages in list only if no receivers were found
if (receivers == 0) {
    int j, pushed = 0, where = REDIS_TAIL;

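    // Fetch list key from the database
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    // Only persist when the key is absent or already holds a list,
    // since listTypePush must only be called on list objects
    int persist = (lobj == NULL || lobj->type == REDIS_LIST);

    // For every published message on the channel
    for (j = 2; persist && j < c->argc; j++) {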
        c->argv[j] = tryObjectEncoding(c->argv[j]);

        // Ensure we have our quicklist initialized
        if (!lobj) {
            lobj = createQuicklistObject();
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }

        // Push message at the tail of the list
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }

    // Signal key watchers and internal event subscribers
    if (pushed) {
        char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}

I have added some comments in the code for clarity. This code is merely a rip-off of the src/t_list.c:pushGenericCommand function, minus the replies that are usually sent to the client after an RPUSH command, which we don’t want here. Frankly, we could further refactor pushGenericCommand to turn the above code into a single function call.

Run make test, then start ./src/redis-server. Using ./src/redis-cli, try:

$ ./src/redis-cli 
127.0.0.1:6379> publish persistent-channel this
(integer) 0
127.0.0.1:6379> publish persistent-channel is
(integer) 0
127.0.0.1:6379> publish persistent-channel gonna
(integer) 0
127.0.0.1:6379> publish persistent-channel be
(integer) 0
127.0.0.1:6379> publish persistent-channel awesome
(integer) 0
127.0.0.1:6379> lrange persistent-channel 0 -1
1) "this"
2) "is"
3) "gonna"
4) "be"
5) "awesome"
127.0.0.1:6379>

Voila! Since we published a few messages with no active subscriber, they all got persisted in a list named after the channel itself. Messages that would otherwise have been dropped can now be fetched from the list and processed by incoming subscribers.
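To make the behaviour concrete, here is a minimal, self-contained sketch of the same idea outside of Redis. It is a toy model, not Redis code: the `channel` struct and the `channel_publish` and `channel_pop_oldest` functions are hypothetical names invented for illustration. Publishing with zero subscribers appends to the tail of a per-channel list, and a late subscriber drains it oldest first, much like calling LPOP in a loop.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Toy stand-in for a channel (hypothetical, not a Redis type): a
 * subscriber count plus a FIFO of messages published while nobody
 * was listening. */
typedef struct {
    int subscribers;   /* number of connected subscribers */
    char **pending;    /* persisted messages, oldest first */
    size_t len, cap;   /* used and allocated slots in pending */
} channel;

/* Duplicate a string on the heap. */
static char *copy_string(const char *s) {
    size_t n = strlen(s) + 1;
    char *p = malloc(n);
    assert(p != NULL);
    memcpy(p, s, n);
    return p;
}

/* Mimics the patched PUBLISH: returns the number of receivers and,
 * when that number is zero, appends the message to the list tail. */
int channel_publish(channel *ch, const char *msg) {
    assert(ch != NULL && msg != NULL);
    if (ch->subscribers > 0) return ch->subscribers; /* delivered live */
    if (ch->len == ch->cap) {
        size_t cap = ch->cap ? ch->cap * 2 : 4;
        char **grown = realloc(ch->pending, cap * sizeof(*grown));
        assert(grown != NULL);
        ch->pending = grown;
        ch->cap = cap;
    }
    ch->pending[ch->len++] = copy_string(msg);
    return 0;
}

/* Mimics a client-side LPOP: removes and returns the oldest pending
 * message, or NULL when nothing is pending. The caller frees it. */
char *channel_pop_oldest(channel *ch) {
    assert(ch != NULL);
    if (ch->len == 0) return NULL;
    char *oldest = ch->pending[0];
    memmove(ch->pending, ch->pending + 1,
            (ch->len - 1) * sizeof(*ch->pending));
    ch->len--;
    return oldest;
}
```

A subscriber that comes online would call `channel_pop_oldest` until it returns NULL before switching to live delivery. The O(n) `memmove` keeps the sketch short; a real queue would use a linked or ring structure.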

Delivering unprocessed messages to subscribers upon subscription

Instead of depending upon clients to poll for the channel list length, the server can deliver unprocessed messages to subscribers upon successful subscription. Since this can overwhelm subscribers if many pending messages are waiting in the list, we may not want to do this at all and instead leave it up to the clients. Let’s preserve our feature from the last post, i.e. delivering the last published message to clients upon subscription.

Here we don’t want to remove the last published message from our persistent list. We simply wish to send it to the incoming subscribers. For example, imagine cases like user status, location and mood being published on separate channels. Here is a method that returns the last element of the list without removing it (equivalent to LRANGE key -1 -1):

robj *llast(redisClient *c, robj *key) {
    listTypeEntry entry;
    robj *o, *value = NULL;
    long llen;

    // Fetch list object from db
    o = lookupKeyRead(c->db, key);

    // Ensure key contains a list
    if(o != NULL && o->encoding == REDIS_ENCODING_QUICKLIST) {
        // Get list iterator for "lrange key -1 -1" use case
        llen = listTypeLength(o);
        listTypeIterator *iter = listTypeInitIterator(o, llen-1, REDIS_TAIL);

        // Fetch last item, prepare value
        listTypeNext(iter, &entry);
        quicklistEntry *qe = &entry.entry;
        if (qe->value) {
            value = createStringObject((char *)qe->value,
                                       qe->sz);
        } else {
            value = createStringObjectFromLongLong(qe->longval);
        }
        listTypeReleaseIterator(iter);
    }
    return value;
}

We now need to replace our modifications within the subscription handling methods from the last post. Note that the caller must call decrRefCount on the returned robj to free the created string object after delivery. Kindly check this github commit for all the changes since the last post. You can also check out my Redis fork and play with the modified code directly.
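The ownership rule for the value returned by llast can be illustrated with a small standalone sketch. This is a toy model under stated assumptions, not Redis code: `toy_obj`, `toy_create`, `toy_decr` and `toy_peek_last` are hypothetical stand-ins for robj, createStringObject, decrRefCount and llast. The point it shows is that peeking returns a fresh object with one reference, leaves the list untouched, and must be released by whoever asked for it.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Toy stand-in for robj: a heap string with a reference count. */
typedef struct {
    int refcount;
    char *data;
} toy_obj;

/* Creates a new object holding a copy of s, with one reference. */
toy_obj *toy_create(const char *s) {
    toy_obj *o = malloc(sizeof(*o));
    assert(o != NULL);
    size_t n = strlen(s) + 1;
    o->data = malloc(n);
    assert(o->data != NULL);
    memcpy(o->data, s, n);
    o->refcount = 1;
    return o;
}

/* Drops one reference and frees the object when none remain.
 * Returns the number of references still held. */
int toy_decr(toy_obj *o) {
    assert(o != NULL && o->refcount > 0);
    int remaining = --o->refcount;
    if (remaining == 0) {
        free(o->data);
        free(o);
    }
    return remaining;
}

/* Like llast: returns a fresh copy of the last element without
 * removing it, or NULL when there is nothing to return. */
toy_obj *toy_peek_last(const char *const *items, size_t len) {
    if (items == NULL || len == 0) return NULL;
    return toy_create(items[len - 1]);
}
```

In this model, code that delivers the peeked value to a new subscriber would call `toy_decr` once the reply has been queued, mirroring the decrRefCount call on the robj returned by llast.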

Conclusion

We saw how easy it is to modify Redis for fun and profit. By adding native persistence capabilities, we offload to the server the task of ensuring that every message received by Redis gets processed. After all, no amount of clever client-side logic will ever be as reliable as native support. By the way, Redis 3.0.0 was released this week with native support for clustering; check it out while it’s hot.

Leave your comments and feedback.