pubsub-server/server/pubsub.js

68 lines
1.7 KiB
JavaScript

import _logger from "../logger/index.js";
/**
 * Creates a consumer that automatically removes itself upon timing out.
 *
 * The consumer settles exactly once, through whichever happens first:
 *  - delivery: someone invokes the returned `cb` wrapper (forwarded to `cb`),
 *  - timeout: `timeoutMs` elapses; the consumer is removed and `cb` receives
 *    a `{ message: "Timedout", status: 4 }` error,
 *  - cancellation: `call` emits "cancelled"; the consumer is removed and `cb`
 *    is never invoked.
 * Once settled, the deadline timer is cleared and later invocations are
 * ignored, so `cb` is never called twice for the same consumer.
 *
 * @param {number} timeoutMs - Delay before the consumer times out.
 * @param {Function} cb - Node-style callback `(err, response)` to complete.
 * @param {{ on: Function }} call - Object on which the "cancelled" listener is registered.
 * @param {Function} remove - Called with the consumer object to unregister it.
 * @returns {{ cb: Function }} The consumer object; invoke `.cb(err, response)` to deliver.
 */
function consumer(timeoutMs, cb, call, remove) {
  let settled = false;
  let timer;

  // Marks the consumer as finished and disarms the deadline timer.
  // Returns false when the consumer had already been settled.
  const settle = () => {
    if (settled) {
      return false;
    }
    settled = true;
    clearTimeout(timer);
    return true;
  };

  const c = {
    // Guarded wrapper: without it the pending timer would still fire after a
    // successful delivery and complete the same call a second time.
    cb: (...args) => {
      if (settle()) {
        cb(...args);
      }
    },
  };

  timer = setTimeout(() => {
    if (settle()) {
      remove(c);
      cb({
        message: "Timedout",
        status: 4, //Deadline exceeded
      });
    }
  }, timeoutMs);

  call.on("cancelled", () => {
    settle();
    // `remove` is invoked unconditionally, as before; it is expected to be
    // harmless when the consumer is no longer registered.
    remove(c);
  });

  return c;
}
/**
 * Delivers the oldest queued message of `topic` to its first-registered
 * consumer, when both a message and a consumer are available. Otherwise the
 * topic is left untouched.
 *
 * At most one message is delivered per invocation.
 *
 * @param {{ queue: Array, consumers: Map }} topic - Topic state: pending
 *   messages (oldest first) and waiting consumers in insertion order.
 * @returns {Promise<void>} Resolves once the delivery attempt has been made;
 *   the work itself happens synchronously before the promise is returned.
 */
async function next(topic) {
  // Decide on emptiness *before* dequeuing. Testing the dequeued value for
  // truthiness would treat falsy payloads ("" or 0) as "no message" and lose
  // them, because they would be shifted off the queue and never put back.
  if (topic.queue.length === 0 || topic.consumers.size === 0) {
    return;
  }
  const consumer = topic.consumers.values().next().value;
  const msg = topic.queue.shift();
  //Fire and forget. At-most-once semantics.
  topic.consumers.delete(consumer);
  consumer.cb(undefined, { message: msg });
}
/**
 * Builds the in-memory pub/sub handlers.
 *
 * Each topic is created lazily on first use and holds a FIFO `queue` of
 * pending messages plus the `consumers` currently waiting for one.
 *
 * @param {object} [options]
 * @param {object} [options.logger] - Logger exposing `levels` (name -> number)
 *   and `level` (current level name). Defaults to `_logger("impl")`.
 * @returns {{ publish: Function, consume: Function }} Handlers taking
 *   `(call, cb)`: `publish` reads `call.request.topic` / `call.request.message`;
 *   `consume` reads `call.request.topic` / `call.request.timeoutMs`.
 */
export default function create({ logger = _logger("impl") } = {}) {
  // Topics are kept in a Map, not on a plain object: topic names come from
  // requests, and on a plain object names such as "constructor", "toString"
  // or "__proto__" resolve to inherited members instead of a topic record,
  // which made the handlers throw on `topic.queue`.
  const topics = new Map();
  //If the configured log level maps to 4 or more, display the content of the
  //queues every 5s for debugging.
  //NOTE(review): presumably 4 corresponds to a level more verbose than info;
  //confirm against the logger module.
  if (logger.levels[logger.level] >= 4) {
    setInterval(() => {
      console.log(topics);
    }, 5000);
  }
  // Returns the state record for `name`, creating it on first access.
  function getTopic(name) {
    let topic = topics.get(name);
    if (topic === undefined) {
      // Consumers are stored in a Map to allow O(1) deletion on timeout while
      // allowing iteration in order of insertion
      topic = {
        queue: [],
        consumers: new Map(),
      };
      topics.set(name, topic);
    }
    return topic;
  }
  return {
    // Enqueues the message, acknowledges the publisher with an empty
    // response, then attempts a delivery.
    publish: (call, cb) => {
      const topic = getTopic(call.request.topic);
      topic.queue.push(call.request.message);
      cb(undefined, {});
      next(topic);
    },
    // Registers a waiting consumer for the topic, then attempts a delivery.
    // `cb` is handed to the consumer, which is responsible for completing it.
    consume: (call, cb) => {
      const { topic: topicStr, timeoutMs } = call.request;
      const topic = getTopic(topicStr);
      const c = consumer(timeoutMs, cb, call, (expired) =>
        topic.consumers.delete(expired)
      );
      topic.consumers.set(c, c);
      next(topic);
    },
  };
}