live tail subscription
This commit is contained in:
parent
ff09bc5337
commit
bca915f6ae
@ -30,7 +30,7 @@ export class FeedGenerator {
|
|||||||
static create(config?: Partial<Config>) {
|
static create(config?: Partial<Config>) {
|
||||||
const cfg: Config = {
|
const cfg: Config = {
|
||||||
port: config?.port ?? 3000,
|
port: config?.port ?? 3000,
|
||||||
sqliteLocation: config?.sqliteLocation ?? 'test.sqlite',
|
sqliteLocation: config?.sqliteLocation ?? ':memory:',
|
||||||
subscriptionEndpoint: config?.subscriptionEndpoint ?? 'wss://bsky.social',
|
subscriptionEndpoint: config?.subscriptionEndpoint ?? 'wss://bsky.social',
|
||||||
serviceDid: config?.serviceDid ?? 'did:example:test',
|
serviceDid: config?.serviceDid ?? 'did:example:test',
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,11 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
|
|||||||
async handleEvent(evt: RepoEvent) {
|
async handleEvent(evt: RepoEvent) {
|
||||||
if (!isCommit(evt)) return
|
if (!isCommit(evt)) return
|
||||||
const ops = await getOpsByType(evt)
|
const ops = await getOpsByType(evt)
|
||||||
|
if (ops.posts.creates.length > 0) {
|
||||||
|
for (const op of ops.posts.creates) {
|
||||||
|
console.log(op.record.text)
|
||||||
|
}
|
||||||
|
}
|
||||||
const postsToDelete = ops.posts.deletes.map((del) => del.uri)
|
const postsToDelete = ops.posts.deletes.map((del) => del.uri)
|
||||||
const postsToCreate = ops.posts.creates
|
const postsToCreate = ops.posts.creates
|
||||||
.filter((create) => {
|
.filter((create) => {
|
||||||
|
@ -36,7 +36,6 @@ export abstract class FirehoseSubscriptionBase {
|
|||||||
abstract handleEvent(evt: RepoEvent): Promise<void>
|
abstract handleEvent(evt: RepoEvent): Promise<void>
|
||||||
|
|
||||||
async run() {
|
async run() {
|
||||||
await this.ensureCursor()
|
|
||||||
for await (const evt of this.sub) {
|
for await (const evt of this.sub) {
|
||||||
try {
|
try {
|
||||||
await this.handleEvent(evt)
|
await this.handleEvent(evt)
|
||||||
@ -50,17 +49,6 @@ export abstract class FirehoseSubscriptionBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async ensureCursor() {
|
|
||||||
await this.db
|
|
||||||
.insertInto('sub_state')
|
|
||||||
.values({
|
|
||||||
service: this.service,
|
|
||||||
cursor: 0,
|
|
||||||
})
|
|
||||||
.onConflict((oc) => oc.doNothing())
|
|
||||||
.execute()
|
|
||||||
}
|
|
||||||
|
|
||||||
async updateCursor(cursor: number) {
|
async updateCursor(cursor: number) {
|
||||||
await this.db
|
await this.db
|
||||||
.updateTable('sub_state')
|
.updateTable('sub_state')
|
||||||
@ -69,13 +57,13 @@ export abstract class FirehoseSubscriptionBase {
|
|||||||
.execute()
|
.execute()
|
||||||
}
|
}
|
||||||
|
|
||||||
async getCursor(): Promise<{ cursor: number }> {
|
async getCursor(): Promise<{ cursor?: number }> {
|
||||||
const res = await this.db
|
const res = await this.db
|
||||||
.selectFrom('sub_state')
|
.selectFrom('sub_state')
|
||||||
.selectAll()
|
.selectAll()
|
||||||
.where('service', '=', this.service)
|
.where('service', '=', this.service)
|
||||||
.executeTakeFirst()
|
.executeTakeFirst()
|
||||||
return res ? { cursor: res.cursor } : { cursor: 0 }
|
return res ? { cursor: res.cursor } : {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user