diff --git a/src/muxer-queue.ts b/src/muxer-queue.ts index 5c61f32..320ef8b 100644 --- a/src/muxer-queue.ts +++ b/src/muxer-queue.ts @@ -1,13 +1,12 @@ -import { - DatabaseAdapter, - getDatabase as getDatabaseAdapter, -} from "./database/adapter"; +import { getDatabase as getDrizzleDatabase, initializeDatabase } from "./database/drizzle"; +import { muxerJobsTable, uiStateTable } from "./database/schema"; +import { eq, asc, lt, and, sql } from "drizzle-orm"; import { createChildLogger } from "./logger"; const logger = createChildLogger("muxer-queue"); -// Export DatabaseAdapter as SqliteDatabase for backward compatibility -export type SqliteDatabase = DatabaseAdapter; +// Type alias for backward compatibility +export type SqliteDatabase = any; export interface MuxerJobData { userId: string; @@ -27,130 +26,28 @@ interface StoredJob { error?: string; } -let dbAdapter: DatabaseAdapter | null = null; - -async function initializeDatabase(): Promise { - const adapter = await getDatabaseAdapter(); - - adapter.exec(` - PRAGMA journal_mode = WAL; - - CREATE TABLE IF NOT EXISTS muxer_jobs ( - id TEXT PRIMARY KEY, - data TEXT NOT NULL, - status TEXT NOT NULL DEFAULT 'pending', - attempts INTEGER NOT NULL DEFAULT 0, - maxAttempts INTEGER NOT NULL DEFAULT 3, - createdAt INTEGER NOT NULL, - updatedAt INTEGER NOT NULL, - error TEXT - ); - - CREATE INDEX IF NOT EXISTS idx_status ON muxer_jobs(status); - CREATE INDEX IF NOT EXISTS idx_createdAt ON muxer_jobs(createdAt); - - CREATE TABLE IF NOT EXISTS messages ( - id TEXT PRIMARY KEY, - guild_id TEXT NOT NULL, - channel_id TEXT NOT NULL, - thread_id TEXT, - user_id TEXT NOT NULL, - username TEXT NOT NULL, - avatar_url TEXT, - content TEXT NOT NULL, - edited_content TEXT, - created_at INTEGER NOT NULL, - edited_at INTEGER, - deleted_at INTEGER, - type TEXT NOT NULL DEFAULT 'text', - metadata TEXT, - ai_status TEXT NOT NULL DEFAULT 'pending', - ai_moderation_flags TEXT, - ai_moderation_score REAL, - ai_moderation_raw TEXT, - ai_analysis TEXT, - ai_analyzed_at INTEGER, - ai_error TEXT - ); - - CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages(channel_id); - CREATE INDEX IF NOT EXISTS idx_messages_user ON messages(user_id); - CREATE INDEX IF NOT EXISTS idx_messages_created ON messages(created_at DESC); - CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id); - - CREATE TABLE IF NOT EXISTS attachments ( - id TEXT PRIMARY KEY, - message_id TEXT NOT NULL, - guild_id TEXT NOT NULL, - channel_id TEXT NOT NULL, - thread_id TEXT, - user_id TEXT NOT NULL, - filename TEXT NOT NULL, - size INTEGER NOT NULL, - type TEXT NOT NULL, - discord_url TEXT NOT NULL, - uploaded_url TEXT, - upload_status TEXT NOT NULL DEFAULT 'pending', - upload_error TEXT, - created_at INTEGER NOT NULL, - uploaded_at INTEGER, - FOREIGN KEY (message_id) REFERENCES messages(id) - ); - - CREATE INDEX IF NOT EXISTS idx_attachments_channel ON attachments(channel_id); - CREATE INDEX IF NOT EXISTS idx_attachments_message ON attachments(message_id); - CREATE INDEX IF NOT EXISTS idx_attachments_status ON attachments(upload_status); - - CREATE TABLE IF NOT EXISTS ui_state ( - key TEXT PRIMARY KEY, - value TEXT NOT NULL, - updated_at INTEGER NOT NULL - ); - `); - - const migrations = [ - "ALTER TABLE attachments ADD COLUMN thread_id TEXT", - "ALTER TABLE messages ADD COLUMN ai_status TEXT NOT NULL DEFAULT 'pending'", - "ALTER TABLE messages ADD COLUMN ai_moderation_flags TEXT", - "ALTER TABLE messages ADD COLUMN ai_moderation_score REAL", - "ALTER TABLE messages ADD COLUMN ai_moderation_raw TEXT", - "ALTER TABLE messages ADD COLUMN ai_analysis TEXT", - "ALTER TABLE messages ADD COLUMN ai_analyzed_at INTEGER", - "ALTER TABLE messages ADD COLUMN ai_error TEXT", - ]; - - for (const migration of migrations) { - try { - adapter.exec(migration); - } catch { - // Column already exists on databases initialized after schema updates. - } - } - - return adapter; +// Export getDatabase for backward compatibility with webserver.ts +export function getDatabase(): SqliteDatabase { + return getDrizzleDatabase() as any; } -async function getDatabaseAdapterInternal(): Promise { - if (!dbAdapter) { - dbAdapter = await initializeDatabase(); - } - return dbAdapter; -} - -// Export as getDatabase for backward compatibility -export const getDatabase = getDatabaseAdapterInternal; - export async function getPersistedValue( key: string, fallback: T, ): Promise { - const adapter = await getDatabaseAdapterInternal(); - const row = adapter - .prepare("SELECT value FROM ui_state WHERE key = ?") - .get(key) as { value: string } | undefined; - if (!row) return fallback; + await initializeDatabase(); + const db = getDrizzleDatabase() as any; + + const row = await db + .select() + .from(uiStateTable) + .where(eq(uiStateTable.key, key)) + .limit(1); + + if (!row || row.length === 0) return fallback; + try { - return JSON.parse(row.value) as T; + return JSON.parse(row[0].value) as T; } catch { return fallback; } @@ -160,28 +57,45 @@ export async function setPersistedValue( key: string, value: unknown, ): Promise { - const adapter = await getDatabaseAdapterInternal(); - adapter - .prepare(` - INSERT INTO ui_state (key, value, updated_at) - VALUES (?, ?, ?) - ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at - `) - .run(key, JSON.stringify(value), Date.now()); + await initializeDatabase(); + const db = getDrizzleDatabase() as any; + + await db + .insert(uiStateTable) + .values({ + key, + value: JSON.stringify(value), + updated_at: Date.now(), + }) + .onConflictDoUpdate({ + target: uiStateTable.key, + set: { + value: JSON.stringify(value), + updated_at: Date.now(), + }, + }); } export async function enqueueMuxerJob(data: MuxerJobData): Promise { try { - const adapter = await getDatabaseAdapterInternal(); + await initializeDatabase(); + const db = getDrizzleDatabase() as any; + const jobId = `${data.userId}-${data.sessionId}`; const now = Date.now(); - const stmt = adapter.prepare(` - INSERT INTO muxer_jobs (id, data, status, attempts, maxAttempts, createdAt, updatedAt) - VALUES (?, ?, ?, ?, ?, ?, ?) - `); - - stmt.run(jobId, JSON.stringify(data), "pending", 0, 3, now, now); + await db + .insert(muxerJobsTable) + .values({ + id: jobId, + data: JSON.stringify(data), + status: "pending", + attempts: 0, + maxAttempts: 3, + createdAt: now, + updatedAt: now, + }) + .onConflictDoNothing(); logger.info( { jobId, userId: data.userId, sessionId: data.sessionId }, @@ -202,29 +116,25 @@ export async function enqueueMuxerJob(data: MuxerJobData): Promise { } export async function getPendingJobs(): Promise { - const adapter = await getDatabaseAdapterInternal(); - const stmt = adapter.prepare(` - SELECT id, data, status, attempts, maxAttempts, createdAt, updatedAt, error - FROM muxer_jobs - WHERE status = 'pending' - ORDER BY createdAt ASC - LIMIT 10 - `); + await initializeDatabase(); + const db = getDrizzleDatabase() as any; - const rows = stmt.all() as Array<{ - id: string; - data: string; - status: string; - attempts: number; - maxAttempts: number; - createdAt: number; - updatedAt: number; - error?: string; - }>; + const rows = await db + .select() + .from(muxerJobsTable) + .where(eq(muxerJobsTable.status, "pending")) + .orderBy(asc(muxerJobsTable.createdAt)) + .limit(10); - return rows.map((row) => ({ - ...row, + return rows.map((row: any) => ({ + id: row.id, + data: row.data, status: row.status as "pending" | "processing" | "completed" | "failed", + attempts: row.attempts, + maxAttempts: row.maxAttempts, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + error: row.error || undefined, })); } @@ -233,34 +143,44 @@ export async function updateJobStatus( status: "processing" | "completed" | "failed", error?: string, ): Promise { - const adapter = await getDatabaseAdapterInternal(); + await initializeDatabase(); + const db = getDrizzleDatabase() as any; const now = Date.now(); if (status === "failed") { - const stmt = adapter.prepare(` - UPDATE muxer_jobs - SET status = ?, attempts = attempts + 1, updatedAt = ?, error = ? - WHERE id = ? - `); - stmt.run(status, now, error || null, jobId); + await db + .update(muxerJobsTable) + .set({ + status, + attempts: sql`${muxerJobsTable.attempts} + 1`, + updatedAt: now, + error: error || null, + }) + .where(eq(muxerJobsTable.id, jobId)); } else { - const stmt = adapter.prepare(` - UPDATE muxer_jobs - SET status = ?, updatedAt = ? - WHERE id = ? - `); - stmt.run(status, now, jobId); + await db + .update(muxerJobsTable) + .set({ + status, + updatedAt: now, + }) + .where(eq(muxerJobsTable.id, jobId)); } logger.info({ jobId, status, error }, "Job status updated"); } export async function retryFailedJob(jobId: string): Promise { - const adapter = await getDatabaseAdapterInternal(); + await initializeDatabase(); + const db = getDrizzleDatabase() as any; - const job = adapter - .prepare("SELECT * FROM muxer_jobs WHERE id = ?") - .get(jobId) as StoredJob | undefined; + const jobs = await db + .select() + .from(muxerJobsTable) + .where(eq(muxerJobsTable.id, jobId)) + .limit(1); + + const job = jobs[0]; if (!job) { logger.warn({ jobId }, "Job not found"); @@ -275,13 +195,14 @@ export async function retryFailedJob(jobId: string): Promise { return false; } - const stmt = adapter.prepare(` - UPDATE muxer_jobs - SET status = 'pending', updatedAt = ? - WHERE id = ? - `); + await db + .update(muxerJobsTable) + .set({ + status: "pending", + updatedAt: Date.now(), + }) + .where(eq(muxerJobsTable.id, jobId)); - stmt.run(Date.now(), jobId); logger.info({ jobId, attempt: job.attempts + 1 }, "Job retried"); return true; @@ -290,18 +211,26 @@ export async function retryFailedJob(jobId: string): Promise { export async function cleanupCompletedJobs( olderThanMs: number = 24 * 60 * 60 * 1000, ): Promise { - const adapter = await getDatabaseAdapterInternal(); + await initializeDatabase(); + const db = getDrizzleDatabase() as any; const cutoffTime = Date.now() - olderThanMs; - const stmt = adapter.prepare(` - DELETE FROM muxer_jobs - WHERE status = 'completed' AND updatedAt < ? - `); + const result = await db + .delete(muxerJobsTable) + .where( + and( + eq(muxerJobsTable.status, "completed"), + lt(muxerJobsTable.updatedAt, cutoffTime), + ), + ); - const result = stmt.run(cutoffTime); - logger.info({ deletedCount: result.changes }, "Cleaned up completed jobs"); + const deletedCount = typeof result === "object" && "rowsAffected" in result + ? result.rowsAffected + : 0; - return result.changes; + logger.info({ deletedCount }, "Cleaned up completed jobs"); + + return deletedCount; } export async function getJobStats(): Promise<{ @@ -310,36 +239,37 @@ export async function getJobStats(): Promise<{ completed: number; failed: number; }> { - const adapter = await getDatabaseAdapterInternal(); + await initializeDatabase(); + const db = getDrizzleDatabase() as any; - const stats = adapter - .prepare(` - SELECT - SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, - SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing, - SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, - SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed - FROM muxer_jobs - `) - .get() as { - pending: number | null; - processing: number | null; - completed: number | null; - failed: number | null; + const rows = await db + .select({ + status: muxerJobsTable.status, + count: sql`COUNT(*)`, + }) + .from(muxerJobsTable) + .groupBy(muxerJobsTable.status); + + const stats = { + pending: 0, + processing: 0, + completed: 0, + failed: 0, }; - return { - pending: stats.pending || 0, - processing: stats.processing || 0, - completed: stats.completed || 0, - failed: stats.failed || 0, - }; + for (const row of rows) { + const count = typeof row.count === "object" && "count" in row.count + ? (row.count as any).count + : Number(row.count); + if (row.status === "pending") stats.pending = count; + else if (row.status === "processing") stats.processing = count; + else if (row.status === "completed") stats.completed = count; + else if (row.status === "failed") stats.failed = count; + } + + return stats; } export async function closeQueue(): Promise { - if (dbAdapter) { - await dbAdapter.close(); - dbAdapter = null; - logger.info("Muxer queue closed"); - } + logger.info("Muxer queue closed"); }