fix: handle firehose subscription error reconnect (Close #44) (#46)

This commit is contained in:
Edison Lee 2023-06-16 05:52:04 +08:00 committed by GitHub
parent 3e4011acc2
commit f4b8159264
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 23 additions and 12 deletions

View File

@ -19,4 +19,7 @@ FEEDGEN_HOSTNAME="example.com"
FEEDGEN_PUBLISHER_DID="did:plc:abcde...." FEEDGEN_PUBLISHER_DID="did:plc:abcde...."
# Only use this if you want a service did different from did:web # Only use this if you want a service did different from did:web
# FEEDGEN_SERVICE_DID="did:plc:abcde..." # FEEDGEN_SERVICE_DID="did:plc:abcde..."
# Delay between reconnect attempts to the firehose subscription endpoint (in milliseconds)
FEEDGEN_SUBSCRIPTION_RECONNECT_DELAY=3000

View File

@ -15,4 +15,5 @@ export type Config = {
subscriptionEndpoint: string subscriptionEndpoint: string
serviceDid: string serviceDid: string
publisherDid: string publisherDid: string
subscriptionReconnectDelay: number
} }

View File

@ -15,6 +15,8 @@ const run = async () => {
'wss://bsky.social', 'wss://bsky.social',
publisherDid: publisherDid:
maybeStr(process.env.FEEDGEN_PUBLISHER_DID) ?? 'did:example:alice', maybeStr(process.env.FEEDGEN_PUBLISHER_DID) ?? 'did:example:alice',
subscriptionReconnectDelay:
maybeInt(process.env.FEEDGEN_SUBSCRIPTION_RECONNECT_DELAY) ?? 3000,
hostname, hostname,
serviceDid, serviceDid,
}) })

View File

@ -63,7 +63,7 @@ export class FeedGenerator {
async start(): Promise<http.Server> { async start(): Promise<http.Server> {
await migrateToLatest(this.db) await migrateToLatest(this.db)
this.firehose.run() this.firehose.run(this.cfg.subscriptionReconnectDelay)
this.server = this.app.listen(this.cfg.port, this.cfg.listenhost) this.server = this.app.listen(this.cfg.port, this.cfg.listenhost)
await events.once(this.server, 'listening') await events.once(this.server, 'listening')
return this.server return this.server

View File

@ -36,17 +36,22 @@ export abstract class FirehoseSubscriptionBase {
abstract handleEvent(evt: RepoEvent): Promise<void> abstract handleEvent(evt: RepoEvent): Promise<void>
async run() { async run(subscriptionReconnectDelay: number) {
for await (const evt of this.sub) { try {
try { for await (const evt of this.sub) {
await this.handleEvent(evt) try {
} catch (err) { await this.handleEvent(evt)
console.error('repo subscription could not handle message', err) } catch (err) {
} console.error('repo subscription could not handle message', err)
// update stored cursor every 20 events or so }
if (isCommit(evt) && evt.seq % 20 === 0) { // update stored cursor every 20 events or so
await this.updateCursor(evt.seq) if (isCommit(evt) && evt.seq % 20 === 0) {
await this.updateCursor(evt.seq)
}
} }
} catch (err) {
console.error('repo subscription errored', err)
setTimeout(() => this.run(subscriptionReconnectDelay), subscriptionReconnectDelay)
} }
} }