import _logger from "../logger/index.js"; //Creates a consumer that automatically removes itsefl upon timing out function consumer(timeoutMs, cb, call, remove) { const c = { cb, }; setTimeout(() => { remove(c); cb({ message: "Timedout", status: 4, //Deadline exceeded }); }, timeoutMs); call.on("cancelled", () => remove(c)); return c; } async function next(topic) { const consumer = topic.consumers.values().next().value; const msg = topic.queue.shift(); if (consumer && msg) { //Fire and forget. At-most-once semantics. topic.consumers.delete(consumer); consumer.cb(undefined, { message: msg }); } else { if (msg) { topic.queue.unshift(msg); } } } export default function create({ logger = _logger("impl") } = {}) { const topics = {}; //If logging is above info, display the content of the queues every 5s for debuggin if (logger.levels[logger.level] >= 4) { setInterval(() => { console.log(topics); }, 5000); } function getTopic(topic) { // Consumers are stored in a Map to allow O(1) deletion on timeout while // allowing iteration in order of insertion return (topics[topic] = topics[topic] || { queue: [], consumers: new Map(), }); } return { publish: (call, cb) => { const topic = getTopic(call.request.topic); topic.queue.push(call.request.message); cb(undefined, {}); next(topic); }, consume: (call, cb) => { const { topic: topicStr, timeoutMs } = call.request; const topic = getTopic(topicStr); const c = consumer(timeoutMs, cb, call, (c) => topic.consumers.delete(c)); topic.consumers.set(c, c); next(topic); }, }; }