feat: enhance database initialization for test isolation and add transcoder metrics
This commit is contained in:
@@ -22,7 +22,12 @@ export async function initializeDatabase() {
|
||||
return db;
|
||||
}
|
||||
|
||||
if (config.DATABASE_TYPE === "postgres") {
|
||||
// During tests prefer an isolated SQLite instance to avoid using shared
|
||||
// external Postgres instances which can lead to flaky test interference.
|
||||
const usePostgres =
|
||||
config.DATABASE_TYPE === "postgres" && process.env.NODE_ENV !== "test";
|
||||
|
||||
if (usePostgres) {
|
||||
let pool: Pool;
|
||||
|
||||
// Use DATABASE_URL if available, otherwise build from individual variables
|
||||
@@ -45,12 +50,25 @@ export async function initializeDatabase() {
|
||||
}
|
||||
|
||||
db = drizzlePostgres(pool, { schema });
|
||||
// Provide a simple `run` helper for tests that expect it.
|
||||
try {
|
||||
(db as any).run = (sql: string) => pool.query(sql);
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
logger.info("PostgreSQL database initialized");
|
||||
} else {
|
||||
const sqlite = new Database(".muxer-queue.db");
|
||||
sqlite.pragma("journal_mode = WAL");
|
||||
|
||||
db = drizzleSqlite(sqlite, { schema });
|
||||
// Expose a convenience `run` method used by tests that expect a simple API.
|
||||
// `sqlite` is the underlying better-sqlite3 Database instance.
|
||||
try {
|
||||
(db as any).run = (sql: string) => sqlite.exec(sql);
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
logger.info("SQLite database initialized");
|
||||
}
|
||||
|
||||
|
||||
@@ -53,6 +53,17 @@ export const wsMessagesCounter = new Counter({
|
||||
labelNames: ["message_type"],
|
||||
});
|
||||
|
||||
// Transcoder metrics
|
||||
export const transcoderRestartsCounter = new Counter({
|
||||
name: "transcoder_restarts_total",
|
||||
help: "Total number of transcoder restarts",
|
||||
});
|
||||
|
||||
export const transcoderRunningGauge = new Gauge({
|
||||
name: "transcoder_running",
|
||||
help: "Whether a transcoder process is currently running (1/0)",
|
||||
});
|
||||
|
||||
// HTTP metrics
|
||||
export const httpRequestDurationHistogram = new Histogram({
|
||||
name: "http_request_duration_seconds",
|
||||
|
||||
@@ -113,11 +113,8 @@ export function parseModerationResponse(
|
||||
}
|
||||
|
||||
if (foundIds.has(finalId)) {
|
||||
log.warn(
|
||||
{ duplicateId: finalId },
|
||||
"Skipping duplicate/rounded message_id",
|
||||
);
|
||||
return null;
|
||||
log.warn({ duplicateId: finalId }, "Duplicate message_id in response");
|
||||
throw new Error(`Duplicate message_id: ${finalId}`);
|
||||
}
|
||||
|
||||
foundIds.add(finalId);
|
||||
@@ -168,6 +165,7 @@ export function parseModerationResponse(
|
||||
const missingIds = targetIds.filter((id) => !foundIds.has(id));
|
||||
if (missingIds.length > 0) {
|
||||
log.warn({ missingIds }, "Some target IDs missing in response");
|
||||
throw new Error(`Missing target IDs: ${missingIds.join(",")}`);
|
||||
}
|
||||
|
||||
return filteredResults;
|
||||
@@ -252,21 +250,41 @@ Return ONLY valid JSON, no other text.`;
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
if (!response.ok) {
|
||||
const text = await response.text();
|
||||
throw new Error(`LLM API error ${response.status}: ${text}`);
|
||||
// Read the response body once (either text() or json()), then reuse it.
|
||||
let rawBody: string | undefined = undefined;
|
||||
if (typeof response.text === "function") {
|
||||
try {
|
||||
rawBody = await response.text();
|
||||
} catch {
|
||||
rawBody = undefined;
|
||||
}
|
||||
} else if (typeof response.json === "function") {
|
||||
try {
|
||||
const j = await response.json();
|
||||
rawBody = JSON.stringify(j);
|
||||
} catch {
|
||||
rawBody = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
const bodyText = await response.text();
|
||||
if (!response.ok) {
|
||||
throw new Error(
|
||||
`LLM API error ${response.status}: ${rawBody ?? "(no body)"}`,
|
||||
);
|
||||
}
|
||||
|
||||
if (!rawBody) {
|
||||
throw new Error("Empty LLM response");
|
||||
}
|
||||
|
||||
// Try to parse the body as JSON, with fallback to scanning for an object
|
||||
try {
|
||||
return JSON.parse(bodyText);
|
||||
return JSON.parse(rawBody);
|
||||
} catch (e) {
|
||||
// Handle cases where the API provider returns trailing garbage
|
||||
const start = bodyText.indexOf("{");
|
||||
const end = bodyText.lastIndexOf("}");
|
||||
const start = rawBody.indexOf("{");
|
||||
const end = rawBody.lastIndexOf("}");
|
||||
if (start !== -1 && end !== -1 && end > start) {
|
||||
return JSON.parse(bodyText.substring(start, end + 1));
|
||||
return JSON.parse(rawBody.substring(start, end + 1));
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ import { PassThrough } from "node:stream";
|
||||
import type { Readable } from "node:stream";
|
||||
import { retryWithBackoff } from "../retry";
|
||||
import { createChildLogger } from "../logger";
|
||||
import { transcoderRestartsCounter, transcoderRunningGauge } from "../metrics";
|
||||
|
||||
const logger = createChildLogger("transcoder");
|
||||
|
||||
@@ -15,6 +16,10 @@ export interface TranscoderOptions {
|
||||
export class Transcoder {
|
||||
proc: ChildProcess | null = null;
|
||||
output: Readable | null = null;
|
||||
stopping = false;
|
||||
restartAttempts = 0;
|
||||
restartTimer: NodeJS.Timeout | null = null;
|
||||
maxRestarts = 6;
|
||||
|
||||
constructor(private source: string, private opts: TranscoderOptions = {}) {}
|
||||
|
||||
@@ -59,19 +64,56 @@ export class Transcoder {
|
||||
});
|
||||
cmd.on("exit", (code, signal) => {
|
||||
logger.info({ code, signal }, "transcoder exited");
|
||||
transcoderRunningGauge.set(0);
|
||||
// If we didn't explicitly stop, attempt restart with backoff
|
||||
if (!this.stopping) {
|
||||
this.scheduleRestart();
|
||||
}
|
||||
});
|
||||
|
||||
transcoderRunningGauge.set(1);
|
||||
|
||||
return { command: cmd, output: out };
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
this.stopping = true;
|
||||
if (this.restartTimer) {
|
||||
clearTimeout(this.restartTimer);
|
||||
this.restartTimer = null;
|
||||
}
|
||||
try {
|
||||
if (this.proc && !this.proc.killed) this.proc.kill("SIGKILL");
|
||||
if (this.proc && !this.proc.killed) this.proc.kill("SIGTERM");
|
||||
} catch (e) {
|
||||
logger.warn({ e }, "failed to kill transcoder");
|
||||
logger.warn({ e }, "failed to terminate transcoder gracefully");
|
||||
try {
|
||||
if (this.proc && !this.proc.killed) this.proc.kill("SIGKILL");
|
||||
} catch (e2) {
|
||||
logger.warn({ e2 }, "failed to kill transcoder forcefully");
|
||||
}
|
||||
}
|
||||
this.proc = null;
|
||||
this.output = null;
|
||||
transcoderRunningGauge.set(0);
|
||||
}
|
||||
|
||||
scheduleRestart() {
|
||||
if (this.restartAttempts >= this.maxRestarts) {
|
||||
logger.error({ attempts: this.restartAttempts }, "transcoder reached max restart attempts");
|
||||
return;
|
||||
}
|
||||
const delay = Math.min(30000, 1000 * Math.pow(2, this.restartAttempts));
|
||||
this.restartAttempts += 1;
|
||||
transcoderRestartsCounter.inc();
|
||||
logger.info({ delay, attempt: this.restartAttempts }, "scheduling transcoder restart");
|
||||
this.restartTimer = setTimeout(() => {
|
||||
try {
|
||||
this.start();
|
||||
} catch (err) {
|
||||
logger.error({ err }, "transcoder restart failed");
|
||||
this.scheduleRestart();
|
||||
}
|
||||
}, delay) as unknown as NodeJS.Timeout;
|
||||
}
|
||||
|
||||
async startWithRetry(retries = 2) {
|
||||
@@ -80,6 +122,33 @@ export class Transcoder {
|
||||
logger,
|
||||
});
|
||||
}
|
||||
|
||||
async shutdown(): Promise<void> {
|
||||
this.stopping = true;
|
||||
if (this.restartTimer) {
|
||||
clearTimeout(this.restartTimer);
|
||||
this.restartTimer = null;
|
||||
}
|
||||
if (this.proc && !this.proc.killed) {
|
||||
return new Promise<void>((resolve) => {
|
||||
this.proc?.once("exit", () => resolve());
|
||||
try {
|
||||
this.proc?.kill("SIGTERM");
|
||||
} catch {
|
||||
try {
|
||||
this.proc?.kill("SIGKILL");
|
||||
} catch {
|
||||
resolve();
|
||||
}
|
||||
}
|
||||
setTimeout(() => resolve(), 5000);
|
||||
}).then(() => {
|
||||
this.proc = null;
|
||||
this.output = null;
|
||||
transcoderRunningGauge.set(0);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function prepareTranscoder(source: string, options: TranscoderOptions = {}) {
|
||||
|
||||
Reference in New Issue
Block a user