More convenient access to ops by record type (#6)

This commit is contained in:
devin ivy 2023-05-11 00:02:18 -04:00 committed by GitHub
parent 71c2ee061e
commit 9eb71863b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 54 deletions

View File

@ -1,35 +1,26 @@
import { ids, lexicons } from './lexicon/lexicons'
import { Record as PostRecord } from './lexicon/types/app/bsky/feed/post'
import { import {
OutputSchema as RepoEvent, OutputSchema as RepoEvent,
isCommit, isCommit,
} from './lexicon/types/com/atproto/sync/subscribeRepos' } from './lexicon/types/com/atproto/sync/subscribeRepos'
import { import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription'
FirehoseSubscriptionBase,
getPostOperations,
} from './util/subscription'
export class FirehoseSubscription extends FirehoseSubscriptionBase { export class FirehoseSubscription extends FirehoseSubscriptionBase {
async handleEvent(evt: RepoEvent) { async handleEvent(evt: RepoEvent) {
if (!isCommit(evt)) return if (!isCommit(evt)) return
const postOps = await getPostOperations(evt) const ops = await getOpsByType(evt)
const postsToDelete = postOps.deletes.map((del) => del.uri) const postsToDelete = ops.posts.deletes.map((del) => del.uri)
const postsToCreate = postOps.creates const postsToCreate = ops.posts.creates
.filter((create) => { .filter((create) => {
// only alf-related posts // only alf-related posts
return ( return create.record.text.toLowerCase().includes('alf')
isPost(create.record) &&
create.record.text.toLowerCase().includes('alf')
)
}) })
.map((create) => { .map((create) => {
// map alf-related posts to a db row // map alf-related posts to a db row
const record = isPost(create.record) ? create.record : null
return { return {
uri: create.uri, uri: create.uri,
cid: create.cid, cid: create.cid,
replyParent: record?.reply?.parent.uri ?? null, replyParent: create.record?.reply?.parent.uri ?? null,
replyRoot: record?.reply?.root.uri ?? null, replyRoot: create.record?.reply?.root.uri ?? null,
indexedAt: new Date().toISOString(), indexedAt: new Date().toISOString(),
} }
}) })
@ -49,12 +40,3 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
} }
} }
} }
export const isPost = (obj: unknown): obj is PostRecord => {
try {
lexicons.assertValidRecord(ids.AppBskyFeedPost, obj)
return true
} catch (err) {
return false
}
}

View File

@ -1,12 +1,16 @@
import { Subscription } from '@atproto/xrpc-server' import { Subscription } from '@atproto/xrpc-server'
import { cborToLexRecord, readCar } from '@atproto/repo'
import { ids, lexicons } from '../lexicon/lexicons' import { ids, lexicons } from '../lexicon/lexicons'
import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post'
import { Record as RepostRecord } from '../lexicon/types/app/bsky/feed/repost'
import { Record as LikeRecord } from '../lexicon/types/app/bsky/feed/like'
import { Record as FollowRecord } from '../lexicon/types/app/bsky/graph/follow'
import { import {
Commit, Commit,
OutputSchema as RepoEvent, OutputSchema as RepoEvent,
isCommit, isCommit,
} from '../lexicon/types/com/atproto/sync/subscribeRepos' } from '../lexicon/types/com/atproto/sync/subscribeRepos'
import { Database } from '../db' import { Database } from '../db'
import { cborToLexRecord, readCar } from '@atproto/repo'
export abstract class FirehoseSubscriptionBase { export abstract class FirehoseSubscriptionBase {
public sub: Subscription<RepoEvent> public sub: Subscription<RepoEvent>
@ -75,50 +79,98 @@ export abstract class FirehoseSubscriptionBase {
} }
} }
export const getPostOperations = async (evt: Commit): Promise<Operations> => { export const getOpsByType = async (evt: Commit): Promise<OperationsByType> => {
const ops: Operations = { creates: [], deletes: [] }
const postOps = evt.ops.filter(
(op) => op.path.split('/')[0] === ids.AppBskyFeedPost,
)
if (postOps.length < 1) return ops
const car = await readCar(evt.blocks) const car = await readCar(evt.blocks)
const opsByType: OperationsByType = {
posts: { creates: [], deletes: [] },
reposts: { creates: [], deletes: [] },
likes: { creates: [], deletes: [] },
follows: { creates: [], deletes: [] },
}
for (const op of postOps) { for (const op of evt.ops) {
// updates not supported yet
if (op.action === 'update') continue
const uri = `at://${evt.repo}/${op.path}` const uri = `at://${evt.repo}/${op.path}`
if (op.action === 'delete') { const [collection] = op.path.split('/')
ops.deletes.push({ uri })
} else if (op.action === 'create') { if (op.action === 'update') continue // updates not supported yet
if (op.action === 'create') {
if (!op.cid) continue if (!op.cid) continue
const postBytes = await car.blocks.get(op.cid) const recordBytes = car.blocks.get(op.cid)
if (!postBytes) continue if (!recordBytes) continue
ops.creates.push({ const record = cborToLexRecord(recordBytes)
uri, const create = { uri, cid: op.cid.toString(), author: evt.repo }
cid: op.cid.toString(), if (collection === ids.AppBskyFeedPost && isPost(record)) {
author: evt.repo, opsByType.posts.creates.push({ record, ...create })
record: cborToLexRecord(postBytes), } else if (collection === ids.AppBskyFeedRepost && isRepost(record)) {
}) opsByType.reposts.creates.push({ record, ...create })
} else if (collection === ids.AppBskyFeedLike && isLike(record)) {
opsByType.likes.creates.push({ record, ...create })
} else if (collection === ids.AppBskyGraphFollow && isFollow(record)) {
opsByType.follows.creates.push({ record, ...create })
} }
} }
return ops if (op.action === 'delete') {
if (collection === ids.AppBskyFeedPost) {
opsByType.posts.deletes.push({ uri })
} else if (collection === ids.AppBskyFeedRepost) {
opsByType.reposts.deletes.push({ uri })
} else if (collection === ids.AppBskyFeedLike) {
opsByType.likes.deletes.push({ uri })
} else if (collection === ids.AppBskyGraphFollow) {
opsByType.follows.deletes.push({ uri })
}
}
} }
type CreateOp = { return opsByType
}
type OperationsByType = {
posts: Operations<PostRecord>
reposts: Operations<RepostRecord>
likes: Operations<LikeRecord>
follows: Operations<FollowRecord>
}
type Operations<T = Record<string, unknown>> = {
creates: CreateOp<T>[]
deletes: DeleteOp[]
}
type CreateOp<T> = {
uri: string uri: string
cid: string cid: string
author: string author: string
record: Record<string, unknown> record: T
} }
type DeleteOp = { type DeleteOp = {
uri: string uri: string
} }
type Operations = { export const isPost = (obj: unknown): obj is PostRecord => {
creates: CreateOp[] return isType(obj, ids.AppBskyFeedPost)
deletes: DeleteOp[] }
export const isRepost = (obj: unknown): obj is RepostRecord => {
return isType(obj, ids.AppBskyFeedRepost)
}
export const isLike = (obj: unknown): obj is LikeRecord => {
return isType(obj, ids.AppBskyFeedLike)
}
export const isFollow = (obj: unknown): obj is FollowRecord => {
return isType(obj, ids.AppBskyGraphFollow)
}
const isType = (obj: unknown, nsid: string) => {
try {
lexicons.assertValidRecord(nsid, obj)
return true
} catch (err) {
return false
}
} }