From 21202e0447df1f1d4e666fd3533e6324747d80a1 Mon Sep 17 00:00:00 2001 From: Dominic Smith Date: Fri, 11 Oct 2024 13:59:54 +1100 Subject: [PATCH 1/2] Add reconnect logic --- src/rabbit.test.ts | 11 ++++----- src/rabbit.ts | 60 ++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 58 insertions(+), 13 deletions(-) diff --git a/src/rabbit.test.ts b/src/rabbit.test.ts index f1dd8a7..2e01334 100644 --- a/src/rabbit.test.ts +++ b/src/rabbit.test.ts @@ -9,18 +9,17 @@ const noopLogger = { }; async function setup(t: ExecutionContext) { - const conn = await connect(process.env.RABBIT_URL || "amqp://127.0.0.1"); - t.teardown(async () => { - await conn.close(); - }); - const exchangeName = `loke-queue.test-${ulid()}`; const rabbit = new RabbitHelper({ - amqpConnection: conn, + createConnection: () => + connect(process.env.RABBIT_URL || "amqp://127.0.0.1"), logger: noopLogger, exchangeName, }); + t.teardown(async () => { + await rabbit.close(); + }); await rabbit.assertExchange(); t.teardown(async () => { diff --git a/src/rabbit.ts b/src/rabbit.ts index a56ddab..7468a2a 100644 --- a/src/rabbit.ts +++ b/src/rabbit.ts @@ -22,15 +22,19 @@ export interface RabbitData { } export class RabbitHelper { - private amqpConn: Connection; + private createConnection: () => Promise; private logger: Logger; private exchangeName: string; // Tested having a channel pool, made no difference to performance private useChan: Promise | null = null; + private useConn: Promise | null = null; constructor(opts: { - /** The amqplib connection */ - amqpConnection: Connection; + /** + * A function that returns a new connection, this is used to create a new + * connection initially when the current one is disconnected + */ + createConnection: () => Promise; /** The exchange name to publish to, defaults to "pubsub" */ exchangeName?: string; /** Logger used for reporting errors */ @@ -38,9 +42,9 @@ export class RabbitHelper { }) { const { exchangeName = "pubsub" } = opts; - this.amqpConn = opts.amqpConnection; this.logger = opts.logger; this.exchangeName = exchangeName; + this.createConnection = opts.createConnection; } /** @@ -52,7 +56,8 @@ export class RabbitHelper { /** An optional signal use for aborting the operation */ signal?: AbortSignal; }): Promise<{ data: () => Promise> }> { - const ch = await this.amqpConn.createChannel(); + const conn = await this.getConnection(); + const ch = await conn.createChannel(); try { await ch.prefetch(1); @@ -122,7 +127,8 @@ export class RabbitHelper { handler: MessageHandler>; }): Promise { const inProgress = new Set>(); - const ch = await this.amqpConn.createChannel(); + const conn = await this.getConnection(); + const ch = await conn.createChannel(); try { await ch.prefetch(args.maxConcurrent || 20); @@ -289,7 +295,7 @@ export class RabbitHelper { async usingChannel(fn: (ch: Channel) => Promise): Promise { let ch: Channel; if (!this.useChan) { - this.useChan = Promise.resolve(this.amqpConn.createChannel()); + this.useChan = this.getConnection().then((conn) => conn.createChannel()); ch = await this.useChan; ch.once("close", () => { this.useChan = null; @@ -300,6 +306,46 @@ export class RabbitHelper { return await fn(ch); } + + async close(): Promise { + if (this.useConn) { + const conn = await this.useConn; + await conn.close(); + } + } + + private async getConnection(): Promise { + if (this.useConn) { + return this.useConn; + } + + const connP = this.createConnection().then((conn) => { + conn.once("error", (err) => { + conn.close(); + this.logger.error( + `RabbitMQ connection error - invalidating connection: ${err}` + ); + if (this.useConn === connP) { + this.useConn = null; + } + }); + + conn.once("close", () => { + this.logger.error( + "RabbitMQ connection closed unexpectedly - invalidating connection" + ); + if (this.useConn === connP) { + this.useConn = null; + } + }); + + return conn; + }); + + this.useConn = connP; + + return this.useConn; + } } function unixTime() { From 52432aab20a70aaf9a5111fdbb0c53b4a53e9e8b Mon Sep 17 00:00:00 2001 From: Dominic Smith Date: Mon, 7 Apr 2025 14:58:55 +1000 Subject: [PATCH 2/2] WIP --- src/rabbit.ts | 63 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/src/rabbit.ts b/src/rabbit.ts index 7468a2a..53de8a7 100644 --- a/src/rabbit.ts +++ b/src/rabbit.ts @@ -295,10 +295,21 @@ export class RabbitHelper { async usingChannel(fn: (ch: Channel) => Promise): Promise { let ch: Channel; if (!this.useChan) { - this.useChan = this.getConnection().then((conn) => conn.createChannel()); + const chanP = this.getConnection().then( + (conn) => conn.createChannel(), + (err) => { + if (this.useChan === chanP) { + this.useChan = null; + } + throw err; + } + ); + this.useChan = chanP; ch = await this.useChan; ch.once("close", () => { - this.useChan = null; + if (this.useChan === chanP) { + this.useChan = null; + } }); } else { ch = await this.useChan; @@ -319,28 +330,40 @@ export class RabbitHelper { return this.useConn; } - const connP = this.createConnection().then((conn) => { - conn.once("error", (err) => { - conn.close(); - this.logger.error( - `RabbitMQ connection error - invalidating connection: ${err}` - ); - if (this.useConn === connP) { - this.useConn = null; - } - }); + const connP = this.createConnection().then( + (conn) => { + conn.once("error", (err) => { + conn.close(); + this.logger.error( + `RabbitMQ connection error - invalidating connection: ${err}` + ); + if (this.useConn === connP) { + console.error("Invalidating connection"); + this.useConn = null; + } + }); - conn.once("close", () => { - this.logger.error( - "RabbitMQ connection closed unexpectedly - invalidating connection" - ); + conn.once("close", () => { + this.logger.error( + "RabbitMQ connection closed unexpectedly - invalidating connection" + ); + if (this.useConn === connP) { + console.error("Invalidating connection"); + this.useConn = null; + } + }); + + return conn; + }, + (err) => { + this.logger.error(`Failed to connect to RabbitMQ: ${err}`); if (this.useConn === connP) { + console.error("Invalidating connection"); this.useConn = null; } - }); - - return conn; - }); + throw err; + } + ); this.useConn = connP;