From f4b815926432bedaf66a7da99b6c8eb65144f2c6 Mon Sep 17 00:00:00 2001 From: Edison Lee Date: Fri, 16 Jun 2023 05:52:04 +0800 Subject: [PATCH] fix: handle firehose subscription error reconnect (Close #44) (#46) --- .env.example | 5 ++++- src/config.ts | 1 + src/index.ts | 2 ++ src/server.ts | 2 +- src/util/subscription.ts | 25 +++++++++++++++---------- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/.env.example b/.env.example index 5139bb5..d6e5de9 100644 --- a/.env.example +++ b/.env.example @@ -19,4 +19,7 @@ FEEDGEN_HOSTNAME="example.com" FEEDGEN_PUBLISHER_DID="did:plc:abcde...." # Only use this if you want a service did different from did:web -# FEEDGEN_SERVICE_DID="did:plc:abcde..." \ No newline at end of file +# FEEDGEN_SERVICE_DID="did:plc:abcde..." + +# Delay between reconnect attempts to the firehose subscription endpoint (in milliseconds) +FEEDGEN_SUBSCRIPTION_RECONNECT_DELAY=3000 diff --git a/src/config.ts b/src/config.ts index 3b521f7..c394fc9 100644 --- a/src/config.ts +++ b/src/config.ts @@ -15,4 +15,5 @@ export type Config = { subscriptionEndpoint: string serviceDid: string publisherDid: string + subscriptionReconnectDelay: number } diff --git a/src/index.ts b/src/index.ts index 34c6107..f05fa1b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,8 @@ const run = async () => { 'wss://bsky.social', publisherDid: maybeStr(process.env.FEEDGEN_PUBLISHER_DID) ?? 'did:example:alice', + subscriptionReconnectDelay: + maybeInt(process.env.FEEDGEN_SUBSCRIPTION_RECONNECT_DELAY) ?? 3000, hostname, serviceDid, }) diff --git a/src/server.ts b/src/server.ts index 90a227b..ddbd2d2 100644 --- a/src/server.ts +++ b/src/server.ts @@ -63,7 +63,7 @@ export class FeedGenerator { async start(): Promise { await migrateToLatest(this.db) - this.firehose.run() + this.firehose.run(this.cfg.subscriptionReconnectDelay) this.server = this.app.listen(this.cfg.port, this.cfg.listenhost) await events.once(this.server, 'listening') return this.server diff --git a/src/util/subscription.ts b/src/util/subscription.ts index 4061f3f..87ab116 100644 --- a/src/util/subscription.ts +++ b/src/util/subscription.ts @@ -36,17 +36,22 @@ export abstract class FirehoseSubscriptionBase { abstract handleEvent(evt: RepoEvent): Promise - async run() { - for await (const evt of this.sub) { - try { - await this.handleEvent(evt) - } catch (err) { - console.error('repo subscription could not handle message', err) - } - // update stored cursor every 20 events or so - if (isCommit(evt) && evt.seq % 20 === 0) { - await this.updateCursor(evt.seq) + async run(subscriptionReconnectDelay: number) { + try { + for await (const evt of this.sub) { + try { + await this.handleEvent(evt) + } catch (err) { + console.error('repo subscription could not handle message', err) + } + // update stored cursor every 20 events or so + if (isCommit(evt) && evt.seq % 20 === 0) { + await this.updateCursor(evt.seq) + } } + } catch (err) { + console.error('repo subscription errored', err) + setTimeout(() => this.run(subscriptionReconnectDelay), subscriptionReconnectDelay) } }