diff --git a/src/moderation/aiAnalyzer.ts b/src/moderation/aiAnalyzer.ts
index 06d290a..0b95acd 100644
--- a/src/moderation/aiAnalyzer.ts
+++ b/src/moderation/aiAnalyzer.ts
@@ -86,7 +86,7 @@ function parseLLMAnalysis(content: string): LLMAnalysis {
   };
 }
 
-async function runLLMAnalysis(text: string): Promise<{ result: LLMAnalysis; raw: unknown }> {
+async function runLLMAnalysis(texts: string[]): Promise<{ results: LLMAnalysis[]; raw: unknown }> {
   const response = await retryWithBackoff(
     () => fetchJson(`${config.AI_LLM_BASE_URL}/chat/completions`, {
       method: "POST",
@@ -99,52 +99,106 @@ async function runLLMAnalysis(text: string): Promise<{ result: LLMAnalysis; raw:
         messages: [
           {
             role: "system",
-            content: "Kamu analis moderation Discord. Nilai pesan untuk toxic, harassment, hate, violence, sexual, self-harm, spam, scam, atau unsafe content. Balas JSON valid saja dengan schema: {\"status\":\"clean|flagged\",\"flags\":[\"...\"],\"score\":0..1,\"analysis\":\"ringkasan singkat Bahasa Indonesia + alasan + aksi disarankan\"}.",
+            content: "Kamu analis moderation Discord. Nilai setiap pesan untuk toxic, harassment, hate, violence, sexual, self-harm, spam, scam, atau unsafe content. Balas JSON array dengan schema: [{\"status\":\"clean|flagged\",\"flags\":[\"...\"],\"score\":0..1,\"analysis\":\"ringkasan singkat Bahasa Indonesia + alasan + aksi disarankan\"}]. Satu JSON object per pesan dalam array.",
           },
           {
             role: "user",
-            content: text,
+            content: `Analisis ${texts.length} pesan berikut:\n${texts.map((t, i) => `${i + 1}. ${t}`).join("\n")}`,
           },
         ],
         temperature: 0.2,
       }),
+      signal: AbortSignal.timeout(config.AI_ANALYSIS_TIMEOUT_MS),
     }),
     { retries: 2, logger },
   ) as ChatCompletionResponse;
 
   const content = response.choices?.[0]?.message?.content?.trim() || "";
-  return { result: parseLLMAnalysis(content), raw: response };
+
+  // Extract JSON array from response
+  const jsonStart = content.indexOf("[");
+  const jsonEnd = content.lastIndexOf("]");
+  let results: LLMAnalysis[] = [];
+
+  if (jsonStart >= 0 && jsonEnd > jsonStart) {
+    try {
+      const parsed = JSON.parse(content.substring(jsonStart, jsonEnd + 1));
+      if (Array.isArray(parsed)) {
+        results = parsed.map((item: any) => ({
+          status: item.status === "flagged" ? "flagged" : "clean",
+          flags: Array.isArray(item.flags) ? item.flags.map(String) : [],
+          score: Math.max(0, Math.min(1, Number(item.score) || 0)),
+          analysis: typeof item.analysis === "string" ? item.analysis : content,
+        }));
+      }
+    } catch {
+      // Unparseable array: leave results empty so the fallback below runs
+    }
+  }
+
+  // If batch parsing failed, parse as individual responses
+  if (results.length === 0) {
+    results = texts.map(() => parseLLMAnalysis(content));
+  }
+
+  return { results, raw: response };
 }
 
