I have an application that will provide notifications to clients. It has two configurations:
Default: A single web api instance that notifies the appropriate subscribers
Alternative: Under high load, there will be multiple web api instances. To coordinate messages between instances, I will leverage a redis backplane. When an api instance receives a message, it sends it to the redis backplane, which then pushes it back down to all api instances.
I have a config.useRedis option that should control whether Redis should be used. To get things working, I created some pretty monolithic code:
//Channels will reference the channel name - which is our device id - and an array
//of websockets that are interested in that device
let channels = {};
let redisSubscribers = [];
// Send a message to the appropriate websocket clients listening on the right channel
// As we check the channel, we'll do a little housekeeping as well
const broadcast = (data) => {
const message = JSON.parse(data);
const deviceId = message.deviceId;
if (Object.keys(channels).some(key => +key === deviceId)) {
//Perform a cleanup of any closed sockets
channels[deviceId] = channels[deviceId].filter(socket => socket.readyState === 1);
channels[deviceId].forEach(ws => ws.send(data));
//If the channel is empty, nuke the channel and close the redis subscription
//if appropriate
if (channels[deviceId].length === 0) {
delete channels[deviceId];
if (config.useRedis) {
const subscriber = redisSubscribers.find(sub => +sub.deviceId === deviceId);
if(subscriber) {
subscriber.quit();
redisSubscribers = redisSubscribers.filter(sub => sub !== subscriber);
}
}
}
}
}
...
wss.on('connection', (ws) => {
console.log('socket established...')
const querystring = url.parse(ws.upgradeReq.url, true).query;
const deviceId = querystring.deviceId;
if (!Object.keys(channels).includes(deviceId)) {
channels[deviceId] = [ws];
if(config.useRedis) {
//Init redis subscriber
...
}
}
else {
channels[deviceId].push(ws);
}
ws.on('message', (data) => {
if(config.useRedis) {
redisPublisher.publish(deviceId, data);
}
else {
const message = JSON.parse(data);
broadcast(message);
}
});
});
I'd like to compose the behaviour instead, which would provide more flexibility in future in case I want to move to something else besides Redis.
So, if Redis is enabled for the app, in the broadcast function, I want to enhance the function with additional logic. In the connection handler, I want to init a Redis subscriber to receive messages from the backplane. In the message handler, I want to replace the default publishing logic with the custom Redis logic.
Note: The following has not been tested, I just threw some code together to demonstrate the course. Consider it pseudo-code :)
websocketserver.js
const websocketServer = () => {
let channels = {};
const broadcast = (function(data) {
return function(enhancement) {
if (Object.keys(channels).some(key => +key === deviceId)) {
//Perform a cleanup of any closed sockets
channels[deviceId] = channels[deviceId].filter(socket => socket.readyState === 1);
channels[deviceId].forEach(ws => ws.send(data));
//If the channel is empty, nuke the channel
if (channels[deviceId].length === 0) {
delete channels[deviceId];
}
// If any "enhancement" should be applied, execute the passed in function
if(typeof enhancement === 'function') {
enhancement(deviceId)
}
}
}
})
const onConnection = () => {
return function(enhancedSubscribe, enhancedPublish) {
console.log('socket established...')
const querystring = url.parse(ws.upgradeReq.url, true).query;
const deviceId = querystring.deviceId;
if (!Object.keys(channels).includes(deviceId)) {
channels[deviceId] = [ws];
if(typeof === 'enhancedSubscribe') {
enhancedSubscribe(deviceId)
}
}
else {
channels[deviceId].push(ws);
}
ws.on('message', (enhancedPublish) => {
if(typeof enhancedPublish === 'function'){
enhancedPublish(data);
}
else {
const message = JSON.parse(data);
broadcast(message);
}
}
}
};
return {
broadcast,
onConnection,
onMessage
}
}
redisServer.js
const redisServer = () => {
let redisSubscribers = [];
const removeSubscriber = () => {
const subscriber = redisSubscribers.find(sub => +sub.deviceId === deviceId);
if(subscriber) {
subscriber.quit();
redisSubscribers = redisSubscribers.filter(sub => sub !== subscriber);
}
}
const enhancedSubscribe = (deviceId, broadcast) => {
//Init redis subscriber
...
}
const enhancedPublish = (deviceId, data) => {
redisPublisher.publish(deviceId, data);
}
return {
removeSubscriber,
enhancedSubscribe,
enhancedPublish
}
}
I attempt to compose the behaviour I want like this:
serverFactory.js
const server = () => {
const broadcast = (data) => {
let webSocketBroadcast = websocketServer.broadcast(data);
if (config.useRedis) {
return webSocketBroadcast();
}
else{
return webSocketBroadcast(redisServer.enchanceBroadcast);
}
}
const onConnection = (ws) => {
let websocketServerOnConnection = websocketServer.onConnection();
if (config.useRedis) {
retuen websocketServerOnConnection()
}
else
}
}
return {
broadcast,
onconnection,
onMessage
}
}
So, I am attempting to extend/enhance the websocket behaviour with partial application. Does this make sense? Anything further I can do to improve this or make it more flexible?