refactor: remove old database adapter files
This commit is contained in:
@@ -1,189 +0,0 @@
|
|||||||
import path from "node:path";
|
|
||||||
import Database from "better-sqlite3";
|
|
||||||
import { config } from "../config";
|
|
||||||
import { createChildLogger } from "../logger";
|
|
||||||
import * as postgres from "./postgres";
|
|
||||||
|
|
||||||
const logger = createChildLogger("db-adapter");
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unified database adapter interface matching SQLite API
|
|
||||||
*/
|
|
||||||
export interface DatabaseStatement {
|
|
||||||
run: (...params: unknown[]) => { changes: number };
|
|
||||||
all: (...params: unknown[]) => unknown[];
|
|
||||||
get: (...params: unknown[]) => unknown;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface DatabaseAdapter {
|
|
||||||
prepare: (sql: string) => DatabaseStatement;
|
|
||||||
exec: (sql: string) => void;
|
|
||||||
close: () => Promise<void>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* PostgreSQL adapter implementing DatabaseAdapter interface
|
|
||||||
*/
|
|
||||||
class PostgresAdapter implements DatabaseAdapter {
|
|
||||||
prepare(sql: string): DatabaseStatement {
|
|
||||||
// Convert SQLite placeholders (?) to PostgreSQL placeholders ($1, $2, etc.)
|
|
||||||
const pgSql = this.convertPlaceholders(sql);
|
|
||||||
|
|
||||||
return {
|
|
||||||
run: (...params: unknown[]) => {
|
|
||||||
return this.runSync(pgSql, params);
|
|
||||||
},
|
|
||||||
all: (...params: unknown[]) => {
|
|
||||||
return this.allSync(pgSql, params);
|
|
||||||
},
|
|
||||||
get: (...params: unknown[]) => {
|
|
||||||
return this.getSync(pgSql, params);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
exec(sql: string): void {
|
|
||||||
// For PostgreSQL, exec is typically used for schema creation
|
|
||||||
// We'll queue this for execution but note that exec() is synchronous in SQLite
|
|
||||||
// and async in PostgreSQL, so this is a limitation of the adapter
|
|
||||||
logger.warn(
|
|
||||||
"exec() called on PostgreSQL adapter - this is not truly synchronous. Use query() for schema operations.",
|
|
||||||
);
|
|
||||||
// In practice, schema operations should be handled separately via migrations
|
|
||||||
}
|
|
||||||
|
|
||||||
async close(): Promise<void> {
|
|
||||||
await postgres.closePool();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert SQLite ? placeholders to PostgreSQL $1, $2, etc.
|
|
||||||
*/
|
|
||||||
private convertPlaceholders(sql: string): string {
|
|
||||||
let paramIndex = 1;
|
|
||||||
return sql.replace(/\?/g, () => `$${paramIndex++}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Synchronous wrapper for run() - note: this is a limitation
|
|
||||||
* In production, async operations should be handled properly
|
|
||||||
*/
|
|
||||||
private runSync(sql: string, params: unknown[]): { changes: number } {
|
|
||||||
// This is a placeholder - actual implementation would need async handling
|
|
||||||
// For now, we'll throw an error to indicate this needs proper async handling
|
|
||||||
logger.error(
|
|
||||||
"runSync called on PostgreSQL adapter - this operation must be async",
|
|
||||||
);
|
|
||||||
throw new Error(
|
|
||||||
"PostgreSQL adapter requires async operations. Use query() directly instead of prepare().run()",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Synchronous wrapper for all() - note: this is a limitation
|
|
||||||
*/
|
|
||||||
private allSync(sql: string, params: unknown[]): unknown[] {
|
|
||||||
logger.error(
|
|
||||||
"allSync called on PostgreSQL adapter - this operation must be async",
|
|
||||||
);
|
|
||||||
throw new Error(
|
|
||||||
"PostgreSQL adapter requires async operations. Use query() directly instead of prepare().all()",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Synchronous wrapper for get() - note: this is a limitation
|
|
||||||
*/
|
|
||||||
private getSync(sql: string, params: unknown[]): unknown {
|
|
||||||
logger.error(
|
|
||||||
"getSync called on PostgreSQL adapter - this operation must be async",
|
|
||||||
);
|
|
||||||
throw new Error(
|
|
||||||
"PostgreSQL adapter requires async operations. Use query() directly instead of prepare().get()",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* SQLite adapter wrapping better-sqlite3
|
|
||||||
*/
|
|
||||||
class SqliteAdapter implements DatabaseAdapter {
|
|
||||||
private db: Database.Database;
|
|
||||||
|
|
||||||
constructor(db: Database.Database) {
|
|
||||||
this.db = db;
|
|
||||||
}
|
|
||||||
|
|
||||||
prepare(sql: string): DatabaseStatement {
|
|
||||||
const stmt = this.db.prepare(sql);
|
|
||||||
return {
|
|
||||||
run: (...params: unknown[]) => stmt.run(...params),
|
|
||||||
all: (...params: unknown[]) => stmt.all(...params),
|
|
||||||
get: (...params: unknown[]) => stmt.get(...params),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
exec(sql: string): void {
|
|
||||||
this.db.exec(sql);
|
|
||||||
}
|
|
||||||
|
|
||||||
async close(): Promise<void> {
|
|
||||||
this.db.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SQLite database instance (lazy initialized)
|
|
||||||
let sqliteDb: Database.Database | null = null;
|
|
||||||
|
|
||||||
function initializeSqliteDatabase(): Database.Database {
|
|
||||||
const dbPath = path.join(process.cwd(), ".muxer-queue.db");
|
|
||||||
return new Database(dbPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
function getSqliteDatabase(): Database.Database {
|
|
||||||
if (!sqliteDb) {
|
|
||||||
sqliteDb = initializeSqliteDatabase();
|
|
||||||
}
|
|
||||||
return sqliteDb;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get database adapter based on configuration
|
|
||||||
* Returns appropriate adapter (PostgreSQL or SQLite)
|
|
||||||
*/
|
|
||||||
export async function getDatabase(): Promise<DatabaseAdapter> {
|
|
||||||
if (config.DATABASE_TYPE === "postgres") {
|
|
||||||
logger.info("Initializing PostgreSQL adapter");
|
|
||||||
const pool = postgres.getPool();
|
|
||||||
logger.info(
|
|
||||||
{
|
|
||||||
host: postgres.buildConfig().host,
|
|
||||||
port: postgres.buildConfig().port,
|
|
||||||
database: postgres.buildConfig().database,
|
|
||||||
},
|
|
||||||
"PostgreSQL connection pool initialized",
|
|
||||||
);
|
|
||||||
return new PostgresAdapter();
|
|
||||||
} else {
|
|
||||||
logger.info("Initializing SQLite adapter");
|
|
||||||
const db = getSqliteDatabase();
|
|
||||||
logger.info("SQLite database initialized");
|
|
||||||
return new SqliteAdapter(db);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get database adapter synchronously (for SQLite)
|
|
||||||
* Note: This only works for SQLite. PostgreSQL requires async initialization.
|
|
||||||
*/
|
|
||||||
export function getDatabaseSync(): DatabaseAdapter {
|
|
||||||
if (config.DATABASE_TYPE === "postgres") {
|
|
||||||
logger.warn(
|
|
||||||
"getDatabaseSync called with PostgreSQL - use getDatabase() instead for proper async handling",
|
|
||||||
);
|
|
||||||
return new PostgresAdapter();
|
|
||||||
} else {
|
|
||||||
const db = getSqliteDatabase();
|
|
||||||
return new SqliteAdapter(db);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,130 +0,0 @@
|
|||||||
import { createChildLogger } from "../logger";
|
|
||||||
import { query } from "./postgres";
|
|
||||||
|
|
||||||
const logger = createChildLogger("migrations");
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Run all database migrations to create schema
|
|
||||||
*/
|
|
||||||
export async function runMigrations(): Promise<void> {
|
|
||||||
logger.info("Starting database migrations");
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Create muxer_jobs table
|
|
||||||
await query(`
|
|
||||||
CREATE TABLE IF NOT EXISTS muxer_jobs (
|
|
||||||
id TEXT PRIMARY KEY,
|
|
||||||
data TEXT NOT NULL,
|
|
||||||
status TEXT NOT NULL DEFAULT 'pending',
|
|
||||||
attempts INTEGER NOT NULL DEFAULT 0,
|
|
||||||
maxAttempts INTEGER NOT NULL DEFAULT 3,
|
|
||||||
createdAt BIGINT NOT NULL,
|
|
||||||
updatedAt BIGINT NOT NULL,
|
|
||||||
error TEXT
|
|
||||||
)
|
|
||||||
`);
|
|
||||||
logger.debug("Created muxer_jobs table");
|
|
||||||
|
|
||||||
await query(`
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_muxer_jobs_status ON muxer_jobs(status)
|
|
||||||
`);
|
|
||||||
await query(`
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_muxer_jobs_createdAt ON muxer_jobs(createdAt)
|
|
||||||
`);
|
|
||||||
logger.debug("Created muxer_jobs indexes");
|
|
||||||
|
|
||||||
// Create messages table
|
|
||||||
await query(`
|
|
||||||
CREATE TABLE IF NOT EXISTS messages (
|
|
||||||
id TEXT PRIMARY KEY,
|
|
||||||
guild_id TEXT NOT NULL,
|
|
||||||
channel_id TEXT NOT NULL,
|
|
||||||
thread_id TEXT,
|
|
||||||
user_id TEXT NOT NULL,
|
|
||||||
username TEXT NOT NULL,
|
|
||||||
avatar_url TEXT,
|
|
||||||
content TEXT NOT NULL,
|
|
||||||
edited_content TEXT,
|
|
||||||
created_at BIGINT NOT NULL,
|
|
||||||
edited_at BIGINT,
|
|
||||||
deleted_at BIGINT,
|
|
||||||
type TEXT NOT NULL DEFAULT 'text',
|
|
||||||
metadata TEXT,
|
|
||||||
ai_status TEXT NOT NULL DEFAULT 'pending',
|
|
||||||
ai_moderation_flags TEXT,
|
|
||||||
ai_moderation_score REAL,
|
|
||||||
ai_moderation_raw TEXT,
|
|
||||||
ai_analysis TEXT,
|
|
||||||
ai_analyzed_at BIGINT,
|
|
||||||
ai_error TEXT
|
|
||||||
)
|
|
||||||
`);
|
|
||||||
logger.debug("Created messages table");
|
|
||||||
|
|
||||||
await query(`
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages(channel_id)
|
|
||||||
`);
|
|
||||||
await query(`
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_messages_user ON messages(user_id)
|
|
||||||
`);
|
|
||||||
await query(`
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_messages_created ON messages(created_at DESC)
|
|
||||||
`);
|
|
||||||
await query(`
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id)
|
|
||||||
`);
|
|
||||||
logger.debug("Created messages indexes");
|
|
||||||
|
|
||||||
// Create attachments table
|
|
||||||
await query(`
|
|
||||||
CREATE TABLE IF NOT EXISTS attachments (
|
|
||||||
id TEXT PRIMARY KEY,
|
|
||||||
message_id TEXT NOT NULL,
|
|
||||||
guild_id TEXT NOT NULL,
|
|
||||||
channel_id TEXT NOT NULL,
|
|
||||||
thread_id TEXT,
|
|
||||||
user_id TEXT NOT NULL,
|
|
||||||
filename TEXT NOT NULL,
|
|
||||||
size INTEGER NOT NULL,
|
|
||||||
type TEXT NOT NULL,
|
|
||||||
discord_url TEXT NOT NULL,
|
|
||||||
uploaded_url TEXT,
|
|
||||||
upload_status TEXT NOT NULL DEFAULT 'pending',
|
|
||||||
upload_error TEXT,
|
|
||||||
created_at BIGINT NOT NULL,
|
|
||||||
uploaded_at BIGINT,
|
|
||||||
FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
|
|
||||||
)
|
|
||||||
`);
|
|
||||||
logger.debug("Created attachments table");
|
|
||||||
|
|
||||||
await query(`
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_attachments_channel ON attachments(channel_id)
|
|
||||||
`);
|
|
||||||
await query(`
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_attachments_message ON attachments(message_id)
|
|
||||||
`);
|
|
||||||
await query(`
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_attachments_status ON attachments(upload_status)
|
|
||||||
`);
|
|
||||||
logger.debug("Created attachments indexes");
|
|
||||||
|
|
||||||
// Create ui_state table
|
|
||||||
await query(`
|
|
||||||
CREATE TABLE IF NOT EXISTS ui_state (
|
|
||||||
key TEXT PRIMARY KEY,
|
|
||||||
value TEXT NOT NULL,
|
|
||||||
updated_at BIGINT NOT NULL
|
|
||||||
)
|
|
||||||
`);
|
|
||||||
logger.debug("Created ui_state table");
|
|
||||||
|
|
||||||
logger.info("Database migrations completed successfully");
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
{ error: error instanceof Error ? error.message : String(error) },
|
|
||||||
"Database migrations failed",
|
|
||||||
);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,171 +0,0 @@
|
|||||||
import { Pool, PoolClient, QueryResult, QueryResultRow } from "pg";
|
|
||||||
import { config } from "../config";
|
|
||||||
import { createChildLogger } from "../logger";
|
|
||||||
|
|
||||||
const logger = createChildLogger("postgres");
|
|
||||||
|
|
||||||
export interface PostgresConfig {
|
|
||||||
host: string;
|
|
||||||
port: number;
|
|
||||||
user?: string;
|
|
||||||
password?: string;
|
|
||||||
database?: string;
|
|
||||||
min: number;
|
|
||||||
max: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
let pool: Pool | null = null;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Parse DATABASE_URL or build config from individual POSTGRES_* variables
|
|
||||||
*/
|
|
||||||
export function buildConfig(): PostgresConfig {
|
|
||||||
if (config.DATABASE_URL) {
|
|
||||||
try {
|
|
||||||
const url = new URL(config.DATABASE_URL);
|
|
||||||
return {
|
|
||||||
host: url.hostname || "localhost",
|
|
||||||
port: url.port ? parseInt(url.port, 10) : 5432,
|
|
||||||
user: url.username || undefined,
|
|
||||||
password: url.password || undefined,
|
|
||||||
database: url.pathname?.slice(1) || undefined,
|
|
||||||
min: config.POSTGRES_POOL_MIN,
|
|
||||||
max: config.POSTGRES_POOL_MAX,
|
|
||||||
};
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn(
|
|
||||||
{ error: error instanceof Error ? error.message : String(error) },
|
|
||||||
"Failed to parse DATABASE_URL, falling back to individual variables",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
host: config.POSTGRES_HOST,
|
|
||||||
port: config.POSTGRES_PORT,
|
|
||||||
user: config.POSTGRES_USER,
|
|
||||||
password: config.POSTGRES_PASSWORD,
|
|
||||||
database: config.POSTGRES_DB,
|
|
||||||
min: config.POSTGRES_POOL_MIN,
|
|
||||||
max: config.POSTGRES_POOL_MAX,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize PostgreSQL connection pool
|
|
||||||
*/
|
|
||||||
export function initializePool(): Pool {
|
|
||||||
const pgConfig = buildConfig();
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
{
|
|
||||||
host: pgConfig.host,
|
|
||||||
port: pgConfig.port,
|
|
||||||
database: pgConfig.database,
|
|
||||||
min: pgConfig.min,
|
|
||||||
max: pgConfig.max,
|
|
||||||
},
|
|
||||||
"Initializing PostgreSQL connection pool",
|
|
||||||
);
|
|
||||||
|
|
||||||
const newPool = new Pool({
|
|
||||||
host: pgConfig.host,
|
|
||||||
port: pgConfig.port,
|
|
||||||
user: pgConfig.user,
|
|
||||||
password: pgConfig.password,
|
|
||||||
database: pgConfig.database,
|
|
||||||
min: pgConfig.min,
|
|
||||||
max: pgConfig.max,
|
|
||||||
idleTimeoutMillis: 30000,
|
|
||||||
connectionTimeoutMillis: 2000,
|
|
||||||
});
|
|
||||||
|
|
||||||
newPool.on("error", (error) => {
|
|
||||||
logger.error(
|
|
||||||
{ error: error instanceof Error ? error.message : String(error) },
|
|
||||||
"Unexpected error on idle client in pool",
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
newPool.on("connect", () => {
|
|
||||||
logger.debug("New client connected to pool");
|
|
||||||
});
|
|
||||||
|
|
||||||
newPool.on("remove", () => {
|
|
||||||
logger.debug("Client removed from pool");
|
|
||||||
});
|
|
||||||
|
|
||||||
return newPool;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get or initialize the connection pool
|
|
||||||
*/
|
|
||||||
export function getPool(): Pool {
|
|
||||||
if (!pool) {
|
|
||||||
pool = initializePool();
|
|
||||||
}
|
|
||||||
return pool;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Execute a query with type safety
|
|
||||||
*/
|
|
||||||
export async function query<T extends QueryResultRow = QueryResultRow>(
|
|
||||||
text: string,
|
|
||||||
values?: unknown[],
|
|
||||||
): Promise<QueryResult<T>> {
|
|
||||||
const client = await getPool().connect();
|
|
||||||
try {
|
|
||||||
logger.debug({ text, values: values?.length || 0 }, "Executing query");
|
|
||||||
return await client.query<T>(text, values);
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
{
|
|
||||||
text,
|
|
||||||
error: error instanceof Error ? error.message : String(error),
|
|
||||||
},
|
|
||||||
"Query execution failed",
|
|
||||||
);
|
|
||||||
throw error;
|
|
||||||
} finally {
|
|
||||||
client.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a client from the pool for transaction or batch operations
|
|
||||||
*/
|
|
||||||
export async function getClient(): Promise<PoolClient> {
|
|
||||||
try {
|
|
||||||
const client = await getPool().connect();
|
|
||||||
logger.debug("Client acquired from pool");
|
|
||||||
return client;
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
{ error: error instanceof Error ? error.message : String(error) },
|
|
||||||
"Failed to acquire client from pool",
|
|
||||||
);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Close the connection pool
|
|
||||||
*/
|
|
||||||
export async function closePool(): Promise<void> {
|
|
||||||
if (pool) {
|
|
||||||
try {
|
|
||||||
logger.info("Closing PostgreSQL connection pool");
|
|
||||||
await pool.end();
|
|
||||||
pool = null;
|
|
||||||
logger.info("PostgreSQL connection pool closed");
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
{ error: error instanceof Error ? error.message : String(error) },
|
|
||||||
"Error closing connection pool",
|
|
||||||
);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user