Customizing Redis pubsub for message persistence


Redis comes packed with a simple yet powerful PubSub API.  It provides low latency and scales well.  A message published on a channel is delivered to the subscribers at the other end.  However, if no subscriber is active, the message is simply lost.  This drawback rules Redis out for use cases that need unprocessed published messages to be persisted.  It is also probably one reason why several open source projects that support Redis as a broker build on its list push / pop API instead.  In this post I will demonstrate how to modify the Redis PubSub API to support message persistence, opening possibilities for several interesting use cases.

Last Published Message

The ability to fetch the last published message on a channel without subscribing to it opens the door to several interesting use cases.  src/pubsub.c:publishCommand is where Redis handles the PUBLISH command.  Let’s add a line of code to persist the most recently published message on a channel:

void publishCommand(redisClient *c) {
    ....

    /* Persist last published message in channel specific key */
    setKey(c->db, c->argv[1], c->argv[2]);

    addReplyLongLong(....);
}

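Above, we added a call to the src/db.c:setKey function, which sets the value of key c->argv[1] (the channel name) to c->argv[2] (the published message). Note that the channel name is used directly as the key, so a publish overwrites any existing key of the same name in the client's currently selected database.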

Run make from the project root directory and start ./src/redis-server. Now we can do something like:

127.0.0.1:6379> publish channel1 c1m1
(integer) 0
127.0.0.1:6379> get channel1
"c1m1"
127.0.0.1:6379> publish channel1 c1m2
(integer) 0
127.0.0.1:6379> get channel1
"c1m2"

Voila. We published a message with no subscriber, yet it was not lost. A client that arrives later can still be served the last published message by fetching the value of key channel1, without subscribing to the channel.
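
To make the behaviour concrete outside of Redis, here is a minimal stand-alone sketch of the same "keep only the last value per channel" idea. It does not use any Redis code: last_store_t, toy_publish and toy_last are hypothetical names invented for this illustration, and the fixed-size table is an assumption made to keep the example short.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_CHANNELS 16   /* assumption: small fixed table for the sketch */
#define MAX_LEN      64   /* assumption: names/messages are short strings */

/* Hypothetical toy store: one "last message" slot per channel. */
typedef struct {
    char channel[MAX_LEN];
    char last[MAX_LEN];
    int  used;
} slot_t;

typedef struct {
    slot_t slots[MAX_CHANNELS];
} last_store_t;

/* Record msg as the latest message on channel, overwriting any
 * previous one. Returns 0 on success, -1 if the table is full. */
int toy_publish(last_store_t *s, const char *channel, const char *msg) {
    assert(s && channel && msg);
    slot_t *free_slot = NULL;
    for (int i = 0; i < MAX_CHANNELS; i++) {
        slot_t *sl = &s->slots[i];
        if (sl->used && strcmp(sl->channel, channel) == 0) {
            snprintf(sl->last, sizeof sl->last, "%s", msg);
            return 0;
        }
        if (!sl->used && free_slot == NULL) free_slot = sl;
    }
    if (free_slot == NULL) return -1;
    snprintf(free_slot->channel, sizeof free_slot->channel, "%s", channel);
    snprintf(free_slot->last, sizeof free_slot->last, "%s", msg);
    free_slot->used = 1;
    return 0;
}

/* Return the last message published on channel, or NULL if none. */
const char *toy_last(const last_store_t *s, const char *channel) {
    assert(s && channel);
    for (int i = 0; i < MAX_CHANNELS; i++) {
        const slot_t *sl = &s->slots[i];
        if (sl->used && strcmp(sl->channel, channel) == 0) return sl->last;
    }
    return NULL;
}
```

Starting from a zero-initialised last_store_t, calling toy_publish twice on channel1 and then toy_last mirrors the redis-cli session above: only the second message is retained.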

Let’s take this idea one step further. XMPP Publish-Subscribe (XEP-0060) defines a specification for receiving the last published item. It says,

When a subscription request is successfully processed, the service MAY send the last published item to the new subscriber.

Let’s add this idea to the Redis PubSub mechanism. The src/pubsub.c:subscribeCommand function is where Redis processes channel subscription requests. Add the following lines of code at the end of this function:

void subscribeCommand(redisClient *c) {
    ....

    /* Send last received message on the subscribed channel(s) */
    robj *o;
    for (j = 1; j < c->argc; j++) {
        o = lookupKeyRead(c->db, c->argv[j]);
        if (o != NULL) {
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,c->argv[j]);
            addReplyBulk(c,o);
        }
    }
}

Here, after the subscription is processed, we fetch and send the last published message for each channel the client just subscribed to. Run make again and restart ./src/redis-server. Now, in a new ./src/redis-cli terminal, subscribe to channel1:

127.0.0.1:6379> subscribe channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "message"
2) "channel1"
3) "c1m2"
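
The four addReply calls in subscribeCommand emit an ordinary three-element multi-bulk reply, which is why redis-cli displays the replayed value exactly like a live "message". As a rough illustration of what goes over the wire, the sketch below encodes such a reply by hand. It is not Redis code: resp_multibulk is a hypothetical helper written for this post, and unlike Redis bulk strings it assumes NUL-terminated input, so it is not binary safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encode argc C strings as a multi-bulk reply
 * ("*<count>" header followed by "$<len>" bulk strings) into buf.
 * Returns the number of bytes written (excluding the trailing NUL),
 * or -1 if buf is too small. Assumes NUL-terminated strings. */
int resp_multibulk(char *buf, size_t cap, int argc, const char **argv) {
    assert(buf && argv && argc >= 0);
    int n = snprintf(buf, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    size_t off = (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(buf + off, cap - off, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - off) return -1;
        off += (size_t)n;
    }
    return (int)off;
}
```

Encoding the three strings "message", "channel1" and "c1m2" with this helper yields `*3\r\n$7\r\nmessage\r\n$8\r\nchannel1\r\n$4\r\nc1m2\r\n`, the same frame a subscriber receives for a normally published message.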

Voila! Now the Redis server sends the last published message upon subscription. But what about the PSUBSCRIBE use case?

src/pubsub.c:psubscribeCommand handles pattern-based channel subscriptions. Add the following lines of code at the end of this function:

void psubscribeCommand(redisClient *c) {
    ....

    /* Send last received message on the channel(s) matching subscribed patterns */
    for (j = 1; j < c->argc; j++) {
        robj *pat = c->argv[j];
        dictIterator *di = dictGetIterator(server.pubsub_channels);
        dictEntry *de;
        while ((de = dictNext(di)) != NULL) {
            robj *cobj = dictGetKey(de);
            sds channel = cobj->ptr;
            if (stringmatchlen((char*)pat->ptr,
                               sdslen(pat->ptr),
                               (char*)channel,
                               sdslen(channel), 0)) {
                robj *o = lookupKeyRead(c->db, cobj);
                if (o != NULL) {
                    addReply(c,shared.mbulkhdr[4]);
                    addReply(c,shared.pmessagebulk);
                    addReplyBulk(c,pat);
                    addReplyBulk(c,cobj);
                    addReplyBulk(c,o);
                }
            }
        }
        dictReleaseIterator(di);
    }
}

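Above, for every subscribed pattern, we iterate over server.pubsub_channels and check whether each channel matches the pattern. On a match, we fetch the last published message on that channel and send it to the client. Note that server.pubsub_channels only contains channels that currently have at least one direct subscriber, so a channel with a persisted message but no active SUBSCRIBE client is not replayed by this code.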
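
The pattern test above relies on Redis's stringmatchlen, which implements glob-style matching. To show the idea in isolation, here is a simplified stand-alone matcher. It is not the Redis implementation: toy_glob_match is a hypothetical name, and it only understands '*' and '?', whereas stringmatchlen also handles character classes such as [abc] and backslash escapes.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical simplified glob matcher (not Redis's stringmatchlen).
 * Returns 1 if string s matches pattern p, 0 otherwise.
 * '*' matches any run of characters (including none), '?' matches
 * exactly one character, everything else must match literally. */
int toy_glob_match(const char *p, const char *s) {
    assert(p && s);
    const char *star = NULL;   /* most recent '*' seen in the pattern */
    const char *retry = NULL;  /* where in s that '*' started matching */
    while (*s) {
        if (*p == '*') {
            star = p++;        /* first try: '*' matches nothing */
            retry = s;
        } else if (*p == '?' || *p == *s) {
            p++;
            s++;
        } else if (star) {
            p = star + 1;      /* backtrack: let '*' absorb one more char */
            s = ++retry;
        } else {
            return 0;
        }
    }
    while (*p == '*') p++;     /* trailing '*' can match the empty string */
    return *p == '\0';
}
```

With this sketch, the pattern channel* matches channel1 just as in the psubscribe session that follows, while a pattern such as news* does not.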

With the previous redis-cli subscribe terminal still running, open a new terminal and try:

127.0.0.1:6379> psubscribe channel*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1
1) "pmessage"
2) "channel*"
3) "channel1"
4) "c1m2"

Next

You can check out my Redis fork and commits under the pubsub-persistence branch. The enhancements described above can also be found in this GitHub commit.

Currently it is unclear what antirez (Salvatore Sanfilippo) plans to do further with PubSub in Redis. It stands on a solid base, and recent efforts are rightly focused on Redis Cluster. However, I see some interesting enhancements that could be made to mainline Redis PubSub. In the next post I will take the current idea one step further and add persistence support for all, or only unprocessed, published messages in a Redis list (possibly with a cap or expiration on persisted messages).