diff --git a/src/rabbit.test.ts b/src/rabbit.test.ts index f1dd8a7..2e01334 100644 --- a/src/rabbit.test.ts +++ b/src/rabbit.test.ts @@ -9,18 +9,17 @@ const noopLogger = { }; async function setup(t: ExecutionContext) { - const conn = await connect(process.env.RABBIT_URL || "amqp://127.0.0.1"); - t.teardown(async () => { - await conn.close(); - }); - const exchangeName = `loke-queue.test-${ulid()}`; const rabbit = new RabbitHelper({ - amqpConnection: conn, + createConnection: () => + connect(process.env.RABBIT_URL || "amqp://127.0.0.1"), logger: noopLogger, exchangeName, }); + t.teardown(async () => { + await rabbit.close(); + }); await rabbit.assertExchange(); t.teardown(async () => { diff --git a/src/rabbit.ts b/src/rabbit.ts index a56ddab..53de8a7 100644 --- a/src/rabbit.ts +++ b/src/rabbit.ts @@ -22,15 +22,19 @@ export interface RabbitData { } export class RabbitHelper { - private amqpConn: Connection; + private createConnection: () => Promise; private logger: Logger; private exchangeName: string; // Tested having a channel pool, made no difference to performance private useChan: Promise | null = null; + private useConn: Promise | null = null; constructor(opts: { - /** The amqplib connection */ - amqpConnection: Connection; + /** + * A function that returns a new connection, this is used to create a new + * connection initially when the current one is disconnected + */ + createConnection: () => Promise; /** The exchange name to publish to, defaults to "pubsub" */ exchangeName?: string; /** Logger used for reporting errors */ @@ -38,9 +42,9 @@ export class RabbitHelper { }) { const { exchangeName = "pubsub" } = opts; - this.amqpConn = opts.amqpConnection; this.logger = opts.logger; this.exchangeName = exchangeName; + this.createConnection = opts.createConnection; } /** @@ -52,7 +56,8 @@ export class RabbitHelper { /** An optional signal use for aborting the operation */ signal?: AbortSignal; }): Promise<{ data: () => Promise> }> { - const ch = await this.amqpConn.createChannel(); + const conn = await this.getConnection(); + const ch = await conn.createChannel(); try { await ch.prefetch(1); @@ -122,7 +127,8 @@ export class RabbitHelper { handler: MessageHandler>; }): Promise { const inProgress = new Set>(); - const ch = await this.amqpConn.createChannel(); + const conn = await this.getConnection(); + const ch = await conn.createChannel(); try { await ch.prefetch(args.maxConcurrent || 20); @@ -289,10 +295,21 @@ export class RabbitHelper { async usingChannel(fn: (ch: Channel) => Promise): Promise { let ch: Channel; if (!this.useChan) { - this.useChan = Promise.resolve(this.amqpConn.createChannel()); + const chanP = this.getConnection().then( + (conn) => conn.createChannel(), + (err) => { + if (this.useChan === chanP) { + this.useChan = null; + } + throw err; + } + ); + this.useChan = chanP; ch = await this.useChan; ch.once("close", () => { - this.useChan = null; + if (this.useChan === chanP) { + this.useChan = null; + } }); } else { ch = await this.useChan; @@ -300,6 +317,58 @@ export class RabbitHelper { return await fn(ch); } + + async close(): Promise { + if (this.useConn) { + const conn = await this.useConn; + await conn.close(); + } + } + + private async getConnection(): Promise { + if (this.useConn) { + return this.useConn; + } + + const connP = this.createConnection().then( + (conn) => { + conn.once("error", (err) => { + conn.close(); + this.logger.error( + `RabbitMQ connection error - invalidating connection: ${err}` + ); + if (this.useConn === connP) { + console.error("Invalidating connection"); + this.useConn = null; + } + }); + + conn.once("close", () => { + this.logger.error( + "RabbitMQ connection closed unexpectedly - invalidating connection" + ); + if (this.useConn === connP) { + console.error("Invalidating connection"); + this.useConn = null; + } + }); + + return conn; + }, + (err) => { + this.logger.error(`Failed to connect to RabbitMQ: ${err}`); + if (this.useConn === connP) { + console.error("Invalidating connection"); + this.useConn = null; + } + throw err; + } + ); + + this.useConn = connP; + + return this.useConn; + } } function unixTime() {