-async function analyzeAndStore(db: SqliteDatabase, message: MessageRecord): Promise<void> {
-  const text = getAnalysisText(message);
-  if (!config.AI_ANALYSIS_ENABLED || text.length === 0) return;
+/**
+ * Analyzes a batch of messages with a single LLM request and stores one
+ * result per message.
+ *
+ * Messages whose analysis text is empty are dropped before the request, and
+ * each remaining message stays paired with its text, so results[i] is always
+ * written to the message that produced texts[i]. If the request fails, every
+ * message that was sent is stored with status "error".
+ */
+async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[]): Promise<void> {
+  // The single-message version checked this flag; keep honoring it here.
+  if (!config.AI_ANALYSIS_ENABLED) return;
+
+  // Filtering the texts alone would shift indexes relative to `messages`.
+  const analyzable = messages
+    .map((message) => ({ message, text: getAnalysisText(message) }))
+    .filter((entry) => entry.text.length > 0);
+  if (analyzable.length === 0) return;
 
   activeRequests++;
   try {
-    const { result, raw } = await runLLMAnalysis(text);
-    const row = updateMessageAIAnalysis(db, message.id, {
-      status: result.status,
-      flags: JSON.stringify(result.flags),
-      score: result.score,
-      raw: JSON.stringify(raw),
-      analysis: result.analysis,
-      analyzedAt: Date.now(),
-      error: null,
-    });
-    if (row) (globalThis as any).broadcastMessageAnalyzed?.(row);
+    const { results, raw } = await runLLMAnalysis(analyzable.map((entry) => entry.text));
+    // Every message in the batch shares the same raw response; serialize it once.
+    const rawJson = JSON.stringify(raw);
+
+    for (const [i, { message }] of analyzable.entries()) {
+      // A short or missing result list falls back to parsing empty content.
+      const result = results[i] || parseLLMAnalysis("");
+
+      const row = updateMessageAIAnalysis(db, message.id, {
+        status: result.status,
+        flags: JSON.stringify(result.flags),
+        score: result.score,
+        raw: rawJson,
+        analysis: result.analysis,
+        analyzedAt: Date.now(),
+        error: null,
+      });
+      if (row) (globalThis as any).broadcastMessageAnalyzed?.(row);
+    }
   } catch (error) {
-    const row = updateMessageAIAnalysis(db, message.id, {
-      status: "error",
-      flags: null,
-      score: null,
-      raw: null,
-      analysis: null,
-      analyzedAt: Date.now(),
-      error: error instanceof Error ? error.message : String(error),
-    });
-    if (row) (globalThis as any).broadcastMessageAnalyzed?.(row);
-    logger.warn({ messageId: message.id, error }, "AI analysis failed");
+    const errorMsg = error instanceof Error ? error.message : String(error);
+    for (const { message } of analyzable) {
+      const row = updateMessageAIAnalysis(db, message.id, {
+        status: "error",
+        flags: null,
+        score: null,
+        raw: null,
+        analysis: null,
+        analyzedAt: Date.now(),
+        error: errorMsg,
+      });
+      if (row) (globalThis as any).broadcastMessageAnalyzed?.(row);
+    }
+    logger.warn({ count: analyzable.length, error }, "AI batch analysis failed");
   } finally {
     activeRequests--;
   }
@@ -154,17 +208,26 @@ async function drainQueue(db: SqliteDatabase): Promise<void> {
   if (isProcessing) return;
   isProcessing = true;
   try {
+    const BATCH_SIZE = 5;
+
     while (queuedMessageIds.size > 0) {
       // Wait if at max concurrent requests
       while (activeRequests >= MAX_CONCURRENT_REQUESTS) {
         await new Promise((resolve) => setTimeout(resolve, 100));
       }
 
-      const messageId = queuedMessageIds.values().next().value as string | undefined;
-      if (!messageId) break;
-      queuedMessageIds.delete(messageId);
-      const message = getMessageById(db, messageId);
-      if (message) await analyzeAndStore(db, message);
+      // Collect batch of messages
+      const batch: MessageRecord[] = [];
+      for (const messageId of queuedMessageIds) {
+        if (batch.length >= BATCH_SIZE) break;
+        queuedMessageIds.delete(messageId);
+        const message = getMessageById(db, messageId);
+        if (message) batch.push(message);
+      }
+
+      if (batch.length > 0) {
+        await analyzeAndStoreBatch(db, batch);
+      }
     }
   } finally {
     isProcessing = false;