diff --git a/src/moderation/aiAnalyzer.ts b/src/moderation/aiAnalyzer.ts index 6c09115..dc580a2 100644 --- a/src/moderation/aiAnalyzer.ts +++ b/src/moderation/aiAnalyzer.ts @@ -10,6 +10,8 @@ const queuedMessageIds = new Set(); let isProcessing = false; let activeRequests = 0; const MAX_CONCURRENT_REQUESTS = 1; +const MAX_AI_REQUEST_TOKENS = 80_000; +const AI_PROMPT_TOKEN_RESERVE = 6_000; interface ChatCompletionResponse { choices?: Array<{ @@ -30,6 +32,20 @@ function getAnalysisText(message: MessageRecord): string { return (message.edited_content || message.content || "").trim(); } +function estimateTokens(text: string): number { + return Math.ceil(text.length / 4); +} + +function formatMessageForAnalysis(message: MessageRecord, index: number): string { + const text = getAnalysisText(message); + const time = new Date(message.created_at).toISOString(); + return `${index + 1}. id=${message.id} time=${time} user=${message.username}: ${text}`; +} + +function estimateMessageTokens(message: MessageRecord): number { + return estimateTokens(formatMessageForAnalysis(message, 0)) + 16; +} + async function fetchJson(url: string, init: RequestInit): Promise { const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), config.AI_ANALYSIS_TIMEOUT_MS); @@ -86,7 +102,7 @@ function parseLLMAnalysis(content: string): LLMAnalysis { }; } -async function runLLMAnalysis(texts: string[]): Promise<{ results: LLMAnalysis[]; raw: unknown }> { +async function runLLMAnalysis(messages: MessageRecord[]): Promise<{ results: LLMAnalysis[]; raw: unknown }> { const response = await retryWithBackoff( () => fetchJson(`${config.AI_LLM_BASE_URL}/chat/completions`, { method: "POST", @@ -139,6 +155,8 @@ ATURAN KOMUNITAS LENGKAP: KONTEKS KOMUNITAS: - Ini grup bercanda/santai, jadi slang, candaan ringan, kata kasar ringan tanpa target, pesan pendek seperti "." atau "P", dan pertanyaan tidak jelas tetap CLEAN - Jangan beri WARN hanya karena pesan singkat, informal, ambigu, low-quality, atau kurang konteks +- Pahami alur pembahasan antar pesan: pesan yang sendiri terlihat normal bisa WARN/FLAGGED jika dalam konteks percakapan sedang memancing konflik, menormalisasi pelanggaran, atau melanjutkan provokasi +- Jangan menghukum orang yang sedang menasehati, menjelaskan bahaya, mengutip, atau menolak tindakan buruk; nilai maksud dan konteksnya - WARN hanya jika ada orang/kelompok yang diserang, dihina, diprovokasi, atau konflik mulai dipancing PENENTUAN STATUS: @@ -150,7 +168,7 @@ Satu JSON object per pesan dalam array.`, }, { role: "user", - content: `Analisis ${texts.length} pesan berikut:\n${texts.map((t, i) => `${i + 1}. ${t}`).join("\n")}`, + content: `Analisis ${messages.length} pesan berikut sebagai satu alur percakapan. Tetap kembalikan satu hasil per pesan dengan urutan yang sama:\n${messages.map(formatMessageForAnalysis).join("\n")}`, }, ], temperature: 0.2, @@ -188,7 +206,7 @@ Satu JSON object per pesan dalam array.`, // If batch parsing failed, parse as individual responses if (results.length === 0) { - results = texts.map(() => parseLLMAnalysis(content)); + results = messages.map(() => parseLLMAnalysis(content)); } return { results, raw: response }; @@ -197,15 +215,15 @@ Satu JSON object per pesan dalam array.`, async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[]): Promise { if (messages.length === 0) return; - const texts = messages.map(getAnalysisText).filter((t) => t.length > 0); - if (texts.length === 0) return; + const analyzableMessages = messages.filter((message) => getAnalysisText(message).length > 0); + if (analyzableMessages.length === 0) return; activeRequests++; try { - const { results, raw } = await runLLMAnalysis(texts); + const { results, raw } = await runLLMAnalysis(analyzableMessages); - for (let i = 0; i < messages.length; i++) { - const message = messages[i]; + for (let i = 0; i < analyzableMessages.length; i++) { + const message = analyzableMessages[i]; const result = results[i] || parseLLMAnalysis(""); const row = updateMessageAIAnalysis(db, message.id, { @@ -221,7 +239,7 @@ async function analyzeAndStoreBatch(db: SqliteDatabase, messages: MessageRecord[ } } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error); - for (const message of messages) { + for (const message of analyzableMessages) { const row = updateMessageAIAnalysis(db, message.id, { status: "error", flags: null, @@ -243,24 +261,32 @@ async function drainQueue(db: SqliteDatabase): Promise { if (isProcessing) return; isProcessing = true; try { - const BATCH_SIZE = 5; + const batchTokenLimit = MAX_AI_REQUEST_TOKENS - AI_PROMPT_TOKEN_RESERVE; while (queuedMessageIds.size > 0) { - // Wait if at max concurrent requests while (activeRequests >= MAX_CONCURRENT_REQUESTS) { await new Promise((resolve) => setTimeout(resolve, 100)); } - // Collect batch of messages const batch: MessageRecord[] = []; - for (const messageId of queuedMessageIds) { - if (batch.length >= BATCH_SIZE) break; - queuedMessageIds.delete(messageId); + let tokenEstimate = 0; + for (const messageId of Array.from(queuedMessageIds)) { const message = getMessageById(db, messageId); - if (message) batch.push(message); + queuedMessageIds.delete(messageId); + if (!message) continue; + + const messageTokens = estimateMessageTokens(message); + if (batch.length > 0 && tokenEstimate + messageTokens > batchTokenLimit) { + queuedMessageIds.add(messageId); + break; + } + + batch.push(message); + tokenEstimate += messageTokens; } if (batch.length > 0) { + logger.info({ count: batch.length, tokenEstimate }, "Processing AI analysis batch"); await analyzeAndStoreBatch(db, batch); } } @@ -287,7 +313,7 @@ export function startPendingAIAnalysisWorker(db: SqliteDatabase): void { logger.info("AI analysis worker started"); setInterval(() => { if (isProcessing) return; - const pendingMessages = getPendingAIAnalysisMessages(db, 3); + const pendingMessages = getPendingAIAnalysisMessages(db, 500); if (pendingMessages.length === 0) return; logger.info({ count: pendingMessages.length }, "Queueing pending AI analysis messages"); for (const message of pendingMessages) {