// pubsub-server/server/pubsub.test.js
// 166 lines, 4.6 KiB, JavaScript
// 2022-07-25 02:33:35 +00:00

import test from "ava";
import pubsub from "./pubsub.js";
/**
 * Pause helper for tests: yields a promise that resolves (with no value)
 * after `ms` milliseconds, using a plain timer.
 *
 * @param {number} ms - Delay handed to setTimeout.
 * @returns {Promise<void>}
 */
const sleep = (ms) =>
  new Promise((wake) => {
    // Invoke the resolver without arguments so the promise resolves to undefined.
    setTimeout(() => wake(), ms);
  });
// TODO: replace the hand-rolled call stubs below with proper mocks.
/**
 * Builds the minimal call object handed to `pubsub.publish` in these tests.
 *
 * @param {*} topic - Topic placed on `request.topic`.
 * @param {*} message - Payload placed on `request.message`.
 * @returns {{ request: { topic: *, message: * } }}
 */
function publishCall(topic, message) {
  const request = { topic, message };
  return { request };
}
/**
 * Builds the minimal call object handed to `pubsub.consume` in these tests.
 * The `on` member is a do-nothing stand-in for an event-registration method.
 *
 * @param {*} topic - Topic placed on `request.topic`.
 * @param {number} [timeoutMs=10000] - Value placed on `request.timeoutMs`.
 * @returns {{ request: { topic: *, timeoutMs: number }, on: Function }}
 */
