Customizing Redis pubsub for message persistence


Redis comes packed with a simple yet powerful PubSub API. It provides low latency and scales well. A message published on a channel is received by the subscriber(s) at the other end. However, if there is no active subscriber, the message is simply lost. This drawback rules Redis out for use cases that need unprocessed published messages to be persisted. It is probably also the reason why several open source projects that support Redis as a broker are built on its list push / pop API instead. In this post I will demonstrate how to modify the Redis PubSub API to support message persistence, opening up several interesting use cases.

Last Published Message

The ability to fetch the last published message on a channel without subscribing to it opens the door to several interesting use cases. Redis handles the PUBLISH command in src/pubsub.c:publishCommand. Let’s add a line of code to persist the most recently published message on a channel:

void publishCommand(redisClient *c) {
    ....

    /* Persist last published message in channel specific key */
    setKey(c->db, c->argv[1], c->argv[2]);

    addReplyLongLong(....);
}

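Above, we added a call to the src/db.c:setKey function, which sets the value of the key c->argv[1] (the channel name) to c->argv[2] (the published message). Note that the channel name now doubles as a key, so publishing overwrites any existing value stored under that name.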

Run make from the project root directory and start ./src/redis-server. Now we can do something like:

127.0.0.1:6379> publish channel1 c1m1
(integer) 0
127.0.0.1:6379> get channel1
"c1m1"
127.0.0.1:6379> publish channel1 c1m2
(integer) 0
127.0.0.1:6379> get channel1
"c1m2"

Voila. We published a message with no subscriber, yet a client arriving later can still be served the last published message by fetching the value of the key channel1, without ever subscribing to the channel.

Let’s take this idea one step further. XMPP Publish-Subscribe (XEP-0060) specifies how a service may deliver the last published item. It says,

When a subscription request is successfully processed, the service MAY send the last published item to the new subscriber.

Let’s add this idea to the Redis PubSub mechanism. Redis processes channel subscription requests in the src/pubsub.c:subscribeCommand function. Add the following lines of code at the end of this function:

void subscribeCommand(redisClient *c) {
    ....

    /* Send last received message on the subscribed channel(s) */
    robj *o;
    for (j = 1; j < c->argc; j++) {
        o = lookupKeyRead(c->db, c->argv[j]);
        if (o != NULL) {
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,c->argv[j]);
            addReplyBulk(c,o);
        }
    }
}

Here, once the subscription has been processed, we fetch and send the last published message for every channel the client just subscribed to. Run make again and restart ./src/redis-server. Now, in a new ./src/redis-cli terminal, subscribe to channel1:

127.0.0.1:6379> subscribe channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "message"
2) "channel1"
3) "c1m2"

Voila! The Redis server now sends the last published message upon subscription. But what about the PSUBSCRIBE use case?

src/pubsub.c:psubscribeCommand handles pattern-based channel subscription. Add the following lines of code at the end of this function:

void psubscribeCommand(redisClient *c) {
    ....

    /* Send last received message on the channel(s) matching subscribed patterns */
    for (j = 1; j < c->argc; j++) {
        robj *pat = c->argv[j];
        dictIterator *di = dictGetIterator(server.pubsub_channels);
        dictEntry *de;
        while ((de = dictNext(di)) != NULL) {
            robj *cobj = dictGetKey(de);
            sds channel = cobj->ptr;
            if (stringmatchlen((char*)pat->ptr,
                               sdslen(pat->ptr),
                               (char*)channel,
                               sdslen(channel), 0)) {
                robj *o = lookupKeyRead(c->db, cobj);
                if (o != NULL) {
                    addReply(c,shared.mbulkhdr[4]);
                    addReply(c,shared.pmessagebulk);
                    addReplyBulk(c,pat);
                    addReplyBulk(c,cobj);
                    addReplyBulk(c,o);
                }
            }
        }
        dictReleaseIterator(di);
    }
}

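Above, for every subscribed pattern, we iterate over server.pubsub_channels and check whether each channel matches the pattern. On a match, we fetch the last published message on that channel and send it to the client. Because server.pubsub_channels only tracks channels that currently have at least one direct subscriber, channels nobody is subscribed to are not considered.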

With the previous redis-cli subscribe terminal still running, open a new terminal and try:

127.0.0.1:6379> psubscribe channel*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1
1) "pmessage"
2) "channel*"
3) "channel1"
4) "c1m2"

Next

You can check out my Redis fork and its commits under the pubsub-persistence branch. The enhancements described above can also be found in this GitHub commit.

It is currently unclear what Antirez (Salvatore Sanfilippo) plans to do next with PubSub in Redis. It stands on a solid base, and recent efforts are rightly going into Redis Cluster. However, I see some interesting enhancements that could be made to mainline Redis PubSub. In the next post I will take the current idea one step further and add support for persisting all published messages, or only the unprocessed ones, in a Redis list (possibly with a cap or an expiration on persisted messages).


  • ankit sheth

    Hi Abhinav,

    Thanks for such technical and informative blog posts.

    Please help me with the scenario below, I am very confused.
    I need help with ejabberd PubSub, or any pubsub, and what to use in the following scenario:
    - PHP gets messages from multiple channels through APIs
    - PHP gives the messages to PubSub (ejabberd pubsub) using JAXL
    - Now, if a subscriber is offline, how do I send the messages published during that time to only the offline subscribers of that same node/topic when they come back online?
    (Here, only PHP subscribes/unsubscribes the subscribers to the node, so I have that list and can manage the subscribers (iOS/Android apps) to notify them when they are offline/online.)
    So my problem is this: if there are 10 subscribers to one node, say “Testing”, and 9 are online while 1 is offline during a specific time (which I can also find out), then when that one subscriber comes online, I want to use the same node “Testing” but send the messages from that period to only that 1 subscriber.

    Kindly suggest whether I can do that with ejabberd (XMPP) pubsub.
    OR
    Please suggest whether I can use Redis pubsub with PHP in the above case.

    Suggest whatever solution is best from your side.

    • Hi Ankit,

      Sorry. I never received a notification for this comment, and I only noticed it today when I logged into my blog admin after a month.

      I am no longer using XMPP, but I clearly remember that you can enable history in the XMPP PubSub plugin, and from there on your subscribers can pick up where they left off (timestamp-wise).