From 9eb71863b3ecc6d6647c51cae58c21c1b9b07fb8 Mon Sep 17 00:00:00 2001 From: devin ivy Date: Thu, 11 May 2023 00:02:18 -0400 Subject: [PATCH] More convenient access to ops by record type (#6) --- src/subscription.ts | 32 +++--------- src/util/subscription.ts | 110 ++++++++++++++++++++++++++++----------- 2 files changed, 88 insertions(+), 54 deletions(-) diff --git a/src/subscription.ts b/src/subscription.ts index 02b03bf..649f7ed 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -1,35 +1,26 @@ -import { ids, lexicons } from './lexicon/lexicons' -import { Record as PostRecord } from './lexicon/types/app/bsky/feed/post' import { OutputSchema as RepoEvent, isCommit, } from './lexicon/types/com/atproto/sync/subscribeRepos' -import { - FirehoseSubscriptionBase, - getPostOperations, -} from './util/subscription' +import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription' export class FirehoseSubscription extends FirehoseSubscriptionBase { async handleEvent(evt: RepoEvent) { if (!isCommit(evt)) return - const postOps = await getPostOperations(evt) - const postsToDelete = postOps.deletes.map((del) => del.uri) - const postsToCreate = postOps.creates + const ops = await getOpsByType(evt) + const postsToDelete = ops.posts.deletes.map((del) => del.uri) + const postsToCreate = ops.posts.creates .filter((create) => { // only alf-related posts - return ( - isPost(create.record) && - create.record.text.toLowerCase().includes('alf') - ) + return create.record.text.toLowerCase().includes('alf') }) .map((create) => { // map alf-related posts to a db row - const record = isPost(create.record) ? create.record : null return { uri: create.uri, cid: create.cid, - replyParent: record?.reply?.parent.uri ?? null, - replyRoot: record?.reply?.root.uri ?? null, + replyParent: create.record?.reply?.parent.uri ?? null, + replyRoot: create.record?.reply?.root.uri ?? null, indexedAt: new Date().toISOString(), } }) @@ -49,12 +40,3 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { } } } - -export const isPost = (obj: unknown): obj is PostRecord => { - try { - lexicons.assertValidRecord(ids.AppBskyFeedPost, obj) - return true - } catch (err) { - return false - } -} diff --git a/src/util/subscription.ts b/src/util/subscription.ts index 711b642..3eee79e 100644 --- a/src/util/subscription.ts +++ b/src/util/subscription.ts @@ -1,12 +1,16 @@ import { Subscription } from '@atproto/xrpc-server' +import { cborToLexRecord, readCar } from '@atproto/repo' import { ids, lexicons } from '../lexicon/lexicons' +import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post' +import { Record as RepostRecord } from '../lexicon/types/app/bsky/feed/repost' +import { Record as LikeRecord } from '../lexicon/types/app/bsky/feed/like' +import { Record as FollowRecord } from '../lexicon/types/app/bsky/graph/follow' import { Commit, OutputSchema as RepoEvent, isCommit, } from '../lexicon/types/com/atproto/sync/subscribeRepos' import { Database } from '../db' -import { cborToLexRecord, readCar } from '@atproto/repo' export abstract class FirehoseSubscriptionBase { public sub: Subscription @@ -75,50 +79,98 @@ export abstract class FirehoseSubscriptionBase { } } -export const getPostOperations = async (evt: Commit): Promise => { - const ops: Operations = { creates: [], deletes: [] } - const postOps = evt.ops.filter( - (op) => op.path.split('/')[0] === ids.AppBskyFeedPost, - ) - - if (postOps.length < 1) return ops - +export const getOpsByType = async (evt: Commit): Promise => { const car = await readCar(evt.blocks) + const opsByType: OperationsByType = { + posts: { creates: [], deletes: [] }, + reposts: { creates: [], deletes: [] }, + likes: { creates: [], deletes: [] }, + follows: { creates: [], deletes: [] }, + } - for (const op of postOps) { - // updates not supported yet - if (op.action === 'update') continue + for (const op of evt.ops) { const uri = `at://${evt.repo}/${op.path}` - if (op.action === 'delete') { - ops.deletes.push({ uri }) - } else if (op.action === 'create') { + const [collection] = op.path.split('/') + + if (op.action === 'update') continue // updates not supported yet + + if (op.action === 'create') { if (!op.cid) continue - const postBytes = await car.blocks.get(op.cid) - if (!postBytes) continue - ops.creates.push({ - uri, - cid: op.cid.toString(), - author: evt.repo, - record: cborToLexRecord(postBytes), - }) + const recordBytes = car.blocks.get(op.cid) + if (!recordBytes) continue + const record = cborToLexRecord(recordBytes) + const create = { uri, cid: op.cid.toString(), author: evt.repo } + if (collection === ids.AppBskyFeedPost && isPost(record)) { + opsByType.posts.creates.push({ record, ...create }) + } else if (collection === ids.AppBskyFeedRepost && isRepost(record)) { + opsByType.reposts.creates.push({ record, ...create }) + } else if (collection === ids.AppBskyFeedLike && isLike(record)) { + opsByType.likes.creates.push({ record, ...create }) + } else if (collection === ids.AppBskyGraphFollow && isFollow(record)) { + opsByType.follows.creates.push({ record, ...create }) + } + } + + if (op.action === 'delete') { + if (collection === ids.AppBskyFeedPost) { + opsByType.posts.deletes.push({ uri }) + } else if (collection === ids.AppBskyFeedRepost) { + opsByType.reposts.deletes.push({ uri }) + } else if (collection === ids.AppBskyFeedLike) { + opsByType.likes.deletes.push({ uri }) + } else if (collection === ids.AppBskyGraphFollow) { + opsByType.follows.deletes.push({ uri }) + } } } - return ops + return opsByType } -type CreateOp = { +type OperationsByType = { + posts: Operations + reposts: Operations + likes: Operations + follows: Operations +} + +type Operations> = { + creates: CreateOp[] + deletes: DeleteOp[] +} + +type CreateOp = { uri: string cid: string author: string - record: Record + record: T } type DeleteOp = { uri: string } -type Operations = { - creates: CreateOp[] - deletes: DeleteOp[] +export const isPost = (obj: unknown): obj is PostRecord => { + return isType(obj, ids.AppBskyFeedPost) +} + +export const isRepost = (obj: unknown): obj is RepostRecord => { + return isType(obj, ids.AppBskyFeedRepost) +} + +export const isLike = (obj: unknown): obj is LikeRecord => { + return isType(obj, ids.AppBskyFeedLike) +} + +export const isFollow = (obj: unknown): obj is FollowRecord => { + return isType(obj, ids.AppBskyGraphFollow) +} + +const isType = (obj: unknown, nsid: string) => { + try { + lexicons.assertValidRecord(nsid, obj) + return true + } catch (err) { + return false + } }