function consumeCall(topic, timeoutMs = 10000) {
  const request = { topic, timeoutMs };
  // A fresh no-op per call, so every stub owns its own `on` function.
  const on = () => {};
  return { request, on };
}
// Give every test its own instance from the pubsub factory, stored on the test context.
test.beforeEach((t) => {
  const instance = pubsub();
  t.context.pubsub = instance;
});
// Publishes first, then consumes on the same topic: the publish callback must report
// no error and the consumer must receive the published message.
test("Published messages are available to be consumed", async (t) => {
  const broker = t.context.pubsub;
  t.plan(2);

  const delivered = new Promise((done) => {
    broker.publish(publishCall("topic", "message"), (publishError) => {
      t.is(publishError, undefined);
    });
    broker.consume(consumeCall("topic"), (consumeError, reply) => {
      const { message } = reply;
      t.is(message, "message");
      done();
    });
  });

  await delivered;
});
// Publishes to two different topics with no prior setup, then consumes from each:
// both publishes must report no error and each topic must yield its own message.
test("Published messages automatically create topics", async (t) => {
  const broker = t.context.pubsub;
  t.plan(4);

  const bothConsumed = new Promise((done) => {
    broker.publish(publishCall("topic", "message"), (publishError) => {
      t.is(publishError, undefined);
    });
    broker.publish(publishCall("topic2", "message2"), (publishError) => {
      t.is(publishError, undefined);
    });

    broker.consume(consumeCall("topic"), (consumeError, reply) => {
      const { message } = reply;
      t.is(message, "message");
    });
    // The test finishes once the second topic's consumer has been served.
    broker.consume(consumeCall("topic2"), (consumeError, reply) => {
      const { message } = reply;
      t.is(message, "message2");
      done();
    });
  });

  await bothConsumed;
});
// Registers the consumer before anything is published; the later publish must
// report no error and the waiting consumer must receive that message.
test("Consumer waits for a published message", async (t) => {
  const broker = t.context.pubsub;
  t.plan(2);

  const onDelivered = (consumeError, reply) => {
    const { message } = reply;
    t.is(message, "message");
  };

  await new Promise((done) => {
    broker.consume(consumeCall("topic"), onDelivered);
    // The test finishes from the publish callback; t.plan(2) guards that the
    // consumer assertion also ran.
    broker.publish(publishCall("topic", "message"), (publishError) => {
      t.is(publishError, undefined);
      done();
    });
  });
});
// Consumes with a zero timeout and never publishes: the callback must receive
// no response and the "Timedout" error object.
test("Consumer timeout if no message published", async (t) => {
  const broker = t.context.pubsub;
  const expectedError = { message: "Timedout", status: 4 };
  t.plan(2);

  const timedOut = new Promise((done) => {
    const onSettled = (consumeError, response) => {
      t.is(response, undefined);
      t.deepEqual(consumeError, expectedError);
      done();
    };
    broker.consume(consumeCall("topic", 0), onSettled);
  });

  await timedOut;
});
// Three consumers are registered before three messages are published on the same
// topic; the n-th registered consumer is expected to receive the n-th published message.
test("Multiple consumers wait for multiple messages, FIFO", async (t) => {
  const broker = t.context.pubsub;
  const payloads = ["message1", "message2", "message3"];
  t.plan(6);

  await new Promise((done) => {
    // Register consumers in order, each expecting the payload at its own position.
    for (const expected of payloads) {
      broker.consume(consumeCall("topic"), (consumeError, reply) => {
        const { message } = reply;
        t.is(message, expected);
      });
    }

    // Publish in the same order; only the final publish callback ends the test.
    payloads.forEach((payload, position) => {
      const isLast = position === payloads.length - 1;
      broker.publish(publishCall("topic", payload), (publishError) => {
        t.is(publishError, undefined);
        if (isLast) {
          done();
        }
      });
    });
  });
});
// Four consumers are registered on one topic, the second with a zero timeout. After
// a short pause three messages are published: the zero-timeout consumer must get the
// "Timedout" error and the remaining consumers must receive the messages in
// registration order.
test("Multiple consumers, only consumers that have not timed out get a message, FIFO", async (t) => {
  const { pubsub } = t.context;
  t.plan(8);

  // Consumers are registered directly in the async test body (not inside an async
  // promise executor) so that any throw here rejects the test instead of being lost.
  pubsub.consume(consumeCall("topic"), (error, { message }) => {
    t.is(message, "message1");
  });
  pubsub.consume(consumeCall("topic", 0), (error, nothing) => {
    t.is(nothing, undefined);
    t.deepEqual(error, { message: "Timedout", status: 4 });
  });
  pubsub.consume(consumeCall("topic"), (error, { message }) => {
    t.is(message, "message2");
  });
  pubsub.consume(consumeCall("topic"), (error, { message }) => {
    t.is(message, "message3");
  });

  // Allow the zero-timeout consumer to time out before anything is published.
  await sleep(1);

  // Synchronous executor: the promise only adapts the last publish callback.
  await new Promise((resolve) => {
    pubsub.publish(publishCall("topic", "message1"), (error) => {
      t.is(error, undefined);
    });
    pubsub.publish(publishCall("topic", "message2"), (error) => {
      t.is(error, undefined);
    });
    pubsub.publish(publishCall("topic", "message3"), (error) => {
      t.is(error, undefined);
      resolve();
    });
  });
});
// One message is published and two consumers then read the same topic: the first
// must receive the message, the second (100 ms timeout) must end with the
// "Timedout" error and no response.
test("Multiple consumers trying to read a message, only the first will succeed", async (t) => {
  const { pubsub } = t.context;
  t.plan(4);

  // The executor is deliberately synchronous (nothing is awaited inside it), so a
  // throw from publish/consume rejects the awaited promise and fails the test.
  await new Promise((resolve) => {
    pubsub.publish(publishCall("topic", "message1"), (error) => {
      t.is(error, undefined);
    });
    pubsub.consume(consumeCall("topic"), (error, { message }) => {
      t.is(message, "message1");
    });
    // The test ends when the second consumer's timeout callback fires.
    pubsub.consume(consumeCall("topic", 100), (error, nothing) => {
      t.is(nothing, undefined);
      t.deepEqual(error, { message: "Timedout", status: 4 });
      resolve();
    });
  });
});