import { Pool, type QueryResult, type QueryResultRow } from 'pg';

import type { AppConfig } from './config.js';
import { databaseMigrations } from './db/migrations.js';

/** DDL for the bookkeeping table that records which migrations have been applied. */
const migrationTableSql = `
  CREATE TABLE IF NOT EXISTS schema_migrations (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    applied_at TEXT NOT NULL
  )
`;

/** URL prefix that selects the in-memory pg-mem backend instead of a real server. */
const inMemoryScheme = 'pg-mem://';

/** Message thrown for any URL that is neither a PostgreSQL nor a pg-mem connection string. */
const unsupportedDatabaseUrlMessage =
  'DATABASE_URL 只支持 PostgreSQL 连接串或 pg-mem 测试连接';

/** Row shape produced by the `schema_migrations` listing query. */
type MigrationRow = QueryResultRow & {
  id: string;
  name: string;
  appliedAt: string;
};

/** One entry of the applied-migration history, as returned by {@link listAppliedMigrations}. */
export type AppliedMigration = {
  id: string;
  name: string;
  appliedAt: string;
};

/** Minimal database handle exposed to the rest of the application. */
export type AppDatabase = {
  /**
   * Runs a single SQL statement.
   *
   * @param text - SQL text, using `$1`-style placeholders for parameters.
   * @param params - Positional parameter values; defaults to none.
   */
  query<T extends QueryResultRow = QueryResultRow>(
    text: string,
    params?: readonly unknown[],
  ): Promise<QueryResult<T>>;
  /** Shuts down the underlying connection pool. */
  close(): Promise<void>;
};

/**
 * The subset of `pg.Pool` this module relies on. `connect` is required so that
 * migrations can pin a transaction to a single connection.
 */
type QueryablePool = Pick<Pool, 'query' | 'end' | 'connect'>;

/**
 * Adapts a pool to the {@link AppDatabase} interface.
 *
 * The parameter array is copied before being handed to the driver because the
 * public interface accepts a readonly array while the driver expects a mutable one.
 */
function wrapPool(pool: QueryablePool): AppDatabase {
  return {
    query<T extends QueryResultRow = QueryResultRow>(
      text: string,
      params: readonly unknown[] = [],
    ) {
      return pool.query<T>(text, [...params]);
    },
    async close() {
      await pool.end();
    },
  };
}

/** Returns the protocol of `value` (e.g. `postgres:`), or `undefined` when it is not a parseable URL. */
function parseProtocol(value: string): string | undefined {
  try {
    return new URL(value).protocol;
  } catch {
    return undefined;
  }
}

/**
 * Validates the configured database URL and returns it with surrounding
 * whitespace removed, so that validation and connection use the same string.
 *
 * @throws Error when the URL is empty, unparseable, or uses a protocol other
 *   than `postgres:` / `postgresql:` (pg-mem URLs are accepted as-is).
 */
function validateDatabaseUrl(databaseUrl: string): string {
  const trimmed = databaseUrl.trim();
  if (!trimmed) {
    throw new Error('DATABASE_URL 不能为空');
  }

  if (trimmed.startsWith(inMemoryScheme)) {
    return trimmed;
  }

  const protocol = parseProtocol(trimmed);
  if (protocol !== 'postgresql:' && protocol !== 'postgres:') {
    throw new Error(unsupportedDatabaseUrlMessage);
  }

  return trimmed;
}

/**
 * Builds a loggable description of the database target that omits the
 * username, password and query string of the connection URL.
 *
 * @returns `[missing]` for an empty value, the URL itself for pg-mem targets,
 *   `protocol//host[:port]/database` for parseable URLs (database defaults to
 *   `postgres` when the path is empty), and `[configured]` for anything that
 *   cannot be parsed.
 */
export function summarizeDatabaseTarget(databaseUrl: string): string {
  const trimmed = databaseUrl.trim();
  if (!trimmed) {
    return '[missing]';
  }

  if (trimmed.startsWith(inMemoryScheme)) {
    return trimmed;
  }

  try {
    const url = new URL(trimmed);
    const databaseName = url.pathname.replace(/^\/+/u, '') || 'postgres';
    const portSuffix = url.port ? `:${url.port}` : '';
    return `${url.protocol}//${url.hostname}${portSuffix}/${databaseName}`;
  } catch {
    return '[configured]';
  }
}

/** Creates the `schema_migrations` table when it does not exist yet. */
async function ensureMigrationTable(db: AppDatabase): Promise<void> {
  await db.query(migrationTableSql);
}

