denonbu-feed/src/subscription.ts

61 lines
1.7 KiB
TypeScript
Raw Normal View History

2023-05-10 14:56:30 +00:00
import { ids, lexicons } from './lexicon/lexicons'
import { Record as PostRecord } from './lexicon/types/app/bsky/feed/post'
import {
OutputSchema as RepoEvent,
isCommit,
} from './lexicon/types/com/atproto/sync/subscribeRepos'
import {
FirehoseSubscriptionBase,
getPostOperations,
} from './util/subscription'
export class FirehoseSubscription extends FirehoseSubscriptionBase {
async handleEvent(evt: RepoEvent) {
if (!isCommit(evt)) return
const postOps = await getPostOperations(evt)
const postsToDelete = postOps.deletes.map((del) => del.uri)
const postsToCreate = postOps.creates
.filter((create) => {
// only alf-related posts
return (
isPost(create.record) &&
create.record.text.toLowerCase().includes('alf')
)
})
.map((create) => {
// map alf-related posts to a db row
const record = isPost(create.record) ? create.record : null
return {
uri: create.uri,
cid: create.cid,
replyParent: record?.reply?.parent.uri ?? null,
replyRoot: record?.reply?.root.uri ?? null,
indexedAt: new Date().toISOString(),
}
})
if (postsToDelete.length > 0) {
await this.db
.deleteFrom('post')
2023-05-10 14:56:30 +00:00
.where('uri', 'in', postsToDelete)
.execute()
}
if (postsToCreate.length > 0) {
await this.db
.insertInto('post')
2023-05-10 14:56:30 +00:00
.values(postsToCreate)
.onConflict((oc) => oc.doNothing())
.execute()
}
}
}
export const isPost = (obj: unknown): obj is PostRecord => {
try {
lexicons.assertValidRecord(ids.AppBskyFeedPost, obj)
return true
} catch (err) {
return false
}
}