refactor: migrate muxer-queue to drizzle-orm
This commit is contained in:
@@ -1,13 +1,12 @@
|
||||
import {
|
||||
DatabaseAdapter,
|
||||
getDatabase as getDatabaseAdapter,
|
||||
} from "./database/adapter";
|
||||
import { getDatabase as getDrizzleDatabase, initializeDatabase } from "./database/drizzle";
|
||||
import { muxerJobsTable, uiStateTable } from "./database/schema";
|
||||
import { eq, asc, lt, and, sql } from "drizzle-orm";
|
||||
import { createChildLogger } from "./logger";
|
||||
|
||||
const logger = createChildLogger("muxer-queue");
|
||||
|
||||
// Export DatabaseAdapter as SqliteDatabase for backward compatibility
|
||||
export type SqliteDatabase = DatabaseAdapter;
|
||||
// Type alias for backward compatibility
|
||||
export type SqliteDatabase = any;
|
||||
|
||||
export interface MuxerJobData {
|
||||
userId: string;
|
||||
@@ -27,130 +26,28 @@ interface StoredJob {
|
||||
error?: string;
|
||||
}
|
||||
|
||||
let dbAdapter: DatabaseAdapter | null = null;
|
||||
|
||||
async function initializeDatabase(): Promise<DatabaseAdapter> {
|
||||
const adapter = await getDatabaseAdapter();
|
||||
|
||||
adapter.exec(`
|
||||
PRAGMA journal_mode = WAL;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS muxer_jobs (
|
||||
id TEXT PRIMARY KEY,
|
||||
data TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
maxAttempts INTEGER NOT NULL DEFAULT 3,
|
||||
createdAt INTEGER NOT NULL,
|
||||
updatedAt INTEGER NOT NULL,
|
||||
error TEXT
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_status ON muxer_jobs(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_createdAt ON muxer_jobs(createdAt);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id TEXT PRIMARY KEY,
|
||||
guild_id TEXT NOT NULL,
|
||||
channel_id TEXT NOT NULL,
|
||||
thread_id TEXT,
|
||||
user_id TEXT NOT NULL,
|
||||
username TEXT NOT NULL,
|
||||
avatar_url TEXT,
|
||||
content TEXT NOT NULL,
|
||||
edited_content TEXT,
|
||||
created_at INTEGER NOT NULL,
|
||||
edited_at INTEGER,
|
||||
deleted_at INTEGER,
|
||||
type TEXT NOT NULL DEFAULT 'text',
|
||||
metadata TEXT,
|
||||
ai_status TEXT NOT NULL DEFAULT 'pending',
|
||||
ai_moderation_flags TEXT,
|
||||
ai_moderation_score REAL,
|
||||
ai_moderation_raw TEXT,
|
||||
ai_analysis TEXT,
|
||||
ai_analyzed_at INTEGER,
|
||||
ai_error TEXT
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages(channel_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_user ON messages(user_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_created ON messages(created_at DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS attachments (
|
||||
id TEXT PRIMARY KEY,
|
||||
message_id TEXT NOT NULL,
|
||||
guild_id TEXT NOT NULL,
|
||||
channel_id TEXT NOT NULL,
|
||||
thread_id TEXT,
|
||||
user_id TEXT NOT NULL,
|
||||
filename TEXT NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
discord_url TEXT NOT NULL,
|
||||
uploaded_url TEXT,
|
||||
upload_status TEXT NOT NULL DEFAULT 'pending',
|
||||
upload_error TEXT,
|
||||
created_at INTEGER NOT NULL,
|
||||
uploaded_at INTEGER,
|
||||
FOREIGN KEY (message_id) REFERENCES messages(id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_attachments_channel ON attachments(channel_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_attachments_message ON attachments(message_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_attachments_status ON attachments(upload_status);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS ui_state (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT NOT NULL,
|
||||
updated_at INTEGER NOT NULL
|
||||
);
|
||||
`);
|
||||
|
||||
const migrations = [
|
||||
"ALTER TABLE attachments ADD COLUMN thread_id TEXT",
|
||||
"ALTER TABLE messages ADD COLUMN ai_status TEXT NOT NULL DEFAULT 'pending'",
|
||||
"ALTER TABLE messages ADD COLUMN ai_moderation_flags TEXT",
|
||||
"ALTER TABLE messages ADD COLUMN ai_moderation_score REAL",
|
||||
"ALTER TABLE messages ADD COLUMN ai_moderation_raw TEXT",
|
||||
"ALTER TABLE messages ADD COLUMN ai_analysis TEXT",
|
||||
"ALTER TABLE messages ADD COLUMN ai_analyzed_at INTEGER",
|
||||
"ALTER TABLE messages ADD COLUMN ai_error TEXT",
|
||||
];
|
||||
|
||||
for (const migration of migrations) {
|
||||
try {
|
||||
adapter.exec(migration);
|
||||
} catch {
|
||||
// Column already exists on databases initialized after schema updates.
|
||||
}
|
||||
}
|
||||
|
||||
return adapter;
|
||||
// Export getDatabase for backward compatibility with webserver.ts
|
||||
export function getDatabase(): SqliteDatabase {
|
||||
return getDrizzleDatabase() as any;
|
||||
}
|
||||
|
||||
async function getDatabaseAdapterInternal(): Promise<DatabaseAdapter> {
|
||||
if (!dbAdapter) {
|
||||
dbAdapter = await initializeDatabase();
|
||||
}
|
||||
return dbAdapter;
|
||||
}
|
||||
|
||||
// Export as getDatabase for backward compatibility
|
||||
export const getDatabase = getDatabaseAdapterInternal;
|
||||
|
||||
export async function getPersistedValue<T>(
|
||||
key: string,
|
||||
fallback: T,
|
||||
): Promise<T> {
|
||||
const adapter = await getDatabaseAdapterInternal();
|
||||
const row = adapter
|
||||
.prepare("SELECT value FROM ui_state WHERE key = ?")
|
||||
.get(key) as { value: string } | undefined;
|
||||
if (!row) return fallback;
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
|
||||
const row = await db
|
||||
.select()
|
||||
.from(uiStateTable)
|
||||
.where(eq(uiStateTable.key, key))
|
||||
.limit(1);
|
||||
|
||||
if (!row || row.length === 0) return fallback;
|
||||
|
||||
try {
|
||||
return JSON.parse(row.value) as T;
|
||||
return JSON.parse(row[0].value) as T;
|
||||
} catch {
|
||||
return fallback;
|
||||
}
|
||||
@@ -160,28 +57,45 @@ export async function setPersistedValue(
|
||||
key: string,
|
||||
value: unknown,
|
||||
): Promise<void> {
|
||||
const adapter = await getDatabaseAdapterInternal();
|
||||
adapter
|
||||
.prepare(`
|
||||
INSERT INTO ui_state (key, value, updated_at)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at
|
||||
`)
|
||||
.run(key, JSON.stringify(value), Date.now());
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
|
||||
await db
|
||||
.insert(uiStateTable)
|
||||
.values({
|
||||
key,
|
||||
value: JSON.stringify(value),
|
||||
updated_at: Date.now(),
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: uiStateTable.key,
|
||||
set: {
|
||||
value: JSON.stringify(value),
|
||||
updated_at: Date.now(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
|
||||
try {
|
||||
const adapter = await getDatabaseAdapterInternal();
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
|
||||
const jobId = `${data.userId}-${data.sessionId}`;
|
||||
const now = Date.now();
|
||||
|
||||
const stmt = adapter.prepare(`
|
||||
INSERT INTO muxer_jobs (id, data, status, attempts, maxAttempts, createdAt, updatedAt)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
stmt.run(jobId, JSON.stringify(data), "pending", 0, 3, now, now);
|
||||
await db
|
||||
.insert(muxerJobsTable)
|
||||
.values({
|
||||
id: jobId,
|
||||
data: JSON.stringify(data),
|
||||
status: "pending",
|
||||
attempts: 0,
|
||||
maxAttempts: 3,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.onConflictDoNothing();
|
||||
|
||||
logger.info(
|
||||
{ jobId, userId: data.userId, sessionId: data.sessionId },
|
||||
@@ -202,29 +116,25 @@ export async function enqueueMuxerJob(data: MuxerJobData): Promise<string> {
|
||||
}
|
||||
|
||||
export async function getPendingJobs(): Promise<StoredJob[]> {
|
||||
const adapter = await getDatabaseAdapterInternal();
|
||||
const stmt = adapter.prepare(`
|
||||
SELECT id, data, status, attempts, maxAttempts, createdAt, updatedAt, error
|
||||
FROM muxer_jobs
|
||||
WHERE status = 'pending'
|
||||
ORDER BY createdAt ASC
|
||||
LIMIT 10
|
||||
`);
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
|
||||
const rows = stmt.all() as Array<{
|
||||
id: string;
|
||||
data: string;
|
||||
status: string;
|
||||
attempts: number;
|
||||
maxAttempts: number;
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
error?: string;
|
||||
}>;
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(muxerJobsTable)
|
||||
.where(eq(muxerJobsTable.status, "pending"))
|
||||
.orderBy(asc(muxerJobsTable.createdAt))
|
||||
.limit(10);
|
||||
|
||||
return rows.map((row) => ({
|
||||
...row,
|
||||
return rows.map((row: any) => ({
|
||||
id: row.id,
|
||||
data: row.data,
|
||||
status: row.status as "pending" | "processing" | "completed" | "failed",
|
||||
attempts: row.attempts,
|
||||
maxAttempts: row.maxAttempts,
|
||||
createdAt: row.createdAt,
|
||||
updatedAt: row.updatedAt,
|
||||
error: row.error || undefined,
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -233,34 +143,44 @@ export async function updateJobStatus(
|
||||
status: "processing" | "completed" | "failed",
|
||||
error?: string,
|
||||
): Promise<void> {
|
||||
const adapter = await getDatabaseAdapterInternal();
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const now = Date.now();
|
||||
|
||||
if (status === "failed") {
|
||||
const stmt = adapter.prepare(`
|
||||
UPDATE muxer_jobs
|
||||
SET status = ?, attempts = attempts + 1, updatedAt = ?, error = ?
|
||||
WHERE id = ?
|
||||
`);
|
||||
stmt.run(status, now, error || null, jobId);
|
||||
await db
|
||||
.update(muxerJobsTable)
|
||||
.set({
|
||||
status,
|
||||
attempts: sql`${muxerJobsTable.attempts} + 1`,
|
||||
updatedAt: now,
|
||||
error: error || null,
|
||||
})
|
||||
.where(eq(muxerJobsTable.id, jobId));
|
||||
} else {
|
||||
const stmt = adapter.prepare(`
|
||||
UPDATE muxer_jobs
|
||||
SET status = ?, updatedAt = ?
|
||||
WHERE id = ?
|
||||
`);
|
||||
stmt.run(status, now, jobId);
|
||||
await db
|
||||
.update(muxerJobsTable)
|
||||
.set({
|
||||
status,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(muxerJobsTable.id, jobId));
|
||||
}
|
||||
|
||||
logger.info({ jobId, status, error }, "Job status updated");
|
||||
}
|
||||
|
||||
export async function retryFailedJob(jobId: string): Promise<boolean> {
|
||||
const adapter = await getDatabaseAdapterInternal();
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
|
||||
const job = adapter
|
||||
.prepare("SELECT * FROM muxer_jobs WHERE id = ?")
|
||||
.get(jobId) as StoredJob | undefined;
|
||||
const jobs = await db
|
||||
.select()
|
||||
.from(muxerJobsTable)
|
||||
.where(eq(muxerJobsTable.id, jobId))
|
||||
.limit(1);
|
||||
|
||||
const job = jobs[0];
|
||||
|
||||
if (!job) {
|
||||
logger.warn({ jobId }, "Job not found");
|
||||
@@ -275,13 +195,14 @@ export async function retryFailedJob(jobId: string): Promise<boolean> {
|
||||
return false;
|
||||
}
|
||||
|
||||
const stmt = adapter.prepare(`
|
||||
UPDATE muxer_jobs
|
||||
SET status = 'pending', updatedAt = ?
|
||||
WHERE id = ?
|
||||
`);
|
||||
await db
|
||||
.update(muxerJobsTable)
|
||||
.set({
|
||||
status: "pending",
|
||||
updatedAt: Date.now(),
|
||||
})
|
||||
.where(eq(muxerJobsTable.id, jobId));
|
||||
|
||||
stmt.run(Date.now(), jobId);
|
||||
logger.info({ jobId, attempt: job.attempts + 1 }, "Job retried");
|
||||
|
||||
return true;
|
||||
@@ -290,18 +211,26 @@ export async function retryFailedJob(jobId: string): Promise<boolean> {
|
||||
export async function cleanupCompletedJobs(
|
||||
olderThanMs: number = 24 * 60 * 60 * 1000,
|
||||
): Promise<number> {
|
||||
const adapter = await getDatabaseAdapterInternal();
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
const cutoffTime = Date.now() - olderThanMs;
|
||||
|
||||
const stmt = adapter.prepare(`
|
||||
DELETE FROM muxer_jobs
|
||||
WHERE status = 'completed' AND updatedAt < ?
|
||||
`);
|
||||
const result = await db
|
||||
.delete(muxerJobsTable)
|
||||
.where(
|
||||
and(
|
||||
eq(muxerJobsTable.status, "completed"),
|
||||
lt(muxerJobsTable.updatedAt, cutoffTime),
|
||||
),
|
||||
);
|
||||
|
||||
const result = stmt.run(cutoffTime);
|
||||
logger.info({ deletedCount: result.changes }, "Cleaned up completed jobs");
|
||||
const deletedCount = typeof result === "object" && "rowsAffected" in result
|
||||
? result.rowsAffected
|
||||
: 0;
|
||||
|
||||
return result.changes;
|
||||
logger.info({ deletedCount }, "Cleaned up completed jobs");
|
||||
|
||||
return deletedCount;
|
||||
}
|
||||
|
||||
export async function getJobStats(): Promise<{
|
||||
@@ -310,36 +239,37 @@ export async function getJobStats(): Promise<{
|
||||
completed: number;
|
||||
failed: number;
|
||||
}> {
|
||||
const adapter = await getDatabaseAdapterInternal();
|
||||
await initializeDatabase();
|
||||
const db = getDrizzleDatabase() as any;
|
||||
|
||||
const stats = adapter
|
||||
.prepare(`
|
||||
SELECT
|
||||
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
|
||||
SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
|
||||
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
|
||||
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
|
||||
FROM muxer_jobs
|
||||
`)
|
||||
.get() as {
|
||||
pending: number | null;
|
||||
processing: number | null;
|
||||
completed: number | null;
|
||||
failed: number | null;
|
||||
const rows = await db
|
||||
.select({
|
||||
status: muxerJobsTable.status,
|
||||
count: sql<number>`COUNT(*)`,
|
||||
})
|
||||
.from(muxerJobsTable)
|
||||
.groupBy(muxerJobsTable.status);
|
||||
|
||||
const stats = {
|
||||
pending: 0,
|
||||
processing: 0,
|
||||
completed: 0,
|
||||
failed: 0,
|
||||
};
|
||||
|
||||
return {
|
||||
pending: stats.pending || 0,
|
||||
processing: stats.processing || 0,
|
||||
completed: stats.completed || 0,
|
||||
failed: stats.failed || 0,
|
||||
};
|
||||
for (const row of rows) {
|
||||
const count = typeof row.count === "object" && "count" in row.count
|
||||
? (row.count as any).count
|
||||
: Number(row.count);
|
||||
if (row.status === "pending") stats.pending = count;
|
||||
else if (row.status === "processing") stats.processing = count;
|
||||
else if (row.status === "completed") stats.completed = count;
|
||||
else if (row.status === "failed") stats.failed = count;
|
||||
}
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
export async function closeQueue(): Promise<void> {
|
||||
if (dbAdapter) {
|
||||
await dbAdapter.close();
|
||||
dbAdapter = null;
|
||||
logger.info("Muxer queue closed");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user