68 lines
1.7 KiB
JavaScript
68 lines
1.7 KiB
JavaScript
import _logger from "../logger/index.js";
|
|
|
|
/**
 * Creates a consumer that automatically removes itself upon timing out.
 *
 * The returned object exposes a `cb` wrapper around the caller's callback.
 * The wrapper replies at most once and stops the pending timeout, so a
 * consumer that already received a message is not answered a second time
 * with "Timedout" when its deadline later elapses.
 *
 * @param {number} timeoutMs - Delay in milliseconds before the consumer times out.
 * @param {Function} cb - Node-style callback `(err, response)` for the call.
 * @param {object} call - Call object; only `call.on("cancelled", fn)` is used.
 * @param {Function} remove - Invoked with the consumer to unregister it.
 * @returns {{cb: Function}} The consumer record.
 */
function consumer(timeoutMs, cb, call, remove) {
  let replied = false;

  // Forward to the real callback once, and release the timer while doing so.
  const reply = (...args) => {
    if (replied) {
      return;
    }
    replied = true;
    clearTimeout(timer);
    cb(...args);
  };

  const c = {
    cb: reply,
  };

  const timer = setTimeout(() => {
    remove(c);
    reply({
      message: "Timedout",
      status: 4, //Deadline exceeded
    });
  }, timeoutMs);

  call.on("cancelled", () => {
    // The call is gone: no reply is sent, and the timer must not fire later.
    replied = true;
    clearTimeout(timer);
    remove(c);
  });

  return c;
}
|
|
|
|
/**
 * Hands the oldest queued message to the first registered consumer, when the
 * topic has both a pending message and a waiting consumer.
 *
 * At most one message is delivered per call. If either side is missing the
 * topic is left untouched.
 *
 * @param {{queue: Array, consumers: Map}} topic - Topic record to service.
 * @returns {Promise<void>} Resolves once the delivery attempt has been made.
 */
async function next(topic) {
  // Decide on emptiness, not truthiness: falsy payloads such as "" or 0 are
  // valid messages and must be neither dropped nor treated as "no message".
  if (topic.queue.length === 0 || topic.consumers.size === 0) {
    return;
  }

  // Map iteration follows insertion order, so this is the earliest consumer.
  const waiting = topic.consumers.values().next().value;
  const msg = topic.queue.shift();

  //Fire and forget. At-most-once semantics.
  topic.consumers.delete(waiting);
  waiting.cb(undefined, { message: msg });
}
|
|
|
|
/**
 * Builds the in-memory publish/consume handlers.
 *
 * Each topic is a record holding a `queue` array of pending messages and a
 * `consumers` Map of waiting consume calls. Topics are created lazily the
 * first time they are published to or consumed from.
 *
 * @param {object} [options]
 * @param {object} [options.logger] - Logger exposing `level` and `levels`;
 *   defaults to `_logger("impl")`.
 * @returns {{publish: Function, consume: Function}} Handlers taking `(call, cb)`.
 */
export default function create({ logger = _logger("impl") } = {}) {
  // A Map rather than a plain object: topic names come from requests, and
  // names such as "constructor" or "__proto__" would otherwise resolve to
  // inherited Object.prototype members instead of a topic record.
  const topics = new Map();

  //If logging is above info, display the content of the queues every 5s for debugging
  if (logger.levels[logger.level] >= 4) {
    const dumpTimer = setInterval(() => {
      console.log(topics);
    }, 5000);
    // Debug-only timer: it must not keep the process alive on its own.
    if (typeof dumpTimer.unref === "function") {
      dumpTimer.unref();
    }
  }

  function getTopic(topic) {
    // Keys are stringified so lookups match plain-object property semantics.
    const key = String(topic);
    let entry = topics.get(key);
    if (entry === undefined) {
      // Consumers are stored in a Map to allow O(1) deletion on timeout while
      // allowing iteration in order of insertion
      entry = {
        queue: [],
        consumers: new Map(),
      };
      topics.set(key, entry);
    }
    return entry;
  }

  return {
    publish: (call, cb) => {
      const topic = getTopic(call.request.topic);
      topic.queue.push(call.request.message);
      // Acknowledge the publish before attempting delivery.
      cb(undefined, {});
      next(topic);
    },
    consume: (call, cb) => {
      const { topic: topicStr, timeoutMs } = call.request;
      const topic = getTopic(topicStr);
      const c = consumer(timeoutMs, cb, call, (c) => topic.consumers.delete(c));
      topic.consumers.set(c, c);
      next(topic);
    },
  };
}
|