/**
 * Lists the migrations recorded in `schema_migrations`, ordered by id.
 * The bookkeeping table is created first if it is missing.
 */
export async function listAppliedMigrations(
  db: AppDatabase,
): Promise<AppliedMigration[]> {
  await ensureMigrationTable(db);

  const result = await db.query<MigrationRow>(
    `SELECT id, name, applied_at AS "appliedAt"
     FROM schema_migrations
     ORDER BY id`,
  );

  return result.rows.map((row) => ({
    id: row.id,
    name: row.name,
    appliedAt: row.appliedAt,
  }));
}

/**
 * Applies every migration from `databaseMigrations` that is not yet recorded,
 * each inside its own transaction.
 *
 * A dedicated client is checked out for the whole run: issuing
 * BEGIN/COMMIT/ROLLBACK through `pool.query` does not guarantee that the
 * statements of one transaction execute on the same connection.
 *
 * @throws Error naming the failing migration when any of its statements fails.
 */
async function runMigrations(pool: QueryablePool): Promise<void> {
  const client = await pool.connect();

  // Single-connection view used for every statement of the migration run.
  const session: AppDatabase = {
    query<T extends QueryResultRow = QueryResultRow>(
      text: string,
      params: readonly unknown[] = [],
    ) {
      return client.query<T>(text, [...params]);
    },
    async close() {
      client.release();
    },
  };

  try {
    // listAppliedMigrations creates the bookkeeping table if needed.
    const history = await listAppliedMigrations(session);
    const appliedIds = new Set(history.map((migration) => migration.id));

    for (const migration of databaseMigrations) {
      if (appliedIds.has(migration.id)) {
        continue;
      }

      await session.query('BEGIN');
      try {
        for (const statement of migration.statements) {
          await session.query(statement);
        }
        await session.query(
          `INSERT INTO schema_migrations (id, name, applied_at)
           VALUES ($1, $2, $3)`,
          [migration.id, migration.name, new Date().toISOString()],
        );
        await session.query('COMMIT');
      } catch (error) {
        try {
          await session.query('ROLLBACK');
        } catch {
          // A failing ROLLBACK must not hide the error that triggered it.
        }
        const reason = error instanceof Error ? error.message : 'unknown error';
        throw new Error(
          `failed to apply database migration ${migration.id}: ${reason}`,
        );
      }
    }
  } finally {
    await session.close();
  }
}

/** Closes `db`, ignoring any failure so that the caller's original error is preserved. */
async function closeQuietly(db: AppDatabase): Promise<void> {
  try {
    await db.close();
  } catch {
    // Best effort: the caller is already propagating a more relevant error.
  }
}

/**
 * Wraps `pool`, optionally probes connectivity, and brings the schema up to date.
 * The pool is shut down again if any of these steps fails, so a failed startup
 * does not leak open connections.
 */
async function openDatabase(
  pool: QueryablePool,
  options: { probeConnection: boolean },
): Promise<AppDatabase> {
  const db = wrapPool(pool);
  try {
    if (options.probeConnection) {
      // Fail fast with the driver's connection error before touching the schema.
      await db.query('SELECT 1');
    }
    await runMigrations(pool);
    return db;
  } catch (error) {
    await closeQuietly(db);
    throw error;
  }
}

/** Creates a migrated pg-mem database; pg-mem is imported lazily so it is only loaded when used. */
async function createInMemoryDatabase(): Promise<AppDatabase> {
  const { newDb } = await import('pg-mem');
  const memoryDb = newDb({
    autoCreateForeignKeyIndices: true,
    noAstCoverageCheck: true,
  });
  const adapter = memoryDb.adapters.createPg();
  const pool = new adapter.Pool() as unknown as QueryablePool;

  return openDatabase(pool, { probeConnection: false });
}

/**
 * Opens the application database described by `config.databaseUrl` and applies
 * pending migrations.
 *
 * `pg-mem://` URLs select an in-memory database; any other accepted URL is
 * used as a PostgreSQL connection string and probed with `SELECT 1` first.
 *
 * @throws Error when the URL is invalid, the server is unreachable, or a
 *   migration fails. In the latter two cases the pool is closed before rethrowing.
 */
export async function createDatabase(config: AppConfig): Promise<AppDatabase> {
  const databaseUrl = validateDatabaseUrl(config.databaseUrl);

  if (databaseUrl.startsWith(inMemoryScheme)) {
    return createInMemoryDatabase();
  }

  const pool = new Pool({
    connectionString: databaseUrl,
  });

  return openDatabase(pool, { probeConnection: true });
}