166 lines
4.6 KiB
JavaScript
166 lines
4.6 KiB
JavaScript
import test from "ava";
|
|
import pubsub from "./pubsub.js";
|
|
|
|
/**
 * Returns a promise that fulfils (with `undefined`) once `ms` milliseconds
 * have elapsed according to `setTimeout`.
 *
 * @param {number} ms - Delay handed to `setTimeout`.
 * @returns {Promise<void>}
 */
const sleep = (ms) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
|
|
|
|
// TODO: replace the hand-built call objects below with proper mocks.
|
|
/**
 * Builds a minimal stand-in for the call object passed to `pubsub.publish`.
 * Only the `request` payload is populated.
 *
 * @param {string} topic - Topic the message is published to.
 * @param {*} message - Payload to publish.
 * @returns {{ request: { topic: string, message: * } }}
 */
function publishCall(topic, message) {
  const request = { topic, message };
  return { request };
}
|
|
|
|
/**
 * Builds a minimal stand-in for the call object passed to `pubsub.consume`.
 *
 * @param {string} topic - Topic to consume from.
 * @param {number} [timeoutMs=10000] - Value placed in `request.timeoutMs`.
 * @returns {{ request: { topic: string, timeoutMs: number }, on: Function }}
 */
function consumeCall(topic, timeoutMs = 10000) {
  const request = { topic, timeoutMs };
  return {
    request,
    // No-op listener registration; presumably the handler subscribes to call
    // events through `on` — confirm against pubsub.js.
    on: () => {},
  };
}
|
|
|
|
// Give every test its own pubsub instance, exposed as `t.context.pubsub`.
test.beforeEach((t) => {
  t.context.pubsub = pubsub();
});
|
|
|
|
|
|
// Publish first, consume second: the consumer must receive the message that
// was published before it started consuming.
test("Published messages are available to be consumed", async (t) => {
  const { pubsub } = t.context;
  t.plan(2);

  // Each callback settles its own promise so the test only finishes once both
  // assertions have run, regardless of the order the callbacks fire in.
  const published = new Promise((resolve) => {
    pubsub.publish(publishCall("topic", "message"), (error) => {
      t.is(error, undefined);
      resolve();
    });
  });
  const consumed = new Promise((resolve) => {
    // `= {}` turns a failed consume (no response object) into an assertion
    // failure instead of a destructuring TypeError inside the callback.
    pubsub.consume(consumeCall("topic"), (error, { message } = {}) => {
      t.is(message, "message");
      resolve();
    });
  });

  await Promise.all([published, consumed]);
});
|
|
|
|
// Publishing to two topics that were never declared must succeed, and each
// topic must deliver its own message.
test("Published messages automatically create topics", async (t) => {
  const { pubsub } = t.context;
  t.plan(4);

  // Resolves once the publish callback has run its assertion.
  const publish = (topic, message) =>
    new Promise((resolve) => {
      pubsub.publish(publishCall(topic, message), (error) => {
        t.is(error, undefined);
        resolve();
      });
    });

  // Resolves once the consume callback has run its assertion. `= {}` keeps a
  // missing response an assertion failure rather than a TypeError.
  const consume = (topic, expected) =>
    new Promise((resolve) => {
      pubsub.consume(consumeCall(topic), (error, { message } = {}) => {
        t.is(message, expected);
        resolve();
      });
    });

  // Array elements are evaluated in order, so both publishes are issued
  // before either consume, as the scenario requires.
  await Promise.all([
    publish("topic", "message"),
    publish("topic2", "message2"),
    consume("topic", "message"),
    consume("topic2", "message2"),
  ]);
});
|
|
|
|
// Consume first, publish second: the pending consumer must be handed the
// message once it is published.
test("Consumer waits for a published message", async (t) => {
  const { pubsub } = t.context;
  t.plan(2);

  // The consumer is registered before anything is published.
  const consumed = new Promise((resolve) => {
    // `= {}` keeps a missing response an assertion failure, not a TypeError.
    pubsub.consume(consumeCall("topic"), (error, { message } = {}) => {
      t.is(message, "message");
      resolve();
    });
  });
  const published = new Promise((resolve) => {
    pubsub.publish(publishCall("topic", "message"), (error) => {
      t.is(error, undefined);
      resolve();
    });
  });

  // Wait for both callbacks; finishing on the publish callback alone could
  // end the test before the consumer assertion has run.
  await Promise.all([consumed, published]);
});
|
|
|
|
// A consumer with a 0 ms timeout and nothing published must be called back
// with the timeout error and no response.
test("Consumer timeout if no message published", async (t) => {
  const { pubsub } = t.context;
  t.plan(2);

  await new Promise((resolve) => {
    pubsub.consume(consumeCall("topic", 0), (error, nothing) => {
      t.is(nothing, undefined);
      // NOTE(review): status 4 presumably maps to a deadline-exceeded status
      // code — confirm against pubsub.js.
      t.deepEqual(error, { message: "Timedout", status: 4 });
      resolve();
    });
  });
});
|
|
|
|
// Three consumers queue up on one topic, then three messages are published:
// consumers must be served in the order they started waiting.
test("Multiple consumers wait for multiple messages, FIFO", async (t) => {
  const { pubsub } = t.context;
  t.plan(6);

  // Resolves once the consume callback has run its assertion. `= {}` keeps a
  // missing response an assertion failure rather than a TypeError.
  const consume = (expected) =>
    new Promise((resolve) => {
      pubsub.consume(consumeCall("topic"), (error, { message } = {}) => {
        t.is(message, expected);
        resolve();
      });
    });

  // Resolves once the publish callback has run its assertion.
  const publish = (message) =>
    new Promise((resolve) => {
      pubsub.publish(publishCall("topic", message), (error) => {
        t.is(error, undefined);
        resolve();
      });
    });

  // Array elements are evaluated in order: all consumers are registered
  // before the first publish. The test ends only after all six callbacks.
  await Promise.all([
    consume("message1"),
    consume("message2"),
    consume("message3"),
    publish("message1"),
    publish("message2"),
    publish("message3"),
  ]);
});
|
|
|
|
// Four consumers queue up, one of them with a 0 ms timeout. After it has timed
// out, three messages are published and must reach the remaining consumers in
// FIFO order.
test("Multiple consumers, only consumers that have not timed out get a message, FIFO", async (t) => {
  const { pubsub } = t.context;
  t.plan(8);

  // Resolves once the consume callback has run its assertion. `= {}` keeps a
  // missing response an assertion failure rather than a TypeError.
  const expectMessage = (expected) =>
    new Promise((resolve) => {
      pubsub.consume(consumeCall("topic"), (error, { message } = {}) => {
        t.is(message, expected);
        resolve();
      });
    });

  // Resolves once the zero-timeout consumer has reported its timeout.
  const expectTimeout = () =>
    new Promise((resolve) => {
      pubsub.consume(consumeCall("topic", 0), (error, nothing) => {
        t.is(nothing, undefined);
        t.deepEqual(error, { message: "Timedout", status: 4 });
        resolve();
      });
    });

  // Resolves once the publish callback has run its assertion.
  const publish = (message) =>
    new Promise((resolve) => {
      pubsub.publish(publishCall("topic", message), (error) => {
        t.is(error, undefined);
        resolve();
      });
    });

  // Register the consumers in order; the second one is the one that expires.
  const consumers = [
    expectMessage("message1"),
    expectTimeout(),
    expectMessage("message2"),
    expectMessage("message3"),
  ];

  // Allow the zero-timeout consumer to time out before anything is published.
  // Awaiting here, not inside an async Promise executor, lets failures
  // propagate to the test.
  await sleep(1);

  const publishers = [
    publish("message1"),
    publish("message2"),
    publish("message3"),
  ];

  await Promise.all([...consumers, ...publishers]);
});
|
|
|
|
// One message, two consumers: the first consumer receives it, the second
// (100 ms timeout) finds nothing left and times out.
test("Multiple consumers trying to read a message, only the first will succeed", async (t) => {
  const { pubsub } = t.context;
  t.plan(4);

  const published = new Promise((resolve) => {
    pubsub.publish(publishCall("topic", "message1"), (error) => {
      t.is(error, undefined);
      resolve();
    });
  });
  const firstConsumer = new Promise((resolve) => {
    // `= {}` keeps a missing response an assertion failure, not a TypeError.
    pubsub.consume(consumeCall("topic"), (error, { message } = {}) => {
      t.is(message, "message1");
      resolve();
    });
  });
  const secondConsumer = new Promise((resolve) => {
    pubsub.consume(consumeCall("topic", 100), (error, nothing) => {
      t.is(nothing, undefined);
      t.deepEqual(error, { message: "Timedout", status: 4 });
      resolve();
    });
  });

  // Finish only after every callback, including the delayed timeout, has run.
  await Promise.all([published, firstConsumer, secondConsumer]);
});
|