From 71c2ee061e9de900125a59625351986ebdc1dc08 Mon Sep 17 00:00:00 2001 From: devin ivy Date: Thu, 11 May 2023 00:00:44 -0400 Subject: [PATCH] Setup simple migration system (#7) * Setup migrations, misc tidy/fixes * Simplify migration provider --- src/db/index.ts | 28 +++++++++------------------- src/db/migrations.ts | 31 +++++++++++++++++++++++++++++++ src/db/schema.ts | 17 +++++++++++++++++ src/feed-generation.ts | 8 ++++---- src/index.ts | 2 +- src/server.ts | 15 +++++++-------- src/subscription.ts | 4 ++-- src/util/subscription.ts | 2 +- 8 files changed, 72 insertions(+), 35 deletions(-) create mode 100644 src/db/migrations.ts create mode 100644 src/db/schema.ts diff --git a/src/db/index.ts b/src/db/index.ts index 3c269ad..9275d46 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -1,5 +1,7 @@ -import { Kysely, SqliteDialect } from 'kysely' import SqliteDb from 'better-sqlite3' +import { Kysely, Migrator, SqliteDialect } from 'kysely' +import { DatabaseSchema } from './schema' +import { migrationProvider } from './migrations' export const createDb = (location: string): Database => { return new Kysely({ @@ -9,22 +11,10 @@ export const createDb = (location: string): Database => { }) } +export const migrateToLatest = async (db: Database) => { + const migrator = new Migrator({ db, provider: migrationProvider }) + const { error } = await migrator.migrateToLatest() + if (error) throw error +} + export type Database = Kysely - -export type PostTable = { - uri: string - cid: string - replyParent: string | null - replyRoot: string | null - indexedAt: string -} - -export type SubStateTable = { - service: string - cursor: number -} - -export type DatabaseSchema = { - posts: PostTable - sub_state: SubStateTable -} diff --git a/src/db/migrations.ts b/src/db/migrations.ts new file mode 100644 index 0000000..9660078 --- /dev/null +++ b/src/db/migrations.ts @@ -0,0 +1,31 @@ +import { Kysely, Migration, MigrationProvider } from 'kysely' + +const migrations: Record = {} + +export const migrationProvider: MigrationProvider = { + async getMigrations() { + return migrations + }, +} + +migrations['001'] = { + async up(db: Kysely) { + await db.schema + .createTable('post') + .addColumn('uri', 'varchar', (col) => col.primaryKey()) + .addColumn('cid', 'varchar', (col) => col.notNull()) + .addColumn('replyParent', 'varchar') + .addColumn('replyRoot', 'varchar') + .addColumn('indexedAt', 'varchar', (col) => col.notNull()) + .execute() + await db.schema + .createTable('sub_state') + .addColumn('service', 'varchar', (col) => col.primaryKey()) + .addColumn('cursor', 'integer', (col) => col.notNull()) + .execute() + }, + async down(db: Kysely) { + await db.schema.dropTable('post').execute() + await db.schema.dropTable('sub_state').execute() + }, +} diff --git a/src/db/schema.ts b/src/db/schema.ts new file mode 100644 index 0000000..8b2455d --- /dev/null +++ b/src/db/schema.ts @@ -0,0 +1,17 @@ +export type DatabaseSchema = { + post: Post + sub_state: SubState +} + +export type Post = { + uri: string + cid: string + replyParent: string | null + replyRoot: string | null + indexedAt: string +} + +export type SubState = { + service: string + cursor: number +} diff --git a/src/feed-generation.ts b/src/feed-generation.ts index 30b9fd8..4a7aa64 100644 --- a/src/feed-generation.ts +++ b/src/feed-generation.ts @@ -8,7 +8,7 @@ export default function (server: Server, db: Database) { throw new InvalidRequestError('algorithm unsupported') } let builder = db - .selectFrom('posts') + .selectFrom('post') .selectAll() .orderBy('indexedAt', 'desc') .orderBy('cid', 'desc') @@ -20,9 +20,9 @@ export default function (server: Server, db: Database) { } const timeStr = new Date(parseInt(indexedAt, 10)).toISOString() builder = builder - .where('posts.indexedAt', '<', timeStr) - .orWhere((qb) => qb.where('posts.indexedAt', '=', timeStr)) - .where('posts.cid', '<', cid) + .where('post.indexedAt', '<', timeStr) + .orWhere((qb) => qb.where('post.indexedAt', '=', timeStr)) + .where('post.cid', '<', cid) } const res = await builder.execute() diff --git a/src/index.ts b/src/index.ts index 8770d69..36a578b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,7 +9,7 @@ const run = async () => { subscriptionEndpoint: maybeStr(process.env.FEEDGEN_SUBSCRIPTION_ENDPOINT), }) await server.start() - console.log(`🤖 running feed generator at localhost${server.cfg.port}`) + console.log(`🤖 running feed generator at localhost:${server.cfg.port}`) } const maybeStr = (val?: string) => { diff --git a/src/server.ts b/src/server.ts index 9b59d75..1a5c36b 100644 --- a/src/server.ts +++ b/src/server.ts @@ -3,7 +3,7 @@ import events from 'events' import express from 'express' import { createServer } from './lexicon' import feedGeneration from './feed-generation' -import { createDb, Database } from './db' +import { createDb, Database, migrateToLatest } from './db' import { FirehoseSubscription } from './subscription' export type Config = { @@ -32,7 +32,7 @@ export class FeedGenerator { } static create(config?: Partial) { - const cfg = { + const cfg: Config = { port: config?.port ?? 3000, sqliteLocation: config?.sqliteLocation ?? 'test.sqlite', subscriptionEndpoint: config?.subscriptionEndpoint ?? 'wss://bsky.social', @@ -56,12 +56,11 @@ export class FeedGenerator { } async start(): Promise { - await this.firehose.run() - const server = this.app.listen(this.cfg.port) - server.keepAliveTimeout = 90000 - this.server = server - await events.once(server, 'listening') - return server + await migrateToLatest(this.db) + this.firehose.run() + this.server = this.app.listen(this.cfg.port) + await events.once(this.server, 'listening') + return this.server } } diff --git a/src/subscription.ts b/src/subscription.ts index 943b8d1..02b03bf 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -36,13 +36,13 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { if (postsToDelete.length > 0) { await this.db - .deleteFrom('posts') + .deleteFrom('post') .where('uri', 'in', postsToDelete) .execute() } if (postsToCreate.length > 0) { await this.db - .insertInto('posts') + .insertInto('post') .values(postsToCreate) .onConflict((oc) => oc.doNothing()) .execute() diff --git a/src/util/subscription.ts b/src/util/subscription.ts index f3a892c..711b642 100644 --- a/src/util/subscription.ts +++ b/src/util/subscription.ts @@ -78,7 +78,7 @@ export abstract class FirehoseSubscriptionBase { export const getPostOperations = async (evt: Commit): Promise => { const ops: Operations = { creates: [], deletes: [] } const postOps = evt.ops.filter( - (op) => op.path.split('/')[1] === ids.AppBskyFeedPost, + (op) => op.path.split('/')[0] === ids.AppBskyFeedPost, ) if (postOps.length < 1) return ops