feat: batch AI analysis messages for faster processing
- Change runLLMAnalysis to accept array of texts instead of single text - Batch up to 5 messages per AI request instead of 1 message per request - drainQueue now collects batch before sending to AI API - Reduces API calls by 5x and speeds up analysis significantly - System prompt updated to handle batch JSON array responses This resolves: - Slow AI analysis (3 messages every 15 seconds) - Too many API calls (one per message) - Long queue backlog Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -86,7 +86,7 @@ function parseLLMAnalysis(content: string): LLMAnalysis {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
async function runLLMAnalysis(text: string): Promise<{ result: LLMAnalysis; raw: unknown }> {
|
async function runLLMAnalysis(texts: string[]): Promise<{ results: LLMAnalysis[]; raw: unknown }> {
|
||||||
const response = await retryWithBackoff(
|
const response = await retryWithBackoff(
|
||||||
() => fetchJson(`${config.AI_LLM_BASE_URL}/chat/completions`, {
|
() => fetchJson(`${config.AI_LLM_BASE_URL}/chat/completions`, {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
@@ -99,30 +99,65 @@ async function runLLMAnalysis(text: string): Promise<{ result: LLMAnalysis; raw:
|
|||||||
messages: [
|
messages: [
|
||||||
{
|
{
|
||||||
role: "system",
|
role: "system",
|
||||||
content: "Kamu analis moderation Discord. Nilai pesan untuk toxic, harassment, hate, violence, sexual, self-harm, spam, scam, atau unsafe content. Balas JSON valid saja dengan schema: {\"status\":\"clean|flagged\",\"flags\":[\"...\"],\"score\":0..1,\"analysis\":\"ringkasan singkat Bahasa Indonesia + alasan + aksi disarankan\"}.",
|
content: "Kamu analis moderation Discord. Nilai setiap pesan untuk toxic, harassment, hate, violence, sexual, self-harm, spam, scam, atau unsafe content. Balas JSON array dengan schema: [{\"status\":\"clean|flagged\",\"flags\":[\"...\"],\"score\":0..1,\"analysis\":\"ringkasan singkat Bahasa Indonesia + alasan + aksi disarankan\"}]. Satu JSON object per pesan dalam array.",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
role: "user",
|
role: "user",
|
||||||
content: text,
|
content: `Analisis ${texts.length} pesan berikut:\n${texts.map((t, i) => `${i + 1}. ${t}`).join("\n")}`,
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
temperature: 0.2,
|
temperature: 0.2,
|
||||||
}),
|
}),
|
||||||
|
signal: AbortSignal.timeout(config.AI_ANALYSIS_TIMEOUT_MS),
|
||||||
}),
|
}),
|
||||||
{ retries: 2, logger },
|
{ retries: 2, logger },
|
||||||
) as ChatCompletionResponse;
|
) as ChatCompletionResponse;
|
||||||
|
|
||||||
const content = response.choices?.[0]?.message?.content?.trim() || "";
|
const content = response.choices?.[0]?.message?.content?.trim() || "";
|
||||||
return { result: parseLLMAnalysis(content), raw: response };
|
|
||||||
|
// Extract JSON array from response
|
||||||
|
const jsonStart = content.indexOf("[");
|
||||||
|
const jsonEnd = content.lastIndexOf("]");
|
||||||
|
let results: LLMAnalysis[] = [];
|
||||||
|
|
||||||
|
if (jsonStart >= 0 && jsonEnd > jsonStart) {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(content.substring(jsonStart, jsonEnd + 1));
|
||||||
|
if (Array.isArray(parsed)) {
|
||||||
|
results = parsed.map((item: any) => ({
|
||||||
|
status: item.status === "flagged" ? "flagged" : "clean",
|
||||||
|
flags: Array.isArray(item.flags) ? item.flags.map(String) : [],
|
||||||
|
score: Math.max(0, Math.min(1, Number(item.score) || 0)),
|
||||||
|
analysis: typeof item.analysis === "string" ? item.analysis : content,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Fall through to individual parsing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If batch parsing failed, parse as individual responses
|
||||||
|
if (results.length === 0) {
|
||||||
|
results = texts.map(() => parseLLMAnalysis(content));
|
||||||
|
}
|
||||||
|
|
||||||
|
return { results, raw: response };
|
||||||
}
|
}
|
||||||
|
|
||||||
async function analyzeAndStore(db: SqliteDatabase, message: MessageRecord): Promise<void> {
|
async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[]): Promise<void> {
|
||||||
const text = getAnalysisText(message);
|
if (messages.length === 0) return;
|
||||||
if (!config.AI_ANALYSIS_ENABLED || text.length === 0) return;
|
|
||||||
|
const texts = messages.map(getAnalysisText).filter((t) => t.length > 0);
|
||||||
|
if (texts.length === 0) return;
|
||||||
|
|
||||||
activeRequests++;
|
activeRequests++;
|
||||||
try {
|
try {
|
||||||
const { result, raw } = await runLLMAnalysis(text);
|
const { results, raw } = await runLLMAnalysis(texts);
|
||||||
|
|
||||||
|
for (let i = 0; i < messages.length; i++) {
|
||||||
|
const message = messages[i];
|
||||||
|
const result = results[i] || parseLLMAnalysis("");
|
||||||
|
|
||||||
const row = updateMessageAIAnalysis(db, message.id, {
|
const row = updateMessageAIAnalysis(db, message.id, {
|
||||||
status: result.status,
|
status: result.status,
|
||||||
flags: JSON.stringify(result.flags),
|
flags: JSON.stringify(result.flags),
|
||||||
@@ -133,7 +168,10 @@ async function analyzeAndStore(db: SqliteDatabase, message: MessageRecord): Prom
|
|||||||
error: null,
|
error: null,
|
||||||
});
|
});
|
||||||
if (row) (globalThis as any).broadcastMessageAnalyzed?.(row);
|
if (row) (globalThis as any).broadcastMessageAnalyzed?.(row);
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
const errorMsg = error instanceof Error ? error.message : String(error);
|
||||||
|
for (const message of messages) {
|
||||||
const row = updateMessageAIAnalysis(db, message.id, {
|
const row = updateMessageAIAnalysis(db, message.id, {
|
||||||
status: "error",
|
status: "error",
|
||||||
flags: null,
|
flags: null,
|
||||||
@@ -141,10 +179,11 @@ async function analyzeAndStore(db: SqliteDatabase, message: MessageRecord): Prom
|
|||||||
raw: null,
|
raw: null,
|
||||||
analysis: null,
|
analysis: null,
|
||||||
analyzedAt: Date.now(),
|
analyzedAt: Date.now(),
|
||||||
error: error instanceof Error ? error.message : String(error),
|
error: errorMsg,
|
||||||
});
|
});
|
||||||
if (row) (globalThis as any).broadcastMessageAnalyzed?.(row);
|
if (row) (globalThis as any).broadcastMessageAnalyzed?.(row);
|
||||||
logger.warn({ messageId: message.id, error }, "AI analysis failed");
|
}
|
||||||
|
logger.warn({ count: messages.length, error }, "AI batch analysis failed");
|
||||||
} finally {
|
} finally {
|
||||||
activeRequests--;
|
activeRequests--;
|
||||||
}
|
}
|
||||||
@@ -154,17 +193,26 @@ async function drainQueue(db: SqliteDatabase): Promise<void> {
|
|||||||
if (isProcessing) return;
|
if (isProcessing) return;
|
||||||
isProcessing = true;
|
isProcessing = true;
|
||||||
try {
|
try {
|
||||||
|
const BATCH_SIZE = 5;
|
||||||
|
|
||||||
while (queuedMessageIds.size > 0) {
|
while (queuedMessageIds.size > 0) {
|
||||||
// Wait if at max concurrent requests
|
// Wait if at max concurrent requests
|
||||||
while (activeRequests >= MAX_CONCURRENT_REQUESTS) {
|
while (activeRequests >= MAX_CONCURRENT_REQUESTS) {
|
||||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
}
|
}
|
||||||
|
|
||||||
const messageId = queuedMessageIds.values().next().value as string | undefined;
|
// Collect batch of messages
|
||||||
if (!messageId) break;
|
const batch: MessageRecord[] = [];
|
||||||
|
for (const messageId of queuedMessageIds) {
|
||||||
|
if (batch.length >= BATCH_SIZE) break;
|
||||||
queuedMessageIds.delete(messageId);
|
queuedMessageIds.delete(messageId);
|
||||||
const message = getMessageById(db, messageId);
|
const message = getMessageById(db, messageId);
|
||||||
if (message) await analyzeAndStore(db, message);
|
if (message) batch.push(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (batch.length > 0) {
|
||||||
|
await analyzeAndStoreBatch(db, batch);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
isProcessing = false;
|
isProcessing = false;
|
||||||
|
|||||||
Reference in New Issue
Block a user